1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import javax.jms.Destination;
21  import javax.jms.Message;
22  import javax.jms.MessageConsumer;
23  import javax.jms.MessageListener;
24  import javax.jms.MessageProducer;
25  import javax.jms.Session;
26  import javax.jms.TextMessage;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Collections;
30  import java.util.Date;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  /***
35   * @version $Revision: 1.18 $
36   */
37  public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
38      protected int messageCount = 100;
39      protected String[] data;
40  
41      protected Session session;
42      protected MessageConsumer consumer;
43      protected MessageProducer producer;
44      protected Destination destination;
45  
46      protected List messages = Collections.synchronizedList(new ArrayList());
47      protected boolean topic = true;
48      protected boolean durable = false;
49      protected final Object lock = new Object();
50      protected boolean verbose = false;
51  
52      public void testSendReceive() throws Exception {
53          messages.clear();
54  
55          for (int i = 0; i < data.length; i++) {
56              Message message = session.createTextMessage(data[i]);
57  
58              if (verbose) {
59                  System.out.println("About to send a message: " + message + " with text: " + data[i]);
60              }
61  
62              producer.send(destination, message);
63          }
64  
65          waitForMessagesToBeDelivered();
66  
67          int counter = 0;
68          List copyOfMessages = Arrays.asList(messages.toArray());
69          if (data.length != copyOfMessages.size()) {
70              for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) {
71                  TextMessage message = (TextMessage) iter.next();
72                  System.out.println("<== " + counter++ + " = " + message);
73              }
74          }
75  
76          assertEquals("Not enough messages received", data.length, messages.size());
77  
78          for (int i = 0; i < data.length; i++) {
79              TextMessage received = (TextMessage) messages.get(i);
80              String text = received.getText();
81  
82              if (verbose) {
83                  System.out.println("Received Text: " + text);
84              }
85  
86              assertEquals("Message: " + i, data[i], text);
87          }
88      }
89  
90      protected void waitForMessagesToBeDelivered() {
91          // lets wait a little while - note we might miss some notify() calls
92          // as we might not be always waiting for the lock when the notify occurs
93          int count = 0;
94          while (count < data.length) {
95              waitForTimeOrNotify(4000L);
96  
97              // we may have missed a few notify() calls, so skip them
98              int size = messages.size();
99              if (size > count) {
100                 count = size;
101             }
102             else {
103                 count++;
104             }
105         }
106 
107         if (messages.size() < data.length) {
108             // one more just for good luck
109             waitForTimeOrNotify(5000L);
110         }
111     }
112 
113     private void waitForTimeOrNotify(long timeout) {
114         try {
115             synchronized (lock) {
116                 lock.wait(timeout);
117             }
118         }
119         catch (InterruptedException e) {
120             System.out.println("Caught: " + e);
121         }
122     }
123 
124 
125     protected void setUp() throws Exception {
126         super.setUp();
127         String temp = System.getProperty("messageCount");
128         if (temp != null) {
129             int i = Integer.parseInt(temp);
130             if (i > 0) {
131                 messageCount = i;
132             }
133         }
134         System.out.println("Message count for test case is: " + messageCount);
135 
136         data = new String[messageCount];
137         for (int i = 0; i < messageCount; i++) {
138             data[i] = "Text for message: " + i + " at " + new Date();
139         }
140     }
141 
142     public synchronized void onMessage(Message message) {
143         if (verbose) {
144             System.out.println("Received message: " + message);
145         }
146 
147         messages.add(message);
148         synchronized (lock) {
149             lock.notifyAll();
150         }
151     }
152 }