View Javadoc

1   /*** 
2    * Licensed under the Apache License, Version 2.0 (the "License"); 
3    * you may not use this file except in compliance with the License. 
4    * You may obtain a copy of the License at 
5    * 
6    * http://www.apache.org/licenses/LICENSE-2.0
7    * 
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS, 
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
11   * See the License for the specific language governing permissions and 
12   * limitations under the License. 
13   * 
14   **/
15  package org.jencks.pool;
16  
17  import java.io.Serializable;
18  
19  import javax.jms.XASession;
20  import javax.jms.TopicSession;
21  import javax.jms.QueueSession;
22  import javax.jms.JMSException;
23  import javax.jms.BytesMessage;
24  import javax.jms.MapMessage;
25  import javax.jms.Message;
26  import javax.jms.ObjectMessage;
27  import javax.jms.Queue;
28  import javax.jms.StreamMessage;
29  import javax.jms.TemporaryQueue;
30  import javax.jms.TemporaryTopic;
31  import javax.jms.TextMessage;
32  import javax.jms.Topic;
33  import javax.jms.QueueBrowser;
34  import javax.jms.MessageConsumer;
35  import javax.jms.Destination;
36  import javax.jms.TopicSubscriber;
37  import javax.jms.MessageListener;
38  import javax.jms.QueueReceiver;
39  import javax.jms.MessageProducer;
40  import javax.jms.QueueSender;
41  import javax.jms.TopicPublisher;
42  import javax.jms.Session;
43  import javax.transaction.xa.XAResource;
44  
45  import org.apache.commons.pool.ObjectPool;
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  
49  public class PooledSpringXASession implements TopicSession, QueueSession, XASession
50  {
51    private static final Log log = LogFactory.getLog(PooledSpringXASession.class);
52    
53    private XASession session;
54    private ObjectPool sessionPool;
55    private MessageProducer messageProducer;
56    private QueueSender queueSender;
57    private TopicPublisher topicPublisher;
58    private boolean ignoreClose;
59  
60    public PooledSpringXASession(final XASession session, final ObjectPool sessionPool)
61    {
62      this.session = session;
63      this.sessionPool = sessionPool;
64      this.ignoreClose = false;
65    }
66  
67    public boolean getIgnoreClose()
68    {
69      return this.ignoreClose;
70    }
71  
72    public void setIgnoreClose(final boolean ignoreClose)
73    {
74      this.ignoreClose = ignoreClose;
75    }
76  
77    /***
78     * If the Session goes into an unstable (unusable) state, then we want to
79     * close it down and permanently remove it from the pool.
80     */
81    public void destroyAndRemoveFromPool()
82    {
83      try {
84        sessionPool.invalidateObject(this);
85      } catch (Throwable t) {
86        log.warn("Unable to remove invalidated JMS Session from the pool due to the following exception.  Will ignore the exception and continue.", t);
87      }
88    }
89  
90  
91    public void close() throws JMSException
92    {
93      if(log.isDebugEnabled()) log.debug("---->>>>> PooledSpringXASession.close() called");
94      // If we are associated with a transaction, then we will let
95      // PooledSpringXAConnection's transaction synchronization handle closing
96      // us at the end of the transaction.
97      if(!getIgnoreClose()) {
98        if(log.isDebugEnabled()) log.debug("---->>>>> ignoreClose = false, so returning session pool...");
99        // TODO a cleaner way to reset??
100 
101       // lets reset the session
102       getActualSession().setMessageListener(null);
103 
104       try {
105         sessionPool.returnObject(this);
106       }
107       catch (Exception e) {
108         final JMSException jmsException = new JMSException("Failed to return session to pool: " + e);
109         jmsException.initCause(e);
110         throw jmsException;
111       }
112     } else if(log.isDebugEnabled()) {
113       log.debug("---->>>>> ignoreClose IS TRUE!  KEEPING SESSION OPEN!");
114     }
115   }
116 
117   public void commit() throws JMSException
118   {
119     throw new JMSException("Cannot commit() inside XASession");
120   }
121 
122   public BytesMessage createBytesMessage() throws JMSException
123   {
124     return getActualSession().createBytesMessage();
125   }
126 
127   public MapMessage createMapMessage() throws JMSException
128   {
129     return getActualSession().createMapMessage();
130   }
131 
132   public Message createMessage() throws JMSException
133   {
134     return getActualSession().createMessage();
135   }
136 
137   public ObjectMessage createObjectMessage() throws JMSException
138   {
139     return getActualSession().createObjectMessage();
140   }
141 
142   public ObjectMessage createObjectMessage(Serializable serializable)
143       throws JMSException
144   {
145     return getActualSession().createObjectMessage(serializable);
146   }
147 
148   public Queue createQueue(String s) throws JMSException
149   {
150     return getActualSession().createQueue(s);
151   }
152 
153   public StreamMessage createStreamMessage() throws JMSException
154   {
155     return getActualSession().createStreamMessage();
156   }
157 
158   public TemporaryQueue createTemporaryQueue() throws JMSException
159   {
160     return getActualSession().createTemporaryQueue();
161   }
162 
163   public TemporaryTopic createTemporaryTopic() throws JMSException
164   {
165     return getActualSession().createTemporaryTopic();
166   }
167 
168   public void unsubscribe(String s) throws JMSException
169   {
170     getActualSession().unsubscribe(s);
171   }
172 
173   public TextMessage createTextMessage() throws JMSException
174   {
175     return getActualSession().createTextMessage();
176   }
177 
178   public TextMessage createTextMessage(String s) throws JMSException
179   {
180     return getActualSession().createTextMessage(s);
181   }
182 
183   public Topic createTopic(String s) throws JMSException
184   {
185     return getActualSession().createTopic(s);
186   }
187 
188   public int getAcknowledgeMode() throws JMSException
189   {
190     return getActualSession().getAcknowledgeMode();
191   }
192 
193   public boolean getTransacted() throws JMSException
194   {
195     return true;
196   }
197 
198   public void recover() throws JMSException
199   {
200     getActualSession().recover();
201   }
202 
203   public void rollback() throws JMSException
204   {
205     throw new JMSException("Cannot rollback() inside XASession");
206   }
207 
208   public void run()
209   {
210     if (session != null) {
211       session.run();
212     }
213   }
214 
215   public XAResource getXAResource()
216   {
217     try {
218       return getActualSession().getXAResource();
219     } catch(Exception e) {
220       throw new RuntimeException(e);
221     }
222   }
223 
224   public Session getSession() throws JMSException
225   {
226     return this;
227   }
228 
229 
230   // Consumer related methods
231   //-------------------------------------------------------------------------
232   public QueueBrowser createBrowser(Queue queue) throws JMSException
233   {
234     return getActualSession().createBrowser(queue);
235   }
236 
237   public QueueBrowser createBrowser(Queue queue, String selector)
238       throws JMSException
239   {
240     return getActualSession().createBrowser(queue, selector);
241   }
242 
243   public MessageConsumer createConsumer(Destination destination)
244       throws JMSException
245   {
246     return getActualSession().createConsumer(destination);
247   }
248 
249   public MessageConsumer createConsumer(Destination destination,
250                                         String selector) throws JMSException
251   {
252     return getActualSession().createConsumer(destination, selector);
253   }
254 
255   public MessageConsumer createConsumer(Destination destination,
256                                         String selector, boolean noLocal)
257       throws JMSException
258   {
259     return getActualSession().createConsumer(destination, selector, noLocal);
260   }
261 
262   public TopicSubscriber createDurableSubscriber(Topic topic, String selector)
263       throws JMSException
264   {
265     return getActualSession().createDurableSubscriber(topic, selector);
266   }
267 
268   public TopicSubscriber createDurableSubscriber(Topic topic, String name,
269                                                  String selector,
270                                                  boolean noLocal)
271       throws JMSException
272   {
273     return getActualSession().createDurableSubscriber(topic, name, selector, noLocal);
274   }
275 
276   public MessageListener getMessageListener() throws JMSException
277   {
278     return getActualSession().getMessageListener();
279   }
280 
281   public void setMessageListener(MessageListener messageListener)
282       throws JMSException
283   {
284     getActualSession().setMessageListener(messageListener);
285   }
286 
287   public TopicSubscriber createSubscriber(Topic topic) throws JMSException
288   {
289     return ((TopicSession)getActualSession()).createSubscriber(topic);
290   }
291 
292   public TopicSubscriber createSubscriber(Topic topic, String selector,
293                                           boolean local) throws JMSException
294   {
295     return ((TopicSession)getActualSession()).createSubscriber(topic, selector, local);
296   }
297 
298   public QueueReceiver createReceiver(Queue queue) throws JMSException
299   {
300     return ((QueueSession)getActualSession()).createReceiver(queue);
301   }
302 
303   public QueueReceiver createReceiver(Queue queue, String selector)
304       throws JMSException
305   {
306     return ((QueueSession)getActualSession()).createReceiver(queue, selector);
307   }
308 
309 
310   // Producer related methods
311   //-------------------------------------------------------------------------
312   public MessageProducer createProducer(Destination destination)
313       throws JMSException
314   {
315     return new PooledProducer(getMessageProducer(), destination);
316   }
317 
318   public QueueSender createSender(Queue queue) throws JMSException
319   {
320     return new PooledQueueSender(getQueueSender(), queue);
321   }
322 
323   public TopicPublisher createPublisher(Topic topic) throws JMSException
324   {
325     return new PooledTopicPublisher(getTopicPublisher(), topic);
326   }
327 
328   // Implementation methods
329   //-------------------------------------------------------------------------
330   public XASession getActualSession() throws JMSException
331   {
332     if (this.session == null) {
333       throw new JMSException("The session has already been closed");
334     }
335     return this.session;
336   }
337 
338   public MessageProducer getMessageProducer() throws JMSException
339   {
340     if (this.messageProducer == null) {
341       this.messageProducer = getActualSession().createProducer(null);
342     }
343     return this.messageProducer;
344   }
345 
346   public QueueSender getQueueSender() throws JMSException
347   {
348     if (this.queueSender == null) {
349       this.queueSender = ((QueueSession)getActualSession()).createSender(null);
350     }
351     return this.queueSender;
352   }
353 
354   public TopicPublisher getTopicPublisher() throws JMSException
355   {
356     if (this.topicPublisher == null) {
357       this.topicPublisher = ((TopicSession)getActualSession()).createPublisher(null);
358     }
359     return this.topicPublisher;
360   }
361 
362 }