1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms.impl;
19
20 import org.logicblaze.lingo.jms.JmsProducer;
21 import org.logicblaze.lingo.jms.Requestor;
22
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.Destination;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageConsumer;
29 import javax.jms.Session;
30 import javax.jms.TemporaryQueue;
31 import javax.jms.TemporaryTopic;
32
33 /***
34 * A simple {@link org.logicblaze.lingo.jms.Requestor} which can only be used by one thread at once
35 * and only used for one message exchange at once.
36 *
37 * @version $Revision: 1.3 $
38 */
39 public class SingleThreadedRequestor extends OneWayRequestor {
40 private Connection connection;
41 private Session session;
42 private Destination inboundDestination;
43 private MessageConsumer receiver;
44
45
46 public static Requestor newInstance(ConnectionFactory connectionFactory, Destination serverDestination) throws JMSException {
47 JmsProducer producer = DefaultJmsProducer.newInstance(connectionFactory);
48 return new SingleThreadedRequestor(producer.getSession(), producer, serverDestination);
49 }
50
51 public SingleThreadedRequestor(Session session, JmsProducer producer, Destination serverDestination, Destination clientDestination) throws JMSException {
52 super(producer, serverDestination);
53 this.session = session;
54 this.inboundDestination = clientDestination;
55 if (inboundDestination == null) {
56 inboundDestination = createTemporaryDestination(session);
57 }
58 receiver = session.createConsumer(inboundDestination);
59 }
60
61 public SingleThreadedRequestor(Session session, JmsProducer producer, Destination serverDestination) throws JMSException {
62 this(session, producer, serverDestination, null);
63 }
64
65 public Message request(Destination destination, Message message) throws JMSException {
66 oneWay(destination, message);
67 long timeout = getTimeToLive();
68 return receive(timeout);
69 }
70
71 public Message request(Destination destination, Message message, long timeout) throws JMSException {
72 oneWay(destination, message, timeout);
73 return receive(timeout);
74 }
75
76 public Message receive(long timeout) throws JMSException {
77 if (timeout < 0) {
78 return receiver.receive();
79 }
80 else if (timeout == 0) {
81 return receiver.receiveNoWait();
82 }
83 return receiver.receive(timeout);
84 }
85
86 public synchronized void close() throws JMSException {
87
88 session.close();
89 if (inboundDestination instanceof TemporaryQueue) {
90 ((TemporaryQueue) inboundDestination).delete();
91 }
92 else if (inboundDestination instanceof TemporaryTopic) {
93 ((TemporaryTopic) inboundDestination).delete();
94 }
95 super.close();
96
97 if (connection != null) {
98 connection.close();
99 }
100 connection = null;
101 session = null;
102 inboundDestination = null;
103 }
104
105
106
107
108 protected TemporaryQueue createTemporaryDestination(Session session) throws JMSException {
109 return session.createTemporaryQueue();
110 }
111
112 protected void populateHeaders(Message message) throws JMSException {
113 message.setJMSReplyTo(inboundDestination);
114 }
115
116 protected MessageConsumer getReceiver() {
117 return receiver;
118 }
119
120 }