1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import javax.jms.Connection;
21  import javax.jms.ConnectionFactory;
22  import javax.jms.Destination;
23  import javax.jms.Message;
24  import javax.jms.MessageConsumer;
25  import javax.jms.MessageProducer;
26  import javax.jms.Session;
27  import java.util.ArrayList;
28  
29  /***
30   * @version $Revision: 1.6 $
31   */
32  abstract public class JmsTransactionTestSupport extends TestSupport {
33  
34      protected ConnectionFactory connectionFactory;
35      protected Connection connection;
36      protected Session session;
37      protected MessageConsumer consumer;
38      protected MessageProducer producer;
39  
40      public JmsTransactionTestSupport() {
41          super();
42      }
43  
44      public JmsTransactionTestSupport(String name) {
45          super(name);
46      }
47  
48      protected abstract JmsResourceProvider getJmsResourceProvider();
49  
50      protected void setUp() throws Exception {
51          super.setUp();
52  
53          JmsResourceProvider p = getJmsResourceProvider();
54          // We will be using transacted sessions.
55          p.setTransacted(true);
56  
57          connectionFactory = p.createConnectionFactory();
58          connection = p.createConnection(connectionFactory);
59          System.out.println("Created connection: " + connection);
60          session = p.createSession(connection);
61          System.out.println("Created session: " + session);
62          Destination destination = p.createDestination(session, getSubject() + "." + getName());
63          System.out.println("Created destination: " + destination + " of type: " + destination.getClass());
64          producer = p.createProducer(session, destination);
65          System.out.println("Created producer: " + producer);
66          consumer = p.createConsumer(session, destination);
67          System.out.println("Created consumer: " + consumer);
68          connection.start();
69      }
70  
71      protected void tearDown() throws Exception {
72          //System.out.println("Test Done.  Stats");
73          //((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
74          System.out.println("Closing down connection");
75  
76          session.close();
77          connection.close();
78          System.out.println("Connection closed.");
79      }
80  
81      public void testSendRollback() throws Exception {
82  
83          Message[] outbound = new Message[]{
84              session.createTextMessage("First Message"),
85              session.createTextMessage("Second Message")
86          };
87  
88          producer.send(outbound[0]);
89          session.commit();
90          producer.send(session.createTextMessage("I'm going to get rolled back."));
91          session.rollback();
92          producer.send(outbound[1]);
93          session.commit();
94  
95          ArrayList messages = new ArrayList();
96          System.out.println("About to consume message 1");
97          Message message = consumer.receive(1000);
98          messages.add(message);
99          System.out.println("Received: " + message);
100 
101         System.out.println("About to consume message 2");
102         message = consumer.receive(4000);
103         messages.add(message);
104         System.out.println("Received: " + message);
105 
106         session.commit();
107 
108         Message inbound[] = new Message[messages.size()];
109         messages.toArray(inbound);
110 
111         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
112     }
113 
114     public void testReceiveRollback() throws Exception {
115 
116         Message[] outbound = new Message[]{
117             session.createTextMessage("First Message"),
118             session.createTextMessage("Second Message")
119         };
120 
121         // lets consume any outstanding messages from previous test runs
122         while (consumer.receive(1000) != null) {
123         }
124         session.commit();
125 
126         producer.send(outbound[0]);
127         producer.send(outbound[1]);
128         session.commit();
129 
130         System.out.println("Sent 0: " + outbound[0]);
131         System.out.println("Sent 1: " + outbound[1]);
132 
133         ArrayList messages = new ArrayList();
134         Message message = consumer.receive(1000);
135         messages.add(message);
136         assertEquals(outbound[0], message);
137         session.commit();
138 
139         // rollback so we can get that last message again.
140         message = consumer.receive(1000);
141         assertNotNull(message);
142         assertEquals(outbound[1], message);
143         session.rollback();
144 
145         // Consume again.. the previous message should
146         // get redelivered.
147         message = consumer.receive(5000);
148         assertNotNull("Should have re-received the message again!", message);
149         messages.add(message);
150         session.commit();
151 
152         Message inbound[] = new Message[messages.size()];
153         messages.toArray(inbound);
154 
155         assertTextMessagesEqual("Rollback did not work", outbound, inbound);
156     }
157 
158     public void testSendRollbackWithPrefetchOfOne() throws Exception {
159         setPrefetchToOne();
160         testSendRollback();
161     }
162 
163     public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
164         setPrefetchToOne();
165         testReceiveRollback();
166     }
167 
168     protected void setPrefetchToOne() {
169         ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection) connection).getPrefetchPolicy();
170         prefetchPolicy.setQueuePrefetch(1);
171         prefetchPolicy.setTopicPrefetch(1);
172         prefetchPolicy.setDurableTopicPrefetch(1);
173     }
174 }