1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport;
19  
20  import junit.framework.TestCase;
21  import org.codehaus.activemq.message.ActiveMQMessage;
22  import org.codehaus.activemq.message.DefaultWireFormat;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.PacketListener;
25  import org.codehaus.activemq.message.Receipt;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.util.IdGenerator;
28  
29  import javax.jms.ExceptionListener;
30  import javax.jms.JMSException;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Vector;
36  
37  /***
38   * @version $Revision: 1.17 $
39   */
40  public class TransportChannelTestSupport extends TestCase implements PacketListener, TransportChannelListener {
41  
42      protected int TEST_SIZE = 100;
43      protected Object mutex;
44      protected TransportChannel sender;
45      protected TransportChannel receiver;
46      protected TransportServerChannel server;
47      protected ArrayList packets;
48      protected List exceptions = new Vector();
49      protected boolean rpcTest = false;
50      private IdGenerator idGenerator = new IdGenerator();
51      protected WireFormat wireFormat = new DefaultWireFormat();
52      private boolean closeReceiver = true;
53  
54      public TransportChannelTestSupport(String name) {
55          super(name);
56      }
57  
58      /*
59       * test for Receipt send(Packet, int)
60       */
61      public void testSendPacket() throws Exception {
62          System.out.println("Sending packets");
63  
64          List tmpList = (List) packets.clone();
65          for (int i = 0; i < TEST_SIZE; i++) {
66              sender.asyncSend((Packet) tmpList.get(i));
67          }
68          System.out.println("Sent: " + TEST_SIZE + " packets");
69  
70          for (int i = 0; i < 10; i++) {
71              synchronized (mutex) {
72                  if (!packets.isEmpty()) {
73                      mutex.wait(500);
74                  }
75              }
76          }
77          assertTrue("Packets not consumed, still have: " + packets.size() + " packet(s) unconsumed", packets.isEmpty());
78          assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
79      }
80  
81      public void testRpc() throws Exception {
82          rpcTest = true;
83  
84          List tmpList = (List) packets.clone();
85          for (int i = 0; i < TEST_SIZE; i++) {
86              Packet packet = (Packet) tmpList.get(i);
87              Receipt receipt = sender.send(packet, 4000);
88              assertTrue("Receipt should not be null!", receipt != null);
89              System.out.println("Got receipt: " + receipt + " for packet: " + packet);
90          }
91  
92      }
93  
94      public void consume(Packet packet) {
95          System.out.println("Received packet: " + packet);
96  
97          if (rpcTest) {
98              // lets send a receipt
99              Receipt receipt = new Receipt();
100             receipt.setId(idGenerator.generateId());
101             receipt.setCorrelationId(packet.getId());
102             try {
103                 receiver.asyncSend(receipt);
104             }
105             catch (JMSException e) {
106                 logMessage("Sending receipt: " + receipt + " for packet: " + packet, e);
107             }
108         }
109         else {
110             packets.remove(packet);
111             if (packets.isEmpty()) {
112                 synchronized (mutex) {
113                     mutex.notify();
114                 }
115             }
116         }
117     }
118 
119     /***
120      * Assume that sender and receiver are created before we're invoked
121      */
122     protected void setUp() throws Exception {
123         super.setUp();
124 
125         assertTrue("sender must be constructed in the TestCase before setUp() is invoked", sender != null);
126         assertTrue("receiver or server must be constructed in the TestCase before setUp() is invoked", receiver != null
127                 || server != null);
128 
129         mutex = new Object();
130 
131         sender.setExceptionListener(new ExceptionListener() {
132 
133             public void onException(JMSException ex) {
134                 String message = "Sender got an exception:";
135                 logMessage(message, ex);
136             }
137         });
138 
139         sender.setPacketListener(new PacketListener() {
140 
141             public void consume(Packet packet) {
142                 System.err.println("Error - sender received a packet: " + packet);
143                 exceptions.add(packet);
144             }
145 
146         });
147 
148         sender.setClientID("sender");
149         sender.start();
150 
151         packets = new ArrayList(TEST_SIZE);
152         for (int i = 0; i < TEST_SIZE; i++) {
153             Packet test = new ActiveMQMessage();
154             test.setId("test:" + i);
155             packets.add(test);
156         }
157     }
158 
159     protected void tearDown() throws Exception {
160         super.tearDown();
161 
162         System.out.println("Stopping sender");
163         sender.stop();
164         if (receiver == null) {
165             System.out.println("No receiver created!");
166         }
167         else {
168             if (closeReceiver) {
169                 System.out.println("Stopping receiver");
170                 //assertTrue("No receiver created!", receiver != null);
171                 receiver.stop();
172             }
173             else {
174                 System.out.println("Receiver will be closed by the server");
175             }
176         }
177         if (server != null) {
178             System.out.println("Stopping server");
179             server.stop();
180         }
181         assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
182     }
183 
184     protected void configureServer() throws JMSException {
185         if (server != null) {
186             server.setTransportChannelListener(this);
187             server.start();
188             System.out.println("Server has started");
189 
190             // lets wait a little for the server to startup
191 /*
192             try {
193                 Thread.sleep(500);
194             }
195             catch (InterruptedException e) {
196                 System.out.println("Caught: " + e);
197                 e.printStackTrace();
198             }
199 */
200         }
201     }
202 
203     protected void configureReceiver() {
204         receiver.setPacketListener(this);
205 
206         receiver.setExceptionListener(new ExceptionListener() {
207 
208             public void onException(JMSException ex) {
209                 logMessage("Receiver got an exception:", ex);
210             }
211 
212         });
213 
214         receiver.setClientID("receiver");
215 
216         try {
217             receiver.start();
218         }
219         catch (JMSException e) {
220             logMessage("Failure starting receiver: ", e);
221         }
222         System.out.println("Receiver has started");
223     }
224 
225 
226     protected void createSenderAndReceiver(String string) throws URISyntaxException, JMSException {
227         URI uri = new URI(string);
228         sender = TransportChannelProvider.create(wireFormat, uri);
229         receiver = TransportChannelProvider.create(wireFormat, uri);
230         if (receiver != null) {
231             configureReceiver();
232         }
233     }
234 
235     protected void createSenderAndServer(String subject) throws URISyntaxException, JMSException {
236         URI uri = new URI(subject);
237         server = TransportServerChannelProvider.create(wireFormat, uri);
238         configureServer();
239         sender = TransportChannelProvider.create(wireFormat, uri);
240     }
241 
242     protected void logMessage(String message, JMSException ex) {
243         System.err.println(message);
244         ex.printStackTrace();
245         Throwable t = ex.getLinkedException();
246         if (t != null && t != ex) {
247             System.out.println("Reason: " + t);
248             t.printStackTrace();
249         }
250         exceptions.add(ex);
251     }
252 
253     public void addClient(TransportChannel channel) {
254         this.receiver = channel;
255         this.closeReceiver = false;
256 
257         System.out.println("addClient() with receiver: " + receiver);
258 
259         assertTrue("Should have received a receiver by now", receiver != null);
260 
261         configureReceiver();
262     }
263 
264     public void removeClient(TransportChannel channel) {
265     }
266 }