001 /** 002 * Licensed under the Apache License, Version 2.0 (the "License"); 003 * you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at 005 * 006 * http://www.apache.org/licenses/LICENSE-2.0 007 * 008 * Unless required by applicable law or agreed to in writing, software 009 * distributed under the License is distributed on an "AS IS" BASIS, 010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 011 * See the License for the specific language governing permissions and 012 * limitations under the License. 013 * 014 **/ 015 package org.jencks.pool; 016 017 import javax.jms.Session; 018 import javax.jms.JMSException; 019 import javax.jms.ConnectionConsumer; 020 import javax.jms.Destination; 021 import javax.jms.ServerSessionPool; 022 import javax.jms.Topic; 023 import javax.jms.ExceptionListener; 024 import javax.jms.ConnectionMetaData; 025 import javax.jms.Queue; 026 import javax.jms.QueueSession; 027 import javax.jms.TopicSession; 028 import javax.jms.XAConnection; 029 import javax.jms.TopicConnection; 030 import javax.jms.QueueConnection; 031 import javax.jms.XASession; 032 import javax.transaction.TransactionManager; 033 import javax.transaction.Status; 034 import javax.transaction.SystemException; 035 import javax.transaction.RollbackException; 036 import javax.transaction.xa.XAResource; 037 038 import org.springframework.transaction.support.TransactionSynchronizationManager; 039 import org.springframework.transaction.support.TransactionSynchronization; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 public class PooledSpringXAConnection implements TopicConnection, 044 QueueConnection, XAConnection { 045 private static final Log log = LogFactory 046 .getLog(PooledSpringXAConnection.class); 047 048 private final ConnectionInfo connectionInfo; 049 050 private XASessionPool sessionPool; 051 052 private TransactionManager transactionManager; 053 054 private boolean stopped; 055 056 private boolean closed; 057 058 private boolean clientIdSetSinceReopen = false; 059 060 private PooledSpringXAConnectionFactory pooledConnectionFactory; 061 062 public PooledSpringXAConnection( 063 final PooledSpringXAConnectionFactory pooledConnectionFactory, 064 final TransactionManager transactionManager, 065 final XAConnection connection) { 066 this(pooledConnectionFactory, transactionManager, new ConnectionInfo( 067 connection), new XASessionPool(connection)); 068 } 069 070 public PooledSpringXAConnection( 071 final PooledSpringXAConnectionFactory pooledConnectionFactory, 072 final TransactionManager transactionManager, 073 final ConnectionInfo connectionInfo, final XASessionPool sessionPool) { 074 this.pooledConnectionFactory = pooledConnectionFactory; 075 this.transactionManager = transactionManager; 076 this.connectionInfo = connectionInfo; 077 this.sessionPool = sessionPool; 078 this.closed = false; 079 } 080 081 /** 082 * Factory method to create a new instance. 083 */ 084 public PooledSpringXAConnection newInstance() { 085 return new PooledSpringXAConnection(this.pooledConnectionFactory, 086 this.transactionManager, this.connectionInfo, this.sessionPool); 087 } 088 089 public void close() throws JMSException { 090 this.closed = true; 091 } 092 093 public void start() throws JMSException { 094 // TODO should we start connections first before pooling them? 095 getConnection().start(); 096 } 097 098 public void stop() throws JMSException { 099 this.stopped = true; 100 } 101 102 public ConnectionConsumer createConnectionConsumer( 103 final Destination destination, final String selector, 104 final ServerSessionPool serverSessionPool, final int maxMessages) 105 throws JMSException { 106 return getConnection().createConnectionConsumer(destination, selector, 107 serverSessionPool, maxMessages); 108 } 109 110 public ConnectionConsumer createConnectionConsumer(Topic topic, String s, 111 ServerSessionPool serverSessionPool, int maxMessages) 112 throws JMSException { 113 return getConnection().createConnectionConsumer(topic, s, 114 serverSessionPool, maxMessages); 115 } 116 117 public ConnectionConsumer createDurableConnectionConsumer( 118 final Topic topic, final String selector, final String s1, 119 final ServerSessionPool serverSessionPool, final int i) 120 throws JMSException { 121 return getConnection().createDurableConnectionConsumer(topic, selector, 122 s1, serverSessionPool, i); 123 } 124 125 public String getClientID() throws JMSException { 126 return getConnection().getClientID(); 127 } 128 129 public ExceptionListener getExceptionListener() throws JMSException { 130 return getConnection().getExceptionListener(); 131 } 132 133 public ConnectionMetaData getMetaData() throws JMSException { 134 return getConnection().getMetaData(); 135 } 136 137 public void setExceptionListener(ExceptionListener exceptionListener) 138 throws JMSException { 139 getConnection().setExceptionListener(exceptionListener); 140 } 141 142 public void setClientID(String clientID) throws JMSException { 143 if (this.clientIdSetSinceReopen) { 144 throw new JMSException( 145 "ClientID is already set on this connection."); 146 } 147 148 synchronized (this.connectionInfo) { 149 if (this.connectionInfo.isActualClientIdSet()) { 150 if (this.connectionInfo.getActualClientIdBase() == null ? clientID != null 151 : !this.connectionInfo.getActualClientIdBase().equals( 152 clientID)) { 153 throw new JMSException( 154 "A pooled Connection must only ever have its client ID set to the same value for the duration of the pooled ConnectionFactory. It looks like code has set a client ID, returned the connection to the pool, and then later obtained the connection from the pool and set a different client ID."); 155 } 156 } else { 157 final String generatedId = getPooledConnectionFactory() 158 .generateClientID(clientID); 159 getConnection().setClientID(generatedId); 160 this.connectionInfo.setActualClientIdBase(clientID); 161 this.connectionInfo.setActualClientIdSet(true); 162 } 163 } 164 165 this.clientIdSetSinceReopen = true; 166 } 167 168 public ConnectionConsumer createConnectionConsumer(final Queue queue, 169 final String selector, final ServerSessionPool serverSessionPool, 170 final int maxMessages) throws JMSException { 171 return getConnection().createConnectionConsumer(queue, selector, 172 serverSessionPool, maxMessages); 173 } 174 175 public XASession createXASession() throws JMSException { 176 try { 177 if (log.isDebugEnabled()) { 178 log.debug("-->> ENTERING PooledSpringXAConnection.createXASession()"); 179 } 180 if (this.transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION) { 181 if (log.isDebugEnabled()) { 182 log.debug("-->> ACTUAL TRANSACTION IS ACTIVE!"); 183 } 184 final XASession session = (XASession) TransactionSynchronizationManager.getResource(this.connectionInfo.getConnection()); 185 if (session != null) { 186 if (log.isDebugEnabled()) { 187 log.debug("-->> RETURNING ALREADY ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD..."); 188 } 189 return session; 190 } else { 191 if (log.isDebugEnabled()) { 192 log.debug("-->> NO ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD, BORROWING..."); 193 } 194 final PooledSpringXASession newSession = this.sessionPool.borrowSession(); 195 newSession.setIgnoreClose(true); 196 if (log.isDebugEnabled()) { 197 log.debug("-->> ENLISTING NEW SESSION'S XAResource WITH TRANSACTION..."); 198 } 199 this.transactionManager.getTransaction().enlistResource(newSession.getXAResource()); 200 try { 201 if (log.isDebugEnabled()) { 202 log.debug("-->> BINDING NEW SESSION WITH TRANSACTION..."); 203 } 204 TransactionSynchronizationManager.bindResource(this.connectionInfo.getConnection(), newSession); 205 try { 206 if (log.isDebugEnabled()) { 207 log.debug("-->> REGISTERING SYNCHRONIZATION WITH TRANSACTION..."); 208 } 209 TransactionSynchronizationManager.registerSynchronization(new Synchronization(newSession)); 210 return newSession; 211 } catch (Throwable t) { 212 if (log.isDebugEnabled()) { 213 log.debug("-->> CAUGHT EXCEPTION WHILE ASSOCIATING SESSION WITH TRANSACTION, UNBINDING RESOURCE.", t); 214 } 215 TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection()); 216 newSession.setIgnoreClose(false); 217 throw t; 218 } 219 } catch (Throwable t) { 220 if (log.isDebugEnabled()) { 221 log.debug("-->> CAUGHT EXCEPTION WHILE ASSOCIATING SESSION WITH TRANSACTION (2), DELISTING RESOURCE...", t); 222 } 223 this.transactionManager.getTransaction().delistResource(newSession.getXAResource(), XAResource.TMSUCCESS); 224 if (log.isDebugEnabled()) { 225 log.debug("-->> DESTROYING SESSION AND REMOVING FROM POOL..."); 226 } 227 newSession.destroyAndRemoveFromPool(); 228 if (log.isDebugEnabled()) { 229 log.debug("-->> RETHROWING EXCEPTION..."); 230 } 231 if (t instanceof JMSException) { 232 throw (JMSException) t; 233 } else if (t instanceof RuntimeException) { 234 throw (RuntimeException) t; 235 } else if (t instanceof Error) { 236 throw (Error) t; 237 } else { 238 final JMSException jmsException = new JMSException("Unable to enlist session with transaction."); 239 jmsException.initCause(t); 240 throw jmsException; 241 } 242 } 243 } 244 } else { 245 if (log.isDebugEnabled()) 246 log 247 .debug("-->> THERE IS NO ACTIVE TRANSACTION, SO JUST RETURNING BORROWED SESSION..."); 248 return this.sessionPool.borrowSession(); 249 } 250 } catch (SystemException e) { 251 final JMSException jmsException = new JMSException( 252 "System Exception"); 253 jmsException.initCause(e); 254 throw jmsException; 255 } catch (RollbackException re) { 256 final JMSException jmsException = new JMSException( 257 "Rollback exception"); 258 jmsException.initCause(re); 259 throw jmsException; 260 } 261 } 262 263 // Session factory methods 264 //------------------------------------------------------------------------- 265 public QueueSession createQueueSession(boolean transacted, int ackMode) 266 throws JMSException { 267 return (QueueSession) createSession(transacted, ackMode); 268 } 269 270 public TopicSession createTopicSession(boolean transacted, int ackMode) 271 throws JMSException { 272 return (TopicSession) createSession(transacted, ackMode); 273 } 274 275 public Session createSession(boolean transacted, int ackMode) 276 throws JMSException { 277 return createXASession(); 278 } 279 280 // Implementation methods 281 //------------------------------------------------------------------------- 282 protected XAConnection getConnection() throws JMSException { 283 if (this.stopped || this.closed) { 284 throw new JMSException("Already closed"); 285 } 286 return this.connectionInfo.getConnection(); 287 } 288 289 public PooledSpringXAConnectionFactory getPooledConnectionFactory() { 290 return this.pooledConnectionFactory; 291 } 292 293 private class Synchronization implements TransactionSynchronization { 294 private final PooledSpringXASession session; 295 296 private Synchronization(PooledSpringXASession session) { 297 this.session = session; 298 } 299 300 public void suspend() { 301 if (log.isDebugEnabled()) { 302 log.debug("-->> PooledSpringXAConnection.[synchronization].suspend() CALLED..."); 303 } 304 TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection()); 305 } 306 307 public void resume() { 308 if (log.isDebugEnabled()) { 309 log.debug("-->> PooledSpringXAConnection.[synchronization].resume() CALLED..."); 310 } 311 TransactionSynchronizationManager.bindResource(connectionInfo.getConnection(), session); 312 } 313 314 public void beforeCommit(boolean readOnly) { 315 } 316 317 public void beforeCompletion() { 318 } 319 320 public void afterCompletion(int status) { 321 if (log.isDebugEnabled()) { 322 log.debug("-->> PooledSpringXAConnection.[synchronization].afterCompletion() CALLED..."); 323 } 324 TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection()); 325 try { 326 // This will return session to the pool. 327 if (log.isDebugEnabled()) { 328 log.debug("-->> RETURNING JMS SESSION TO POOL..."); 329 } 330 session.setIgnoreClose(false); 331 session.close(); 332 } catch (JMSException e) { 333 throw new RuntimeException(e); 334 } 335 } 336 } 337 338 private static class ConnectionInfo { 339 private XAConnection connection; 340 341 private boolean actualClientIdSet; 342 343 private String actualClientIdBase; 344 345 public ConnectionInfo(final XAConnection connection) { 346 this.connection = connection; 347 this.actualClientIdSet = false; 348 this.actualClientIdBase = null; 349 } 350 351 public XAConnection getConnection() { 352 return connection; 353 } 354 355 public void setConnection(final XAConnection connection) { 356 this.connection = connection; 357 } 358 359 public synchronized boolean isActualClientIdSet() { 360 return actualClientIdSet; 361 } 362 363 public synchronized void setActualClientIdSet( 364 final boolean actualClientIdSet) { 365 this.actualClientIdSet = actualClientIdSet; 366 } 367 368 public synchronized String getActualClientIdBase() { 369 return actualClientIdBase; 370 } 371 372 public synchronized void setActualClientIdBase( 373 final String actualClientIdBase) { 374 this.actualClientIdBase = actualClientIdBase; 375 } 376 } 377 }