1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.usecases;
19
20 import org.codehaus.activemq.TestSupport;
21
22 import javax.jms.Connection;
23 import javax.jms.DeliveryMode;
24 import javax.jms.Destination;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageConsumer;
28 import javax.jms.MessageProducer;
29 import javax.jms.Session;
30 import javax.jms.Topic;
31
32 /***
33 * @version $Revision: 1.5 $
34 */
35 public class DurableConsumerCloseAndReconnectTest extends TestSupport {
36 protected static final long RECEIVE_TIMEOUT = 5000L;
37
38 private Connection connection;
39 private Session session;
40 private MessageConsumer consumer;
41 private MessageProducer producer;
42 private Destination destination;
43
44 public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
45
46 Connection dummyConnection = createConnection();
47
48 testConsumeMessagesDeliveredWhileConsumerClosed();
49
50 dummyConnection.close();
51 }
52
53 public void testConsumeMessagesDeliveredWhileConsumerClosed() throws Exception {
54 makeConsumer();
55 closeConsumer();
56
57 publish();
58
59
60 Thread.sleep(1000);
61
62 makeConsumer();
63
64 Message message = consumer.receive(RECEIVE_TIMEOUT);
65 assertTrue("Should have received a message!", message != null);
66
67 closeConsumer();
68
69 System.out.println("Now lets create the consumer again and because we didn't ack, we should get it again");
70 makeConsumer();
71
72 message = consumer.receive(RECEIVE_TIMEOUT);
73 assertTrue("Should have received a message!", message != null);
74 message.acknowledge();
75
76 closeConsumer();
77
78 System.out.println("Now lets create the consumer again and because we didn't ack, we should get it again");
79 makeConsumer();
80
81 message = consumer.receive(2000);
82 assertTrue("Should have no more messages left!", message == null);
83
84 closeConsumer();
85
86 System.out.println("Lets publish one more message now");
87 publish();
88
89 makeConsumer();
90 message = consumer.receive(RECEIVE_TIMEOUT);
91 assertTrue("Should have received a message!", message != null);
92 message.acknowledge();
93
94 closeConsumer();
95 }
96
97 protected void publish() throws JMSException {
98 connection = createConnection();
99 connection.start();
100
101 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
102 destination = createDestination();
103
104 producer = session.createProducer(destination);
105 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
106
107 producer.send(session.createTextMessage("This is a test"));
108
109 producer.close();
110 producer = null;
111 closeSession();
112 }
113
114 protected Destination createDestination() throws JMSException {
115 if (isTopic()) {
116 return session.createTopic(getSubject());
117 }
118 else {
119 return session.createQueue(getSubject());
120 }
121 }
122
123 protected boolean isTopic() {
124 return true;
125 }
126
127 protected void closeConsumer() throws JMSException {
128 consumer.close();
129 consumer = null;
130 closeSession();
131 }
132
133 protected void closeSession() throws JMSException {
134 session.close();
135 session = null;
136 connection.close();
137 connection = null;
138 }
139
140 protected void makeConsumer() throws JMSException {
141 String durableName = getName();
142 String clientID = getSubject();
143 System.out.println("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName);
144 createSession(clientID);
145 consumer = createConsumer(durableName);
146 }
147
148 private MessageConsumer createConsumer(String durableName) throws JMSException {
149 if (destination instanceof Topic) {
150 return session.createDurableSubscriber((Topic) destination, durableName);
151 }
152 else {
153 return session.createConsumer(destination);
154 }
155 }
156
157 protected void createSession(String clientID) throws JMSException {
158 connection = createConnection();
159 connection.setClientID(clientID);
160 connection.start();
161
162 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
163 destination = createDestination();
164 }
165 }