001    /**
002     *
003     * Copyright 2005 LogicBlaze, Inc.
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.logicblaze.lingo.jms.impl;
019    
020    import org.logicblaze.lingo.jms.JmsProducer;
021    import org.logicblaze.lingo.jms.Requestor;
022    
023    import javax.jms.Connection;
024    import javax.jms.ConnectionFactory;
025    import javax.jms.Destination;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.Session;
030    import javax.jms.TemporaryQueue;
031    import javax.jms.TemporaryTopic;
032    
033    /**
034     * A simple {@link org.logicblaze.lingo.jms.Requestor} which can only be used by one thread at once
035     * and only used for one message exchange at once.
036     *
037     * @version $Revision: 1.3 $
038     */
039    public class SingleThreadedRequestor extends OneWayRequestor {
040        private Connection connection;
041        private Session session;
042        private Destination inboundDestination;
043        private MessageConsumer receiver;
044    
045    
046        public static Requestor newInstance(ConnectionFactory connectionFactory, Destination serverDestination) throws JMSException {
047            JmsProducer producer = DefaultJmsProducer.newInstance(connectionFactory);
048            return new SingleThreadedRequestor(producer.getSession(), producer, serverDestination);
049        }
050    
051        public SingleThreadedRequestor(Session session, JmsProducer producer, Destination serverDestination, Destination clientDestination) throws JMSException {
052            super(producer, serverDestination);
053            this.session = session;
054            this.inboundDestination = clientDestination;
055            if (inboundDestination == null) {
056                inboundDestination = createTemporaryDestination(session);
057            }
058            receiver = session.createConsumer(inboundDestination);
059        }
060    
061        public SingleThreadedRequestor(Session session, JmsProducer producer, Destination serverDestination) throws JMSException {
062            this(session, producer, serverDestination, null);
063        }
064    
065        public Message request(Destination destination, Message message) throws JMSException {
066            oneWay(destination, message);
067            long timeout = getTimeToLive();
068            return receive(timeout);
069        }
070    
071        public Message request(Destination destination, Message message, long timeout) throws JMSException {
072            oneWay(destination, message, timeout);
073            return receive(timeout);
074        }
075    
076        public Message receive(long timeout) throws JMSException {
077            if (timeout < 0) {
078                return receiver.receive();
079            }
080            else if (timeout == 0) {
081                return receiver.receiveNoWait();
082            }
083            return receiver.receive(timeout);
084        }
085    
086        public synchronized void close() throws JMSException {
087            // producer and consumer created by constructor are implicitly closed.
088            session.close();
089            if (inboundDestination instanceof TemporaryQueue) {
090                ((TemporaryQueue) inboundDestination).delete();
091            }
092            else if (inboundDestination instanceof TemporaryTopic) {
093                ((TemporaryTopic) inboundDestination).delete();
094            }
095            super.close();
096    
097            if (connection != null) {
098                connection.close();
099            }
100            connection = null;
101            session = null;
102            inboundDestination = null;
103        }
104    
105    
106        // Implementation methods
107        //-------------------------------------------------------------------------
108        protected TemporaryQueue createTemporaryDestination(Session session) throws JMSException {
109            return session.createTemporaryQueue();
110        }
111    
112        protected void populateHeaders(Message message) throws JMSException {
113            message.setJMSReplyTo(inboundDestination);
114        }
115    
116        protected MessageConsumer getReceiver() {
117            return receiver;
118        }
119    
120    }