001 /** 002 * 003 * Copyright 2005 LogicBlaze, Inc. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.logicblaze.lingo.jms.impl; 019 020 import org.logicblaze.lingo.jms.JmsProducer; 021 import org.logicblaze.lingo.jms.Requestor; 022 023 import javax.jms.Connection; 024 import javax.jms.ConnectionFactory; 025 import javax.jms.Destination; 026 import javax.jms.JMSException; 027 import javax.jms.Message; 028 import javax.jms.MessageConsumer; 029 import javax.jms.Session; 030 import javax.jms.TemporaryQueue; 031 import javax.jms.TemporaryTopic; 032 033 /** 034 * A simple {@link org.logicblaze.lingo.jms.Requestor} which can only be used by one thread at once 035 * and only used for one message exchange at once. 036 * 037 * @version $Revision: 1.3 $ 038 */ 039 public class SingleThreadedRequestor extends OneWayRequestor { 040 private Connection connection; 041 private Session session; 042 private Destination inboundDestination; 043 private MessageConsumer receiver; 044 045 046 public static Requestor newInstance(ConnectionFactory connectionFactory, Destination serverDestination) throws JMSException { 047 JmsProducer producer = DefaultJmsProducer.newInstance(connectionFactory); 048 return new SingleThreadedRequestor(producer.getSession(), producer, serverDestination); 049 } 050 051 public SingleThreadedRequestor(Session session, JmsProducer producer, Destination serverDestination, Destination clientDestination) throws JMSException { 052 super(producer, serverDestination); 053 this.session = session; 054 this.inboundDestination = clientDestination; 055 if (inboundDestination == null) { 056 inboundDestination = createTemporaryDestination(session); 057 } 058 receiver = session.createConsumer(inboundDestination); 059 } 060 061 public SingleThreadedRequestor(Session session, JmsProducer producer, Destination serverDestination) throws JMSException { 062 this(session, producer, serverDestination, null); 063 } 064 065 public Message request(Destination destination, Message message) throws JMSException { 066 oneWay(destination, message); 067 long timeout = getTimeToLive(); 068 return receive(timeout); 069 } 070 071 public Message request(Destination destination, Message message, long timeout) throws JMSException { 072 oneWay(destination, message, timeout); 073 return receive(timeout); 074 } 075 076 public Message receive(long timeout) throws JMSException { 077 if (timeout < 0) { 078 return receiver.receive(); 079 } 080 else if (timeout == 0) { 081 return receiver.receiveNoWait(); 082 } 083 return receiver.receive(timeout); 084 } 085 086 public synchronized void close() throws JMSException { 087 // producer and consumer created by constructor are implicitly closed. 088 session.close(); 089 if (inboundDestination instanceof TemporaryQueue) { 090 ((TemporaryQueue) inboundDestination).delete(); 091 } 092 else if (inboundDestination instanceof TemporaryTopic) { 093 ((TemporaryTopic) inboundDestination).delete(); 094 } 095 super.close(); 096 097 if (connection != null) { 098 connection.close(); 099 } 100 connection = null; 101 session = null; 102 inboundDestination = null; 103 } 104 105 106 // Implementation methods 107 //------------------------------------------------------------------------- 108 protected TemporaryQueue createTemporaryDestination(Session session) throws JMSException { 109 return session.createTemporaryQueue(); 110 } 111 112 protected void populateHeaders(Message message) throws JMSException { 113 message.setJMSReplyTo(inboundDestination); 114 } 115 116 protected MessageConsumer getReceiver() { 117 return receiver; 118 } 119 120 }