View Javadoc

1   /*** 
2    * Licensed under the Apache License, Version 2.0 (the "License"); 
3    * you may not use this file except in compliance with the License. 
4    * You may obtain a copy of the License at 
5    * 
6    * http://www.apache.org/licenses/LICENSE-2.0
7    * 
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS, 
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
11   * See the License for the specific language governing permissions and 
12   * limitations under the License. 
13   * 
14   **/
15  package org.jencks.pool;
16  
17  import javax.jms.Session;
18  import javax.jms.JMSException;
19  import javax.jms.ConnectionConsumer;
20  import javax.jms.Destination;
21  import javax.jms.ServerSessionPool;
22  import javax.jms.Topic;
23  import javax.jms.ExceptionListener;
24  import javax.jms.ConnectionMetaData;
25  import javax.jms.Queue;
26  import javax.jms.QueueSession;
27  import javax.jms.TopicSession;
28  import javax.jms.XAConnection;
29  import javax.jms.TopicConnection;
30  import javax.jms.QueueConnection;
31  import javax.jms.XASession;
32  import javax.transaction.TransactionManager;
33  import javax.transaction.Status;
34  import javax.transaction.SystemException;
35  import javax.transaction.RollbackException;
36  import javax.transaction.xa.XAResource;
37  
38  import org.springframework.transaction.support.TransactionSynchronizationManager;
39  import org.springframework.transaction.support.TransactionSynchronization;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  
43  public class PooledSpringXAConnection implements TopicConnection,
44  		QueueConnection, XAConnection {
45  	private static final Log log = LogFactory
46  			.getLog(PooledSpringXAConnection.class);
47  
48  	private final ConnectionInfo connectionInfo;
49  
50  	private XASessionPool sessionPool;
51  
52  	private TransactionManager transactionManager;
53  
54  	private boolean stopped;
55  
56  	private boolean closed;
57  
58  	private boolean clientIdSetSinceReopen = false;
59  
60  	private PooledSpringXAConnectionFactory pooledConnectionFactory;
61  
62  	public PooledSpringXAConnection(
63  			final PooledSpringXAConnectionFactory pooledConnectionFactory,
64  			final TransactionManager transactionManager,
65  			final XAConnection connection) {
66  		this(pooledConnectionFactory, transactionManager, new ConnectionInfo(
67  				connection), new XASessionPool(connection));
68  	}
69  
70  	public PooledSpringXAConnection(
71  			final PooledSpringXAConnectionFactory pooledConnectionFactory,
72  			final TransactionManager transactionManager,
73  			final ConnectionInfo connectionInfo, final XASessionPool sessionPool) {
74  		this.pooledConnectionFactory = pooledConnectionFactory;
75  		this.transactionManager = transactionManager;
76  		this.connectionInfo = connectionInfo;
77  		this.sessionPool = sessionPool;
78  		this.closed = false;
79  	}
80  
81  	/***
82  	 * Factory method to create a new instance.
83  	 */
84  	public PooledSpringXAConnection newInstance() {
85  		return new PooledSpringXAConnection(this.pooledConnectionFactory,
86  				this.transactionManager, this.connectionInfo, this.sessionPool);
87  	}
88  
89  	public void close() throws JMSException {
90  		this.closed = true;
91  	}
92  
93  	public void start() throws JMSException {
94  		// TODO should we start connections first before pooling them?
95  		getConnection().start();
96  	}
97  
98  	public void stop() throws JMSException {
99  		this.stopped = true;
100 	}
101 
102 	public ConnectionConsumer createConnectionConsumer(
103 			final Destination destination, final String selector,
104 			final ServerSessionPool serverSessionPool, final int maxMessages)
105 			throws JMSException {
106 		return getConnection().createConnectionConsumer(destination, selector,
107 				serverSessionPool, maxMessages);
108 	}
109 
110 	public ConnectionConsumer createConnectionConsumer(Topic topic, String s,
111 			ServerSessionPool serverSessionPool, int maxMessages)
112 			throws JMSException {
113 		return getConnection().createConnectionConsumer(topic, s,
114 				serverSessionPool, maxMessages);
115 	}
116 
117 	public ConnectionConsumer createDurableConnectionConsumer(
118 			final Topic topic, final String selector, final String s1,
119 			final ServerSessionPool serverSessionPool, final int i)
120 			throws JMSException {
121 		return getConnection().createDurableConnectionConsumer(topic, selector,
122 				s1, serverSessionPool, i);
123 	}
124 
125 	public String getClientID() throws JMSException {
126 		return getConnection().getClientID();
127 	}
128 
129 	public ExceptionListener getExceptionListener() throws JMSException {
130 		return getConnection().getExceptionListener();
131 	}
132 
133 	public ConnectionMetaData getMetaData() throws JMSException {
134 		return getConnection().getMetaData();
135 	}
136 
137 	public void setExceptionListener(ExceptionListener exceptionListener)
138 			throws JMSException {
139 		getConnection().setExceptionListener(exceptionListener);
140 	}
141 
142 	public void setClientID(String clientID) throws JMSException {
143 		if (this.clientIdSetSinceReopen) {
144 			throw new JMSException(
145 					"ClientID is already set on this connection.");
146 		}
147 
148 		synchronized (this.connectionInfo) {
149 			if (this.connectionInfo.isActualClientIdSet()) {
150 				if (this.connectionInfo.getActualClientIdBase() == null ? clientID != null
151 						: !this.connectionInfo.getActualClientIdBase().equals(
152 								clientID)) {
153 					throw new JMSException(
154 							"A pooled Connection must only ever have its client ID set to the same value for the duration of the pooled ConnectionFactory.  It looks like code has set a client ID, returned the connection to the pool, and then later obtained the connection from the pool and set a different client ID.");
155 				}
156 			} else {
157 				final String generatedId = getPooledConnectionFactory()
158 						.generateClientID(clientID);
159 				getConnection().setClientID(generatedId);
160 				this.connectionInfo.setActualClientIdBase(clientID);
161 				this.connectionInfo.setActualClientIdSet(true);
162 			}
163 		}
164 
165 		this.clientIdSetSinceReopen = true;
166 	}
167 
168 	public ConnectionConsumer createConnectionConsumer(final Queue queue,
169 			final String selector, final ServerSessionPool serverSessionPool,
170 			final int maxMessages) throws JMSException {
171 		return getConnection().createConnectionConsumer(queue, selector,
172 				serverSessionPool, maxMessages);
173 	}
174 
175 	public XASession createXASession() throws JMSException {
176 		try {
177 			if (log.isDebugEnabled()) {
178 				log.debug("-->> ENTERING PooledSpringXAConnection.createXASession()");
179 			}
180 			if (this.transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION) {
181 				if (log.isDebugEnabled()) {
182 					log.debug("-->> ACTUAL TRANSACTION IS ACTIVE!");
183 				}
184 				final XASession session = (XASession) TransactionSynchronizationManager.getResource(this.connectionInfo.getConnection());
185 				if (session != null) {
186 					if (log.isDebugEnabled()) {
187 						log.debug("-->> RETURNING ALREADY ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD...");
188 					}
189 					return session;
190 				} else {
191 					if (log.isDebugEnabled()) {
192 						log.debug("-->> NO ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD, BORROWING...");
193 					}
194 					final PooledSpringXASession newSession = this.sessionPool.borrowSession();
195 					newSession.setIgnoreClose(true);
196 					if (log.isDebugEnabled()) {
197 						log.debug("-->> ENLISTING NEW SESSION'S XAResource WITH TRANSACTION...");
198 					}
199 					this.transactionManager.getTransaction().enlistResource(newSession.getXAResource());
200 					try {
201 						if (log.isDebugEnabled()) {
202 							log.debug("-->> BINDING NEW SESSION WITH TRANSACTION...");
203 						}
204 						TransactionSynchronizationManager.bindResource(this.connectionInfo.getConnection(), newSession);
205 						try {
206 							if (log.isDebugEnabled()) {
207 								log.debug("-->> REGISTERING SYNCHRONIZATION WITH TRANSACTION...");
208 							}
209 							TransactionSynchronizationManager.registerSynchronization(new Synchronization(newSession));
210 							return newSession;
211 						} catch (Throwable t) {
212 							if (log.isDebugEnabled()) {
213 								log.debug("-->> CAUGHT EXCEPTION WHILE ASSOCIATING SESSION WITH TRANSACTION, UNBINDING RESOURCE.", t);
214 							}
215 							TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection());
216 							newSession.setIgnoreClose(false);
217 							throw t;
218 						}
219 					} catch (Throwable t) {
220 						if (log.isDebugEnabled()) {
221 							log.debug("-->> CAUGHT EXCEPTION WHILE ASSOCIATING SESSION WITH TRANSACTION (2), DELISTING RESOURCE...", t);
222 						}
223 						this.transactionManager.getTransaction().delistResource(newSession.getXAResource(), XAResource.TMSUCCESS);
224 						if (log.isDebugEnabled()) {
225 							log.debug("-->> DESTROYING SESSION AND REMOVING FROM POOL...");
226 						}
227 						newSession.destroyAndRemoveFromPool();
228 						if (log.isDebugEnabled()) {
229 							log.debug("-->> RETHROWING EXCEPTION...");
230 						}
231 						if (t instanceof JMSException) {
232 							throw (JMSException) t;
233 						} else if (t instanceof RuntimeException) {
234 							throw (RuntimeException) t;
235 						} else if (t instanceof Error) {
236 							throw (Error) t;
237 						} else {
238 							final JMSException jmsException = new JMSException("Unable to enlist session with transaction.");
239 							jmsException.initCause(t);
240 							throw jmsException;
241 						}
242 					}
243 				}
244 			} else {
245 				if (log.isDebugEnabled())
246 					log
247 							.debug("-->> THERE IS NO ACTIVE TRANSACTION, SO JUST RETURNING BORROWED SESSION...");
248 				return this.sessionPool.borrowSession();
249 			}
250 		} catch (SystemException e) {
251 			final JMSException jmsException = new JMSException(
252 					"System Exception");
253 			jmsException.initCause(e);
254 			throw jmsException;
255 		} catch (RollbackException re) {
256 			final JMSException jmsException = new JMSException(
257 					"Rollback exception");
258 			jmsException.initCause(re);
259 			throw jmsException;
260 		}
261 	}
262 
263 	// Session factory methods
264 	//-------------------------------------------------------------------------
265 	public QueueSession createQueueSession(boolean transacted, int ackMode)
266 			throws JMSException {
267 		return (QueueSession) createSession(transacted, ackMode);
268 	}
269 
270 	public TopicSession createTopicSession(boolean transacted, int ackMode)
271 			throws JMSException {
272 		return (TopicSession) createSession(transacted, ackMode);
273 	}
274 
275 	public Session createSession(boolean transacted, int ackMode)
276 			throws JMSException {
277 		return createXASession();
278 	}
279 
280 	// Implementation methods
281 	//-------------------------------------------------------------------------
282 	protected XAConnection getConnection() throws JMSException {
283 		if (this.stopped || this.closed) {
284 			throw new JMSException("Already closed");
285 		}
286 		return this.connectionInfo.getConnection();
287 	}
288 
289 	public PooledSpringXAConnectionFactory getPooledConnectionFactory() {
290 		return this.pooledConnectionFactory;
291 	}
292 
293 	private class Synchronization implements TransactionSynchronization {
294 		private final PooledSpringXASession session;
295 
296 		private Synchronization(PooledSpringXASession session) {
297 			this.session = session;
298 		}
299 
300 		public void suspend() {
301 			if (log.isDebugEnabled()) {
302 				log.debug("-->> PooledSpringXAConnection.[synchronization].suspend() CALLED...");
303 			}
304 			TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection());
305 		}
306 
307 		public void resume() {
308 			if (log.isDebugEnabled()) {
309 				log.debug("-->> PooledSpringXAConnection.[synchronization].resume() CALLED...");
310 			}
311 			TransactionSynchronizationManager.bindResource(connectionInfo.getConnection(), session);
312 		}
313 
314 		public void beforeCommit(boolean readOnly) {
315 		}
316 
317 		public void beforeCompletion() {
318 		}
319 
320 		public void afterCompletion(int status) {
321 			if (log.isDebugEnabled()) {
322 				log.debug("-->> PooledSpringXAConnection.[synchronization].afterCompletion() CALLED...");
323 			}
324 			TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection());
325 			try {
326 				// This will return session to the pool.
327 				if (log.isDebugEnabled()) {
328 					log.debug("-->> RETURNING JMS SESSION TO POOL...");
329 				}
330 				session.setIgnoreClose(false);
331 				session.close();
332 			} catch (JMSException e) {
333 				throw new RuntimeException(e);
334 			}
335 		}
336 	}
337 
338 	private static class ConnectionInfo {
339 		private XAConnection connection;
340 
341 		private boolean actualClientIdSet;
342 
343 		private String actualClientIdBase;
344 
345 		public ConnectionInfo(final XAConnection connection) {
346 			this.connection = connection;
347 			this.actualClientIdSet = false;
348 			this.actualClientIdBase = null;
349 		}
350 
351 		public XAConnection getConnection() {
352 			return connection;
353 		}
354 
355 		public void setConnection(final XAConnection connection) {
356 			this.connection = connection;
357 		}
358 
359 		public synchronized boolean isActualClientIdSet() {
360 			return actualClientIdSet;
361 		}
362 
363 		public synchronized void setActualClientIdSet(
364 				final boolean actualClientIdSet) {
365 			this.actualClientIdSet = actualClientIdSet;
366 		}
367 
368 		public synchronized String getActualClientIdBase() {
369 			return actualClientIdBase;
370 		}
371 
372 		public synchronized void setActualClientIdBase(
373 				final String actualClientIdBase) {
374 			this.actualClientIdBase = actualClientIdBase;
375 		}
376 	}
377 }