/**
 *
 * Copyright 2005 LogicBlaze, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.logicblaze.lingo.jms.impl;

import EDU.oswego.cs.dl.util.concurrent.FutureResult;
import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
import org.logicblaze.lingo.jms.FailedToProcessResponse;
import org.logicblaze.lingo.jms.JmsProducer;
import org.logicblaze.lingo.jms.ReplyHandler;
import org.logicblaze.lingo.jms.Requestor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import java.util.HashMap;
import java.util.Map;

/**
 * A {@link org.logicblaze.lingo.jms.Requestor} which will use a single producer, consumer
 * and temporary topic for resource efficiency, but will use correlation
 * IDs on each message and response to ensure that each thread's requests
 * can occur synchronously.
 * <p/>
 * This class can be used concurrently by many different threads at the same time.
 * The table of pending requests, {@link #receive(long)} and
 * {@link #doSend(Destination, Message, long)} are all guarded by this
 * instance's monitor.
 *
 * @version $Revision: 1.4 $
 */
public class MultiplexingRequestor extends SingleThreadedRequestor implements MessageListener {
    private static final Log log = LogFactory.getLog(MultiplexingRequestor.class);

    /**
     * Pending requests: correlation ID (String) to the {@link ReplyHandler}
     * waiting for the matching response. Guarded by <code>this</code>.
     */
    private final Map requests = new HashMap();


    /**
     * Creates a requestor backed by a new {@link DefaultJmsProducer} and a
     * non-transacted, auto-acknowledging session on that producer's connection.
     *
     * @param connectionFactory used to create the underlying producer
     * @param serverDestination the destination passed to the constructor as the server destination
     * @return the new requestor
     * @throws JMSException if the producer or the session cannot be created
     */
    public static Requestor newInstance(ConnectionFactory connectionFactory, Destination serverDestination) throws JMSException {
        DefaultJmsProducer producer = DefaultJmsProducer.newInstance(connectionFactory);
        Session session = producer.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
        return new MultiplexingRequestor(session, producer, serverDestination);
    }

    /**
     * Creates a requestor backed by a new {@link DefaultJmsProducer} and a
     * non-transacted, auto-acknowledging session on that producer's connection,
     * using an explicit client destination.
     *
     * @param connectionFactory used to create the underlying producer
     * @param serverDestination the destination passed to the constructor as the server destination
     * @param clientDestination the destination passed to the constructor as the client destination
     * @return the new requestor
     * @throws JMSException if the producer or the session cannot be created
     */
    public static Requestor newInstance(ConnectionFactory connectionFactory, Destination serverDestination, Destination clientDestination) throws JMSException {
        DefaultJmsProducer producer = DefaultJmsProducer.newInstance(connectionFactory);
        Session session = producer.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
        return new MultiplexingRequestor(session, producer, serverDestination, clientDestination);
    }

    /**
     * Delegates to the superclass constructor and then registers this object
     * as the message listener of the receiver returned by <code>getReceiver()</code>,
     * so that responses are delivered to {@link #onMessage(Message)}.
     *
     * @throws JMSException if the superclass constructor or the listener registration fails
     */
    public MultiplexingRequestor(Session session, JmsProducer producer, Destination serverDestination, Destination clientDestination) throws JMSException {
        super(session, producer, serverDestination, clientDestination);
        getReceiver().setMessageListener(this);
    }

    /**
     * Same as the four-argument constructor with a <code>null</code> client destination.
     *
     * @throws JMSException if the delegated constructor fails
     */
    public MultiplexingRequestor(Session session, JmsProducer producer, Destination serverDestination) throws JMSException {
        this(session, producer, serverDestination, null);
    }

    /**
     * Registers a handler which will be given every response whose JMS
     * correlation ID equals the given value, until the handler reports that
     * it is complete. An existing registration for the same ID is replaced.
     *
     * @param correlationID the correlation ID to listen for
     * @param handler       the handler to invoke from {@link #onMessage(Message)}
     */
    public void registerHandler(String correlationID, ReplyHandler handler) {
        synchronized (this) {
            requests.put(correlationID, handler);
        }
    }

    /**
     * Sends a request and waits for the response, using the value of
     * <code>getTimeToLive()</code> as the timeout.
     *
     * @see #request(Destination, Message, long)
     */
    public Message request(Destination destination, Message message) throws JMSException {
        long timeout = getTimeToLive();
        return request(destination, message, timeout);
    }

