View Javadoc

1   package org.codehaus.xfire.client;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   
6   import javax.xml.stream.XMLStreamException;
7   import javax.xml.stream.XMLStreamReader;
8   
9   import org.apache.commons.logging.Log;
10  import org.apache.commons.logging.LogFactory;
11  import org.codehaus.xfire.MessageContext;
12  import org.codehaus.xfire.XFire;
13  import org.codehaus.xfire.XFireException;
14  import org.codehaus.xfire.XFireRuntimeException;
15  import org.codehaus.xfire.exchange.InMessage;
16  import org.codehaus.xfire.exchange.OutMessage;
17  import org.codehaus.xfire.exchange.RobustInOutExchange;
18  import org.codehaus.xfire.fault.XFireFault;
19  import org.codehaus.xfire.handler.AbstractHandlerSupport;
20  import org.codehaus.xfire.handler.HandlerPipeline;
21  import org.codehaus.xfire.handler.OutMessageSender;
22  import org.codehaus.xfire.handler.ParseMessageHandler;
23  import org.codehaus.xfire.handler.Phase;
24  import org.codehaus.xfire.service.OperationInfo;
25  import org.codehaus.xfire.service.Service;
26  import org.codehaus.xfire.service.binding.AbstractBinding;
27  import org.codehaus.xfire.service.binding.ObjectBinding;
28  import org.codehaus.xfire.transport.Channel;
29  import org.codehaus.xfire.transport.ChannelEndpoint;
30  import org.codehaus.xfire.transport.Transport;
31  
32  public class Client
33      extends AbstractHandlerSupport
34      implements ChannelEndpoint
35  {
36      private static final Log log = LogFactory.getLog(Client.class);
37  
38      private Object[] response;
39      private Transport transport;
40      private Service service;
41      private ObjectBinding clientBinding;
42      private String url;
43      private int timeout = 10*1000;
44      private MessageContext context;
45      private XFireFault fault;
46      private String endpointUri;
47      private List inPhases;
48      private List outPhases;
49     
50      /*** The XFire instance. This is only needed when invoking local services. */
51      private XFire xfire;
52      
53      public Client(Transport transport, Service service, String url)
54      {
55          this(transport, service, url, null);
56      }
57      
58      public Client(Transport transport, Service service, String url, String endpointUri)
59      {
60          this.transport = transport;
61          this.url = url;
62          this.endpointUri = endpointUri;
63          
64          clientBinding = (ObjectBinding) ((AbstractBinding) service.getBinding()).clone();
65          clientBinding.setClientModeOn(true);
66          
67          // Create a service clone
68          this.service = new Service(service.getServiceInfo());
69          this.service.setBinding(clientBinding);
70          this.service.setFaultSerializer(service.getFaultSerializer());
71          this.service.setSoapVersion(service.getSoapVersion());
72          
73          inPhases = new ArrayList();
74          outPhases = new ArrayList();
75          createPhases();
76          
77          addOutHandler(new OutMessageSender());
78          addInHandler(new ParseMessageHandler());
79      }
80  
81      public Object[] invoke(OperationInfo op, Object[] params) throws XFireException, XFireFault
82      {
83          try
84          {
85              OutMessage msg = new OutMessage("targeturl");
86              msg.setBody(params);
87              msg.setUri(url);
88              msg.setSerializer(service.getBinding());
89              msg.setAction(op.getAction());
90              msg.setChannel(getOutChannel());
91              
92              context = new MessageContext();
93              context.setService(service);
94              context.setXFire(xfire);
95              
96              RobustInOutExchange exchange = new RobustInOutExchange(context);
97              exchange.setOperation(op);
98              exchange.setOutMessage(msg);
99              context.setExchange(exchange);
100             
101             HandlerPipeline pipeline = new HandlerPipeline(outPhases);
102             pipeline.addHandlers(getOutHandlers());
103             pipeline.addHandlers(transport.getOutHandlers());
104         
105             pipeline.invoke(context);
106         }
107         catch (Exception e1)
108         {
109             throw XFireFault.createFault(e1);
110         }
111         
112         /***
113          * If this is an asynchronous channel, we'll need to sleep() and wait
114          * for a response. Channels such as HTTP will have the response set
115          * by the time we get to this point.
116          */
117         if (response == null && fault == null)
118         {
119             int count = 0;
120             while (response == null && fault == null && count < timeout)
121             {
122                 try
123                 {
124                     Thread.sleep(50);
125                     count += 50;
126                 }
127                 catch (InterruptedException e)
128                 {
129                     break;
130                 }
131             }
132         }
133 
134         if (fault != null)
135         {
136             XFireFault localFault = fault;
137             fault = null;
138             throw localFault;
139         }
140         
141         Object[] localResponse = response;
142         response = null;
143         return localResponse;
144     }
145     
146     public void onReceive(MessageContext recvContext, InMessage msg)
147     {
148         if (log.isDebugEnabled()) log.debug("Received message to " + msg.getUri());
149         
150         if (context.getExchange() == null)
151         {
152             context.setExchange(new RobustInOutExchange(context));
153         }
154         
155         RobustInOutExchange exchange = (RobustInOutExchange) context.getExchange();
156         exchange.setInMessage(msg);
157         
158         HandlerPipeline pipeline = new HandlerPipeline(inPhases);
159         pipeline.addHandlers(getInHandlers());
160         pipeline.addHandlers(transport.getInHandlers());
161         
162         try
163         {
164             pipeline.invoke(context);
165             
166             finishReadingMessage(msg, context);
167             
168             response = ((List) msg.getBody()).toArray();
169         }
170         catch (Exception e1)
171         {
172             XFireFault fault = XFireFault.createFault(e1);
173             pipeline.handleFault(fault, context);
174             
175             this.fault = fault;
176         }
177     }
178     
179     public void finishReadingMessage(InMessage message, MessageContext context)
180         throws XFireFault
181     {
182         XMLStreamReader reader = message.getXMLStreamReader();
183 
184         try
185         {
186             while (reader.hasNext()) reader.next();
187         }
188         catch (XMLStreamException e)
189         {
190             throw new XFireFault("Couldn't parse message.", e, XFireFault.SENDER);
191         }
192     }
193     
194     protected void createPhases()
195     {
196         inPhases = new ArrayList();
197         inPhases.add(new Phase(Phase.TRANSPORT, 1000));
198         inPhases.add(new Phase(Phase.PARSE, 2000));
199         inPhases.add(new Phase(Phase.PRE_DISPATCH, 3000));
200         inPhases.add(new Phase(Phase.DISPATCH, 4000));
201         inPhases.add(new Phase(Phase.POLICY, 5000));
202         inPhases.add(new Phase(Phase.USER, 6000));
203         inPhases.add(new Phase(Phase.PRE_INVOKE, 7000));
204         inPhases.add(new Phase(Phase.SERVICE, 8000));
205 
206         outPhases = new ArrayList();
207         outPhases.add(new Phase(Phase.POST_INVOKE, 1000));
208         outPhases.add(new Phase(Phase.POLICY, 2000));
209         outPhases.add(new Phase(Phase.USER, 3000));
210         outPhases.add(new Phase(Phase.TRANSPORT, 4000));
211         outPhases.add(new Phase(Phase.SEND, 5000));
212     }
213 
214     public Channel getOutChannel()
215     {
216         Channel channel = null;
217         try
218         {
219             String uri = getEndpointUri();
220             if (uri == null)
221                 channel = getTransport().createChannel();
222             else
223                 channel = getTransport().createChannel(uri);
224         }
225         catch (Exception e)
226         {
227             throw new XFireRuntimeException("Couldn't open channel.", e);
228         }
229 
230         channel.setEndpoint(this);
231         
232         return channel;
233     }
234     
235     public Transport getTransport()
236     {
237         return transport;
238     }
239 
240     public void receive(Object response)
241     {
242         this.response = ((List) response).toArray();
243     }
244     
245     public Service getService()
246     {
247         return service;
248     }
249 
250     public String getUrl()
251     {
252         return url;
253     }
254 
255     public void setUrl(String url)
256     {
257         this.url = url;
258     }
259 
260     public String getEndpointUri()
261     {
262         return endpointUri;
263     }
264 
265     public void setEndpointUri(String endpointUri)
266     {
267         this.endpointUri = endpointUri;
268     }
269 
270     public int getTimeout()
271     {
272         return timeout;
273     }
274 
275     public void setTimeout(int timeout)
276     {
277         this.timeout = timeout;
278     }
279 
280     public void receiveFault(XFireFault fault)
281     {
282         this.fault = fault;
283     }
284 
285     public XFire getXFire()
286     {
287         return xfire;
288     }
289 
290     public void setXFire(XFire xfire)
291     {
292         this.xfire = xfire;
293     }
294 }