001    /** 
002     * Licensed under the Apache License, Version 2.0 (the "License"); 
003     * you may not use this file except in compliance with the License. 
004     * You may obtain a copy of the License at 
005     * 
006     * http://www.apache.org/licenses/LICENSE-2.0
007     * 
008     * Unless required by applicable law or agreed to in writing, software
009     * distributed under the License is distributed on an "AS IS" BASIS, 
010     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
011     * See the License for the specific language governing permissions and 
012     * limitations under the License. 
013     * 
014     **/
015    package org.jencks.pool;
016    
017    import javax.jms.Session;
018    import javax.jms.JMSException;
019    import javax.jms.ConnectionConsumer;
020    import javax.jms.Destination;
021    import javax.jms.ServerSessionPool;
022    import javax.jms.Topic;
023    import javax.jms.ExceptionListener;
024    import javax.jms.ConnectionMetaData;
025    import javax.jms.Queue;
026    import javax.jms.QueueSession;
027    import javax.jms.TopicSession;
028    import javax.jms.XAConnection;
029    import javax.jms.TopicConnection;
030    import javax.jms.QueueConnection;
031    import javax.jms.XASession;
032    import javax.transaction.TransactionManager;
033    import javax.transaction.Status;
034    import javax.transaction.SystemException;
035    import javax.transaction.RollbackException;
036    import javax.transaction.xa.XAResource;
037    
038    import org.springframework.transaction.support.TransactionSynchronizationManager;
039    import org.springframework.transaction.support.TransactionSynchronization;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    public class PooledSpringXAConnection implements TopicConnection,
044                    QueueConnection, XAConnection {
045            private static final Log log = LogFactory
046                            .getLog(PooledSpringXAConnection.class);
047    
048            private final ConnectionInfo connectionInfo;
049    
050            private XASessionPool sessionPool;
051    
052            private TransactionManager transactionManager;
053    
054            private boolean stopped;
055    
056            private boolean closed;
057    
058            private boolean clientIdSetSinceReopen = false;
059    
060            private PooledSpringXAConnectionFactory pooledConnectionFactory;
061    
062            public PooledSpringXAConnection(
063                            final PooledSpringXAConnectionFactory pooledConnectionFactory,
064                            final TransactionManager transactionManager,
065                            final XAConnection connection) {
066                    this(pooledConnectionFactory, transactionManager, new ConnectionInfo(
067                                    connection), new XASessionPool(connection));
068            }
069    
070            public PooledSpringXAConnection(
071                            final PooledSpringXAConnectionFactory pooledConnectionFactory,
072                            final TransactionManager transactionManager,
073                            final ConnectionInfo connectionInfo, final XASessionPool sessionPool) {
074                    this.pooledConnectionFactory = pooledConnectionFactory;
075                    this.transactionManager = transactionManager;
076                    this.connectionInfo = connectionInfo;
077                    this.sessionPool = sessionPool;
078                    this.closed = false;
079            }
080    
081            /**
082             * Factory method to create a new instance.
083             */
084            public PooledSpringXAConnection newInstance() {
085                    return new PooledSpringXAConnection(this.pooledConnectionFactory,
086                                    this.transactionManager, this.connectionInfo, this.sessionPool);
087            }
088    
089            public void close() throws JMSException {
090                    this.closed = true;
091            }
092    
093            public void start() throws JMSException {
094                    // TODO should we start connections first before pooling them?
095                    getConnection().start();
096            }
097    
098            public void stop() throws JMSException {
099                    this.stopped = true;
100            }
101    
102            public ConnectionConsumer createConnectionConsumer(
103                            final Destination destination, final String selector,
104                            final ServerSessionPool serverSessionPool, final int maxMessages)
105                            throws JMSException {
106                    return getConnection().createConnectionConsumer(destination, selector,
107                                    serverSessionPool, maxMessages);
108            }
109    
110            public ConnectionConsumer createConnectionConsumer(Topic topic, String s,
111                            ServerSessionPool serverSessionPool, int maxMessages)
112                            throws JMSException {
113                    return getConnection().createConnectionConsumer(topic, s,
114                                    serverSessionPool, maxMessages);
115            }
116    
117            public ConnectionConsumer createDurableConnectionConsumer(
118                            final Topic topic, final String selector, final String s1,
119                            final ServerSessionPool serverSessionPool, final int i)
120                            throws JMSException {
121                    return getConnection().createDurableConnectionConsumer(topic, selector,
122                                    s1, serverSessionPool, i);
123            }
124    
125            public String getClientID() throws JMSException {
126                    return getConnection().getClientID();
127            }
128    
129            public ExceptionListener getExceptionListener() throws JMSException {
130                    return getConnection().getExceptionListener();
131            }
132    
133            public ConnectionMetaData getMetaData() throws JMSException {
134                    return getConnection().getMetaData();
135            }
136    
137            public void setExceptionListener(ExceptionListener exceptionListener)
138                            throws JMSException {
139                    getConnection().setExceptionListener(exceptionListener);
140            }
141    
142            public void setClientID(String clientID) throws JMSException {
143                    if (this.clientIdSetSinceReopen) {
144                            throw new JMSException(
145                                            "ClientID is already set on this connection.");
146                    }
147    
148                    synchronized (this.connectionInfo) {
149                            if (this.connectionInfo.isActualClientIdSet()) {
150                                    if (this.connectionInfo.getActualClientIdBase() == null ? clientID != null
151                                                    : !this.connectionInfo.getActualClientIdBase().equals(
152                                                                    clientID)) {
153                                            throw new JMSException(
154                                                            "A pooled Connection must only ever have its client ID set to the same value for the duration of the pooled ConnectionFactory.  It looks like code has set a client ID, returned the connection to the pool, and then later obtained the connection from the pool and set a different client ID.");
155                                    }
156                            } else {
157                                    final String generatedId = getPooledConnectionFactory()
158                                                    .generateClientID(clientID);
159                                    getConnection().setClientID(generatedId);
160                                    this.connectionInfo.setActualClientIdBase(clientID);
161                                    this.connectionInfo.setActualClientIdSet(true);
162                            }
163                    }
164    
165                    this.clientIdSetSinceReopen = true;
166            }
167    
168            public ConnectionConsumer createConnectionConsumer(final Queue queue,
169                            final String selector, final ServerSessionPool serverSessionPool,
170                            final int maxMessages) throws JMSException {
171                    return getConnection().createConnectionConsumer(queue, selector,
172                                    serverSessionPool, maxMessages);
173            }
174    
175            public XASession createXASession() throws JMSException {
176                    try {
177                            if (log.isDebugEnabled()) {
178                                    log.debug("-->> ENTERING PooledSpringXAConnection.createXASession()");
179                            }
180                            if (this.transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION) {
181                                    if (log.isDebugEnabled()) {
182                                            log.debug("-->> ACTUAL TRANSACTION IS ACTIVE!");
183                                    }
184                                    final XASession session = (XASession) TransactionSynchronizationManager.getResource(this.connectionInfo.getConnection());
185                                    if (session != null) {
186                                            if (log.isDebugEnabled()) {
187                                                    log.debug("-->> RETURNING ALREADY ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD...");
188                                            }
189                                            return session;
190                                    } else {
191                                            if (log.isDebugEnabled()) {
192                                                    log.debug("-->> NO ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD, BORROWING...");
193                                            }
194                                            final PooledSpringXASession newSession = this.sessionPool.borrowSession();
195                                            newSession.setIgnoreClose(true);
196                                            if (log.isDebugEnabled()) {
197                                                    log.debug("-->> ENLISTING NEW SESSION'S XAResource WITH TRANSACTION...");
198                                            }
199                                            this.transactionManager.getTransaction().enlistResource(newSession.getXAResource());
200                                            try {
201                                                    if (log.isDebugEnabled()) {
202                                                            log.debug("-->> BINDING NEW SESSION WITH TRANSACTION...");
203                                                    }
204                                                    TransactionSynchronizationManager.bindResource(this.connectionInfo.getConnection(), newSession);
205                                                    try {
206                                                            if (log.isDebugEnabled()) {
207                                                                    log.debug("-->> REGISTERING SYNCHRONIZATION WITH TRANSACTION...");
208                                                            }
209                                                            TransactionSynchronizationManager.registerSynchronization(new Synchronization(newSession));
210                                                            return newSession;
211                                                    } catch (Throwable t) {
212                                                            if (log.isDebugEnabled()) {
213                                                                    log.debug("-->> CAUGHT EXCEPTION WHILE ASSOCIATING SESSION WITH TRANSACTION, UNBINDING RESOURCE.", t);
214                                                            }
215                                                            TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection());
216                                                            newSession.setIgnoreClose(false);
217                                                            throw t;
218                                                    }
219                                            } catch (Throwable t) {
220                                                    if (log.isDebugEnabled()) {
221                                                            log.debug("-->> CAUGHT EXCEPTION WHILE ASSOCIATING SESSION WITH TRANSACTION (2), DELISTING RESOURCE...", t);
222                                                    }
223                                                    this.transactionManager.getTransaction().delistResource(newSession.getXAResource(), XAResource.TMSUCCESS);
224                                                    if (log.isDebugEnabled()) {
225                                                            log.debug("-->> DESTROYING SESSION AND REMOVING FROM POOL...");
226                                                    }
227                                                    newSession.destroyAndRemoveFromPool();
228                                                    if (log.isDebugEnabled()) {
229                                                            log.debug("-->> RETHROWING EXCEPTION...");
230                                                    }
231                                                    if (t instanceof JMSException) {
232                                                            throw (JMSException) t;
233                                                    } else if (t instanceof RuntimeException) {
234                                                            throw (RuntimeException) t;
235                                                    } else if (t instanceof Error) {
236                                                            throw (Error) t;
237                                                    } else {
238                                                            final JMSException jmsException = new JMSException("Unable to enlist session with transaction.");
239                                                            jmsException.initCause(t);
240                                                            throw jmsException;
241                                                    }
242                                            }
243                                    }
244                            } else {
245                                    if (log.isDebugEnabled())
246                                            log
247                                                            .debug("-->> THERE IS NO ACTIVE TRANSACTION, SO JUST RETURNING BORROWED SESSION...");
248                                    return this.sessionPool.borrowSession();
249                            }
250                    } catch (SystemException e) {
251                            final JMSException jmsException = new JMSException(
252                                            "System Exception");
253                            jmsException.initCause(e);
254                            throw jmsException;
255                    } catch (RollbackException re) {
256                            final JMSException jmsException = new JMSException(
257                                            "Rollback exception");
258                            jmsException.initCause(re);
259                            throw jmsException;
260                    }
261            }
262    
263            // Session factory methods
264            //-------------------------------------------------------------------------
265            public QueueSession createQueueSession(boolean transacted, int ackMode)
266                            throws JMSException {
267                    return (QueueSession) createSession(transacted, ackMode);
268            }
269    
270            public TopicSession createTopicSession(boolean transacted, int ackMode)
271                            throws JMSException {
272                    return (TopicSession) createSession(transacted, ackMode);
273            }
274    
275            public Session createSession(boolean transacted, int ackMode)
276                            throws JMSException {
277                    return createXASession();
278            }
279    
280            // Implementation methods
281            //-------------------------------------------------------------------------
282            protected XAConnection getConnection() throws JMSException {
283                    if (this.stopped || this.closed) {
284                            throw new JMSException("Already closed");
285                    }
286                    return this.connectionInfo.getConnection();
287            }
288    
289            public PooledSpringXAConnectionFactory getPooledConnectionFactory() {
290                    return this.pooledConnectionFactory;
291            }
292    
293            private class Synchronization implements TransactionSynchronization {
294                    private final PooledSpringXASession session;
295    
296                    private Synchronization(PooledSpringXASession session) {
297                            this.session = session;
298                    }
299    
300                    public void suspend() {
301                            if (log.isDebugEnabled()) {
302                                    log.debug("-->> PooledSpringXAConnection.[synchronization].suspend() CALLED...");
303                            }
304                            TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection());
305                    }
306    
307                    public void resume() {
308                            if (log.isDebugEnabled()) {
309                                    log.debug("-->> PooledSpringXAConnection.[synchronization].resume() CALLED...");
310                            }
311                            TransactionSynchronizationManager.bindResource(connectionInfo.getConnection(), session);
312                    }
313    
314                    public void beforeCommit(boolean readOnly) {
315                    }
316    
317                    public void beforeCompletion() {
318                    }
319    
320                    public void afterCompletion(int status) {
321                            if (log.isDebugEnabled()) {
322                                    log.debug("-->> PooledSpringXAConnection.[synchronization].afterCompletion() CALLED...");
323                            }
324                            TransactionSynchronizationManager.unbindResource(connectionInfo.getConnection());
325                            try {
326                                    // This will return session to the pool.
327                                    if (log.isDebugEnabled()) {
328                                            log.debug("-->> RETURNING JMS SESSION TO POOL...");
329                                    }
330                                    session.setIgnoreClose(false);
331                                    session.close();
332                            } catch (JMSException e) {
333                                    throw new RuntimeException(e);
334                            }
335                    }
336            }
337    
338            private static class ConnectionInfo {
339                    private XAConnection connection;
340    
341                    private boolean actualClientIdSet;
342    
343                    private String actualClientIdBase;
344    
345                    public ConnectionInfo(final XAConnection connection) {
346                            this.connection = connection;
347                            this.actualClientIdSet = false;
348                            this.actualClientIdBase = null;
349                    }
350    
351                    public XAConnection getConnection() {
352                            return connection;
353                    }
354    
355                    public void setConnection(final XAConnection connection) {
356                            this.connection = connection;
357                    }
358    
359                    public synchronized boolean isActualClientIdSet() {
360                            return actualClientIdSet;
361                    }
362    
363                    public synchronized void setActualClientIdSet(
364                                    final boolean actualClientIdSet) {
365                            this.actualClientIdSet = actualClientIdSet;
366                    }
367    
368                    public synchronized String getActualClientIdBase() {
369                            return actualClientIdBase;
370                    }
371    
372                    public synchronized void setActualClientIdBase(
373                                    final String actualClientIdBase) {
374                            this.actualClientIdBase = actualClientIdBase;
375                    }
376            }
377    }