    /**
     * Sends a request tagged with a fresh correlation ID and waits for the
     * response carrying the same ID.
     * <p/>
     * If the send fails, the wait times out or the waiting thread is
     * interrupted, the pending registration is removed so that failed
     * requests do not accumulate in the request table. A response arriving
     * after that point is reported by {@link #onMessage(Message)} as unknown.
     *
     * @param destination where to send the request
     * @param message     the request; its JMS correlation ID is overwritten
     * @param timeout     negative to wait indefinitely, zero to return whatever
     *                    is available right now without waiting (which may be
     *                    <code>null</code>; the registration is left in place in
     *                    that case), positive to wait at most that many milliseconds
     * @return the response message, or <code>null</code> for a zero timeout when
     *         no response has arrived yet
     * @throws JMSException if the send fails, or wrapping the underlying
     *                      exception (available as the linked exception) if the
     *                      wait fails, times out or is interrupted
     */
    public Message request(Destination destination, Message message, long timeout) throws JMSException {
        // let's create a correlationID
        String correlationID = createCorrelationID();
        FutureResult future = new FutureResultHandler();
        synchronized (this) {
            requests.put(correlationID, future);
        }

        boolean failed = true;
        try {
            message.setJMSCorrelationID(correlationID);
            oneWay(destination, message);
            Message answer = awaitResponse(future, timeout);
            failed = false;
            return answer;
        }
        finally {
            if (failed) {
                // nobody is waiting for this response any more; forget the handler
                synchronized (this) {
                    requests.remove(correlationID);
                }
            }
        }
    }

    /**
     * Processes inbound responses from requests by dispatching each message
     * to the handler registered under its JMS correlation ID. The handler is
     * unregistered once it reports that it is complete; a message without a
     * registered handler is logged as a warning and dropped.
     *
     * @throws FailedToProcessResponse if the message or the handler raises a {@link JMSException}
     */
    public void onMessage(Message message) {
        try {
            String correlationID = message.getJMSCorrelationID();

            // let's notify the monitor for this response
            ReplyHandler handler = null;
            synchronized (this) {
                handler = (ReplyHandler) requests.get(correlationID);
            }
            if (handler == null) {
                log.warn("Response received for unknown request: " + message);
            }
            else {
                // the handler is invoked outside the lock
                boolean complete = handler.handle(message);
                if (complete) {
                    synchronized (this) {
                        requests.remove(correlationID);
                    }
                }
            }
        }
        catch (JMSException e) {
            throw new FailedToProcessResponse(message, e);
        }

    }

    // Lets ensure only one thread performs a send/receive at once

    /**
     * Delegates to the superclass while holding this instance's monitor.
     */
    public synchronized Message receive(long timeout) throws JMSException {
        return super.receive(timeout);
    }

    /**
     * Delegates to the superclass while holding this instance's monitor.
     */
    protected synchronized void doSend(Destination destination, Message message, long timeout) throws JMSException {
        super.doSend(destination, message, timeout);
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Wraps an arbitrary exception in a {@link JMSException} whose message is
     * the exception's <code>toString()</code> and whose linked exception is
     * the original.
     */
    protected JMSException createJMSException(Exception e) {
        JMSException answer = new JMSException(e.toString());
        answer.setLinkedException(e);
        return answer;
    }

    /**
     * Waits on the future according to the timeout convention of
     * {@link #request(Destination, Message, long)} and converts any failure
     * into a {@link JMSException}.
     */
    private Message awaitResponse(FutureResult future, long timeout) throws JMSException {
        try {
            if (timeout < 0) {
                return (Message) future.get();
            }
            else if (timeout == 0) {
                return (Message) future.peek();
            }
            else {
                return (Message) future.timedGet(timeout);
            }
        }
        catch (TimeoutException e) {
            // util.concurrent's TimeoutException is a subclass of
            // InterruptedException, so it is caught first: a plain timeout
            // must not mark the thread as interrupted
            throw createJMSException(e);
        }
        catch (InterruptedException e) {
            // restore the interrupt status so callers further up can see it
            Thread.currentThread().interrupt();
            throw createJMSException(e);
        }
        catch (Exception e) {
            throw createJMSException(e);
        }
    }

}