View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport;
20  
21  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
22  import EDU.oswego.cs.dl.util.concurrent.Executor;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.message.Packet;
26  import org.codehaus.activemq.message.PacketListener;
27  import org.codehaus.activemq.message.Receipt;
28  import org.codehaus.activemq.message.ReceiptHolder;
29  import org.codehaus.activemq.util.ExecutorHelper;
30  
31  import javax.jms.ExceptionListener;
32  import javax.jms.JMSException;
33  import java.net.URI;
34  import java.util.HashMap;
35  import java.util.Iterator;
36  import java.util.Map;
37  
38  /***
39   * Some basic functionality, common across most transport implementations of channels
40   *
41   * @version $Revision: 1.10 $
42   */
43  public abstract class AbstractTransportChannel implements TransportChannel {
44      private static final Log log = LogFactory.getLog(AbstractTransportChannel.class);
45  
46      private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
47      private HashMap requestMap = new HashMap();
48      private PacketListener packetListener;
49      private ExceptionListener exceptionListener;
50      private String clientID;
51      private TransportChannelListener transportChannelListener;
52  
53  
54      /***
55       * close the channel
56       */
57      public void stop() {
58          Map map = (Map) this.requestMap.clone();
59          for (Iterator i = map.values().iterator(); i.hasNext();) {
60              ReceiptHolder rh = (ReceiptHolder) i.next();
61              rh.close();
62          }
63          map.clear();
64          requestMap.clear();
65          if (transportChannelListener != null) {
66              transportChannelListener.removeClient(this);
67          }
68          exceptionListener = null;
69          packetListener = null;
70      }
71  
72      /***
73       * synchronously send a Packet
74       *
75       * @param packet
76       * @return a Receipt
77       * @throws JMSException
78       */
79      public Receipt send(Packet packet) throws JMSException {
80          return send(packet, 0);
81      }
82  
83      /***
84       * Synchronously send a Packet
85       *
86       * @param packet  packet to send
87       * @param timeout amount of time to wait for a receipt
88       * @return the Receipt
89       * @throws JMSException
90       */
91      public Receipt send(Packet packet, int timeout) throws JMSException {
92          ReceiptHolder rh = new ReceiptHolder();
93          requestMap.put(packet.getId(), rh);
94          doAsyncSend(packet);
95          Receipt result = rh.getReceipt(timeout);
96          return result;
97      }
98  
99  
100     // Properties
101     //-------------------------------------------------------------------------
102 
103     public TransportChannelListener getTransportChannelListener() {
104         return transportChannelListener;
105     }
106 
107     public void setTransportChannelListener(TransportChannelListener transportChannelListener) {
108         this.transportChannelListener = transportChannelListener;
109     }
110 
111     /***
112      * Add a listener for changes in a channels status
113      *
114      * @param listener
115      */
116     public void addTransportStatusEventListener(TransportStatusEventListener listener) {
117         listeners.add(listener);
118     }
119 
120     /***
121      * Remove a listener for changes in a channels status
122      *
123      * @param listener
124      */
125     public void removeTransportStatusEventListener(TransportStatusEventListener listener) {
126         listeners.remove(listener);
127     }
128 
129     public String getClientID() {
130         return clientID;
131     }
132 
133     public void setClientID(String clientID) {
134         this.clientID = clientID;
135     }
136 
137     public ExceptionListener getExceptionListener() {
138         return exceptionListener;
139     }
140 
141     public PacketListener getPacketListener() {
142         return packetListener;
143     }
144 
145     /***
146      * Set a listener for Packets
147      *
148      * @param l
149      */
150     public void setPacketListener(PacketListener l) {
151         this.packetListener = l;
152     }
153 
154 
155     /***
156      * Set an exception listener to listen for asynchronously generated exceptions
157      *
158      * @param listener
159      */
160     public void setExceptionListener(ExceptionListener listener) {
161         this.exceptionListener = listener;
162     }
163 
164     // Implementation methods
165     //-------------------------------------------------------------------------
166 
167     /***
168      * consume a packet from the channel
169      *
170      * @param packet
171      */
172     protected void doConsumePacket(Packet packet) {
173         if (!doHandleReceipt(packet)) {
174             if (packetListener != null) {
175                 packetListener.consume(packet);
176             }
177             else {
178                 log.warn("No packet listener set to receive packets");
179             }
180         }
181     }
182 
183     protected boolean doHandleReceipt(Packet packet) {
184         boolean result = false;
185         if (packet != null) {
186             if (packet.isReceipt()) {
187                 result = true;
188                 Receipt receipt = (Receipt) packet;
189                 ReceiptHolder rh = (ReceiptHolder) requestMap.remove(receipt.getCorrelationId());
190                 if (rh != null) {
191                     rh.setReceipt(receipt);
192                 }
193                 else {
194                     log.warn("No Packet found to match Receipt correlationId: " + receipt.getCorrelationId());
195                 }
196             }
197         }
198         return result;
199     }
200 
201     /***
202      * send a Packet to the raw underlying transport
203      * This method is here to allow specific implementations
204      * to override this method
205      *
206      * @param packet
207      * @throws JMSException
208      */
209     protected void doAsyncSend(Packet packet) throws JMSException {
210         asyncSend(packet);
211     }
212 
213     /***
214      * Handles an exception thrown while performing async dispatch of messages
215      *
216      * @param e
217      */
218     protected void onAsyncException(JMSException e) {
219         if (exceptionListener != null) {
220             exceptionListener.onException(e);
221         }
222         else {
223             log.warn("Caught exception dispatching message and no ExceptionListener registered: " + e, e);
224         }
225     }
226 
227     /***
228      * Fire status event to any status event listeners
229      *
230      * @param remoteURI
231      * @param status
232      */
233     protected void fireStatusEvent(URI remoteURI, int status) {
234         TransportStatusEvent event = new TransportStatusEvent();
235         event.setChannelStatus(status);
236         event.setRemoteURI(remoteURI);
237         fireStatusEvent(event);
238     }
239 
240     /***
241      * Fire status event to any status event listeners
242      *
243      * @param event
244      */
245     protected void fireStatusEvent(TransportStatusEvent event) {
246         if (event != null) {
247             for (Iterator i = listeners.iterator(); i.hasNext();) {
248                 TransportStatusEventListener l = (TransportStatusEventListener) i.next();
249                 l.statusChanged(event);
250             }
251         }
252     }
253 
254     /***
255      * A helper method to stop the execution of an executor
256      *
257      * @param executor the executor or null if one is not created yet
258      */
259     protected void stopExecutor(Executor executor) throws InterruptedException, JMSException {
260         ExecutorHelper.stopExecutor(executor);
261     }
262 }