1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service;
19
20 import junit.framework.TestCase;
21 import org.codehaus.activemq.broker.BrokerClient;
22 import org.codehaus.activemq.broker.impl.BrokerClientImpl;
23 import org.codehaus.activemq.filter.DestinationFilter;
24 import org.codehaus.activemq.filter.FilterFactoryImpl;
25 import org.codehaus.activemq.message.ActiveMQDestination;
26 import org.codehaus.activemq.message.ActiveMQMessage;
27 import org.codehaus.activemq.message.ActiveMQTextMessage;
28 import org.codehaus.activemq.message.ConsumerInfo;
29 import org.codehaus.activemq.service.impl.DispatcherImpl;
30 import org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager;
31 import org.codehaus.activemq.service.impl.DurableTopicSubscription;
32 import org.codehaus.activemq.service.impl.DurableTopicSubscriptionContainerImpl;
33 import org.codehaus.activemq.service.impl.QueueMessageContainerManager;
34 import org.codehaus.activemq.store.PersistenceAdapter;
35 import org.codehaus.activemq.util.Callback;
36 import org.codehaus.activemq.util.IdGenerator;
37 import org.codehaus.activemq.util.TransactionTemplate;
38
39 import javax.jms.JMSException;
40 import javax.jms.TextMessage;
41 import java.io.IOException;
42
43 /***
44 * @version $Revision: 1.9 $
45 */
46 public abstract class MessageStoreTestSupport extends TestCase {
47 protected PersistenceAdapter persistenceAapter;
48 protected MessageContainer container;
49 protected Subscription subscription;
50 protected int publishMessageCount = 10;
51 protected int ackCount = 5;
52 protected ActiveMQMessage[] messages;
53 protected ActiveMQDestination destination;
54 protected IdGenerator idGenerator = new IdGenerator();
55 protected MessageContainerManager messageContainerManager;
56 protected BrokerClient client = new BrokerClientImpl();
57 protected TransactionTemplate template;
58
59 public void testRecovery() throws Exception {
60 System.out.println("Publishing: " + publishMessageCount + " messages");
61
62 for (int i = 0; i < publishMessageCount; i++) {
63 doAddMessage(i);
64 }
65
66 assertDeliveryList(0, publishMessageCount);
67
68
69 System.out.println("Acknowledging the first: " + ackCount + " messages");
70 for (int i = 0; i < ackCount; i++) {
71 doAcknowledgeMessage(i);
72 }
73
74
75 assertDeliveryList(0, 0);
76
77
78 closeAndReopenContainer();
79
80 assertDeliveryList(ackCount, publishMessageCount);
81
82 for (int i = ackCount; i < publishMessageCount; i++) {
83 doAcknowledgeMessage(i);
84 }
85 }
86
87 public void testRecoveryOfNewConsumerWhichHasYetToAck() throws Exception {
88 for (int i = 0; i < publishMessageCount; i++) {
89 doAddMessage(i);
90 }
91
92 assertDeliveryList(0, publishMessageCount);
93
94
95 assertDeliveryList(0, 0);
96
97
98 closeAndReopenContainer();
99
100 assertDeliveryList(0, publishMessageCount);
101 }
102
103 protected abstract void acknowledgeMessage(int i) throws JMSException;
104
105 protected abstract PersistenceAdapter createPersistenceAdapter() throws IOException, Exception;
106
107 protected abstract ActiveMQDestination createDestination();
108
109 protected abstract ActiveMQMessage[] getMessagesToDispatch() throws JMSException;
110
111
112 protected void doAcknowledgeMessage(final int i) throws JMSException {
113 template.run(new Callback() {
114 public void execute() throws Throwable {
115 acknowledgeMessage(i);
116 }
117 });
118 }
119
120 protected void doAddMessage(int i) throws JMSException {
121 final ActiveMQMessage message = getMessage(i);
122 template.run(new Callback() {
123 public void execute() throws Throwable {
124 container.addMessage(message);
125 }
126 });
127 }
128
129
130 protected MessageContainer createTopicMessageContainer() throws JMSException {
131 if (destination.isTopic()) {
132 return persistenceAapter.createTopicMessageContainer(destination.toString());
133 }
134 else {
135 return persistenceAapter.createQueueMessageContainer(destination.toString());
136 }
137 }
138
139 protected Subscription createSubscription() throws JMSException {
140 DestinationFilter filter = DestinationFilter.parseFilter(destination);
141 ConsumerInfo consumerInfo = createConsumerInfo();
142
143
144 messageContainerManager.addMessageConsumer(client, consumerInfo);
145
146 return new DurableTopicSubscription(new DispatcherImpl(), consumerInfo, filter);
147 }
148
149 protected ConsumerInfo createConsumerInfo() {
150 ConsumerInfo answer = new ConsumerInfo();
151 answer.setClientId(getClientID());
152 answer.setConsumerId(idGenerator.generateId());
153 answer.setConsumerName(getConsumerName());
154 answer.setDestination(destination);
155 answer.setPrefetchNumber(100);
156 answer.setSessionId(idGenerator.generateId());
157 answer.setStarted(true);
158 return answer;
159 }
160
161 protected String getConsumerName() {
162 return getName();
163 }
164
165 protected String getClientID() {
166 return getClass().getName();
167 }
168
169 protected void setUp() throws Exception {
170 super.setUp();
171 this.messages = new ActiveMQMessage[publishMessageCount];
172 this.destination = createDestination();
173
174 this.persistenceAapter = createPersistenceAdapter();
175 persistenceAapter.start();
176 persistenceAapter.beginTransaction();
177
178 template = new TransactionTemplate(persistenceAapter);
179
180 this.messageContainerManager = createMessageContainerManager();
181
182 this.container = messageContainerManager.getContainer(this.destination.getPhysicalName());
183 assertTrue("Should have created a container", container != null);
184
185 this.subscription = createSubscription();
186
187 }
188
189 protected void tearDown() throws Exception {
190 messageContainerManager.stop();
191 persistenceAapter.commitTransaction();
192 persistenceAapter.stop();
193 super.tearDown();
194 }
195
196 protected MessageContainerManager createMessageContainerManager() {
197 if (destination.isTopic()) {
198 return new DurableTopicMessageContainerManager(persistenceAapter, new DurableTopicSubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
199 }
200 else {
201 return new QueueMessageContainerManager(persistenceAapter, new DurableTopicSubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
202 }
203 }
204
205 protected void assertDeliveryList(final int startIndex, final int lastIndex) throws JMSException {
206 template.run(new Callback() {
207 public void execute() throws Throwable {
208 ActiveMQMessage[] messagesToDispatch = getMessagesToDispatch();
209 int count = lastIndex - startIndex;
210 assertTrue("Not enough messages available to dispatch. Expected: " + count
211 + " messages but was: " + messagesToDispatch.length, messagesToDispatch.length >= count);
212
213 for (int i = 0; i < count; i++) {
214 ActiveMQMessage expected = getMessage(i + startIndex);
215 ActiveMQMessage actual = messagesToDispatch[i];
216 assertMessagesEqual("Dispatched message at index: " + i, expected, actual);
217 }
218 }
219 });
220 }
221
222 protected void assertMessagesEqual(String description, ActiveMQMessage expected, ActiveMQMessage actual) throws JMSException {
223 assertEquals("MessageText compare. " + description, ((TextMessage) expected).getText(), ((TextMessage) actual).getText());
224 assertEquals("MessageID compare. " + description + " expected: " + expected + " actual: " + actual, expected.getJMSMessageID(), actual.getJMSMessageID());
225 assertEquals(description, expected, actual);
226 }
227
228 protected ActiveMQMessage getMessage(int i) throws JMSException {
229 if (messages[i] == null) {
230 messages[i] = createMessage(i);
231 }
232 return messages[i];
233 }
234
235 protected ActiveMQMessage createMessage(int i) throws JMSException {
236 ActiveMQTextMessage answer = new ActiveMQTextMessage();
237 answer.setJMSMessageID(idGenerator.generateId());
238 answer.setJMSClientID(getClientID());
239 answer.setJMSDestination(destination);
240 answer.setText("message index: " + i);
241 return answer;
242 }
243
244 protected void closeAndReopenContainer() throws Exception {
245 subscription.clear();
246
247 messageContainerManager.stop();
248 persistenceAapter.commitTransaction();
249 persistenceAapter.stop();
250
251 persistenceAapter = createPersistenceAdapter();
252 persistenceAapter.start();
253 persistenceAapter.beginTransaction();
254
255 template = new TransactionTemplate(persistenceAapter);
256
257 this.messageContainerManager = createMessageContainerManager();
258
259 container = messageContainerManager.getContainer(destination.getPhysicalName());
260
261 this.subscription = createSubscription();
262
263 template.run(new Callback() {
264 public void execute() throws Throwable {
265 recover();
266 }
267 });
268 }
269
270 protected void recover() throws JMSException {
271 }
272
273 protected String getSubject() {
274 return getClass().getName() + "." + getName();
275 }
276 }