001 /** 002 * 003 * Copyright 2005 LogicBlaze, Inc. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.logicblaze.lingo.jms; 019 020 import org.logicblaze.lingo.LingoInvocation; 021 import org.logicblaze.lingo.LingoRemoteInvocationFactory; 022 import org.logicblaze.lingo.MetadataStrategy; 023 import org.logicblaze.lingo.MethodMetadata; 024 import org.logicblaze.lingo.SimpleMetadataStrategy; 025 import org.logicblaze.lingo.jms.marshall.DefaultMarshaller; 026 import org.logicblaze.lingo.jms.marshall.Marshaller; 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.springframework.beans.factory.InitializingBean; 030 import org.springframework.remoting.support.RemoteInvocation; 031 import org.springframework.remoting.support.RemoteInvocationBasedExporter; 032 import org.springframework.remoting.support.RemoteInvocationFactory; 033 import org.springframework.remoting.support.RemoteInvocationResult; 034 035 import javax.jms.JMSException; 036 import javax.jms.Message; 037 import javax.jms.MessageListener; 038 import javax.jms.ObjectMessage; 039 import javax.jms.Session; 040 041 /** 042 * @version $Revision: 1.2 $ 043 */ 044 public abstract class JmsServiceExporterSupport extends RemoteInvocationBasedExporter implements MessageListener, InitializingBean { 045 private static final Log log = LogFactory.getLog(JmsServiceExporterSupport.class); 046 047 protected Object proxy; 048 private boolean ignoreFailures; 049 private Marshaller marshaller; 050 private MetadataStrategy metadataStrategy; 051 private RemoteInvocationFactory invocationFactory; 052 private Requestor responseRequestor; 053 054 public void afterPropertiesSet() throws Exception { 055 this.proxy = getProxyForService(); 056 if (proxy == null) { 057 throw new IllegalArgumentException("proxy is required"); 058 } 059 if (responseRequestor == null) { 060 throw new IllegalArgumentException("responseRequestor is required"); 061 } 062 if (marshaller == null) { 063 marshaller = new DefaultMarshaller(); 064 } 065 if (metadataStrategy == null) { 066 metadataStrategy = new SimpleMetadataStrategy(true); 067 } 068 if (invocationFactory == null) { 069 invocationFactory = new LingoRemoteInvocationFactory(metadataStrategy); 070 } 071 } 072 073 public void onMessage(Message message) { 074 try { 075 RemoteInvocation invocation = marshaller.readRemoteInvocation(message); 076 if (invocation != null) { 077 boolean oneway = false; 078 if (invocation instanceof LingoInvocation) { 079 LingoInvocation lingoInvocation = (LingoInvocation) invocation; 080 oneway = lingoInvocation.getMetadata().isOneWay(); 081 introduceRemoteReferences(lingoInvocation, message); 082 } 083 RemoteInvocationResult result = invokeAndCreateResult(invocation, this.proxy); 084 if (!oneway) { 085 writeRemoteInvocationResult(message, result); 086 } 087 } 088 } 089 catch (JMSException e) { 090 onException(message, e); 091 } 092 } 093 094 // Properties 095 //------------------------------------------------------------------------- 096 public Requestor getResponseRequestor() { 097 return responseRequestor; 098 } 099 100 public void setResponseRequestor(Requestor responseRequestor) { 101 this.responseRequestor = responseRequestor; 102 } 103 104 public Marshaller getMarshaller() { 105 return marshaller; 106 } 107 108 public void setMarshaller(Marshaller marshaller) { 109 this.marshaller = marshaller; 110 } 111 112 public RemoteInvocationFactory getInvocationFactory() { 113 return invocationFactory; 114 } 115 116 public void setInvocationFactory(RemoteInvocationFactory invocationFactory) { 117 this.invocationFactory = invocationFactory; 118 } 119 120 public boolean isIgnoreFailures() { 121 return ignoreFailures; 122 } 123 124 /** 125 * Sets whether or not failures should be ignored (and just logged) or thrown as 126 * runtime exceptions into the JMS provider 127 */ 128 public void setIgnoreFailures(boolean ignoreFailures) { 129 this.ignoreFailures = ignoreFailures; 130 } 131 132 133 // Implementation methods 134 //------------------------------------------------------------------------- 135 136 /** 137 * Send the given RemoteInvocationResult as a JMS message to the originator 138 * 139 * @param message current HTTP message 140 * @param result the RemoteInvocationResult object 141 * @throws javax.jms.JMSException if thrown by trying to send the message 142 */ 143 protected abstract void writeRemoteInvocationResult(Message message, RemoteInvocationResult result) throws JMSException; 144 145 /** 146 * Creates the invocation result response message 147 * 148 * @param session the JMS session to use 149 * @param message the original request message, in case we want to attach any properties etc. 150 * @param result the invocation result 151 * @return the message response to send 152 * @throws javax.jms.JMSException if creating the messsage failed 153 */ 154 protected Message createResponseMessage(Session session, Message message, RemoteInvocationResult result) throws JMSException { 155 // an alternative strategy could be to use XStream and text messages 156 // though some JMS providers, like ActiveMQ, might do this kind of thing for us under the covers 157 if (result == null) { 158 throw new IllegalArgumentException("result cannot be null"); 159 } 160 ObjectMessage answer = session.createObjectMessage(result); 161 162 // lets preserve the correlation ID 163 answer.setJMSCorrelationID(message.getJMSCorrelationID()); 164 return answer; 165 } 166 167 /** 168 * Lets replace any remote object correlation IDs with dynamic proxies 169 * 170 * @param invocation 171 * @param requestMessage 172 */ 173 protected void introduceRemoteReferences(LingoInvocation invocation, Message requestMessage) throws JMSException { 174 MethodMetadata metadata = invocation.getMetadata(); 175 Object[] arguments = invocation.getArguments(); 176 Class[] parameterTypes = invocation.getParameterTypes(); 177 for (int i = 0; i < parameterTypes.length; i++) { 178 if (metadata.isRemoteParameter(i)) { 179 arguments[i] = createRemoteProxy(requestMessage, parameterTypes[i], arguments[i]); 180 } 181 } 182 } 183 184 protected Object createRemoteProxy(Message message, Class parameterType, Object argument) throws JMSException { 185 JmsProxyFactoryBean factory = new JmsProxyFactoryBean(); 186 factory.setDestination(message.getJMSReplyTo()); 187 factory.setCorrelationID((String) argument); 188 factory.setRemoteInvocationFactory(invocationFactory); 189 factory.setServiceInterface(parameterType); 190 factory.setRequestor(responseRequestor); 191 factory.afterPropertiesSet(); 192 return factory.getObject(); 193 } 194 195 196 /** 197 * Handle the processing of an exception when processing an inbound messsage 198 */ 199 protected void onException(Message message, JMSException e) { 200 String text = "Failed to process inbound message due to: " + e + ". Message will be discarded: " + message; 201 log.info(text, e); 202 if (!ignoreFailures) { 203 throw new RuntimeException(text, e); 204 } 205 } 206 }