1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.FutureResult;
21 import org.logicblaze.lingo.jms.FailedToProcessResponse;
22 import org.logicblaze.lingo.jms.JmsProducer;
23 import org.logicblaze.lingo.jms.ReplyHandler;
24 import org.logicblaze.lingo.jms.Requestor;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import javax.jms.Message;
32 import javax.jms.MessageListener;
33 import javax.jms.Session;
34 import java.util.HashMap;
35 import java.util.Map;
36
37 /***
38 * A {@link org.logicblaze.lingo.jms.Requestor} which will use a single producer, consumer
39 * and temporary topic for resource efficiency, but will use correlation
40 * IDs on each message and response to ensure that each threads requests
41 * can occur synchronously.
42 * <p/>
43 * This class can be used concurrently by many different threads at the same time.
44 *
45 * @version $Revision: 1.4 $
46 */
47 public class MultiplexingRequestor extends SingleThreadedRequestor implements MessageListener {
48 private static final Log log = LogFactory.getLog(MultiplexingRequestor.class);
49
50 private Map requests = new HashMap();
51
52
53 public static Requestor newInstance(ConnectionFactory connectionFactory, Destination serverDestination) throws JMSException {
54 DefaultJmsProducer producer = DefaultJmsProducer.newInstance(connectionFactory);
55 return new MultiplexingRequestor(producer.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE), producer, serverDestination);
56 }
57
58 public static Requestor newInstance(ConnectionFactory connectionFactory, Destination serverDestination, Destination clientDestination) throws JMSException {
59 DefaultJmsProducer producer = DefaultJmsProducer.newInstance(connectionFactory);
60 return new MultiplexingRequestor(producer.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE), producer, serverDestination, clientDestination);
61 }
62
63 public MultiplexingRequestor(Session session, JmsProducer producer, Destination serverDestination, Destination clientDestination) throws JMSException {
64 super(session, producer, serverDestination, clientDestination);
65 getReceiver().setMessageListener(this);
66 }
67
68 public MultiplexingRequestor(Session session, JmsProducer producer, Destination serverDestination) throws JMSException {
69 this(session, producer, serverDestination, null);
70 }
71
72 public void registerHandler(String correlationID, ReplyHandler handler) {
73 synchronized (this) {
74 requests.put(correlationID, handler);
75 }
76 }
77
78 public Message request(Destination destination, Message message) throws JMSException {
79 long timeout = getTimeToLive();
80 return request(destination, message, timeout);
81 }
82
83 public Message request(Destination destination, Message message, long timeout) throws JMSException {
84
85 String correlationID = createCorrelationID();
86 FutureResult future = new FutureResultHandler();
87 synchronized (this) {
88 requests.put(correlationID, future);
89 }
90 message.setJMSCorrelationID(correlationID);
91 oneWay(destination, message);
92
93 try {
94 if (timeout < 0) {
95 return (Message) future.get();
96 }
97 else if (timeout == 0) {
98 return (Message) future.peek();
99 }
100 else {
101 return (Message) future.timedGet(timeout);
102 }
103 }
104 catch (Exception e) {
105 throw createJMSException(e);
106 }
107 }
108
109 /***
110 * Processes inbound responses from requests
111 */
112 public void onMessage(Message message) {
113 try {
114 String correlationID = message.getJMSCorrelationID();
115
116
117 ReplyHandler handler = null;
118 synchronized (this) {
119 handler = (ReplyHandler) requests.get(correlationID);
120 }
121 if (handler == null) {
122 log.warn("Response received for unknown request: " + message);
123 }
124 else {
125 boolean complete = handler.handle(message);
126 if (complete) {
127 synchronized (this) {
128 requests.remove(correlationID);
129 }
130 }
131 }
132 }
133 catch (JMSException e) {
134 throw new FailedToProcessResponse(message, e);
135 }
136
137 }
138
139
140 public synchronized Message receive(long timeout) throws JMSException {
141 return super.receive(timeout);
142 }
143
144 protected synchronized void doSend(Destination destination, Message message, long timeout) throws JMSException {
145 super.doSend(destination, message, timeout);
146 }
147
148
149
150
151 protected JMSException createJMSException(Exception e) {
152 JMSException answer = new JMSException(e.toString());
153 answer.setLinkedException(e);
154 return answer;
155 }
156
157 }