View Javadoc

1   package org.codehaus.xfire.transport.local;
2   
3   import java.io.IOException;
4   import java.io.OutputStream;
5   import java.io.PipedInputStream;
6   import java.io.PipedOutputStream;
7   
8   import javax.xml.stream.XMLStreamReader;
9   import javax.xml.stream.XMLStreamWriter;
10  
11  import org.codehaus.xfire.MessageContext;
12  import org.codehaus.xfire.XFire;
13  import org.codehaus.xfire.XFireException;
14  import org.codehaus.xfire.XFireRuntimeException;
15  import org.codehaus.xfire.exchange.InMessage;
16  import org.codehaus.xfire.exchange.OutMessage;
17  import org.codehaus.xfire.service.Service;
18  import org.codehaus.xfire.transport.AbstractChannel;
19  import org.codehaus.xfire.transport.Channel;
20  import org.codehaus.xfire.util.STAXUtils;
21  
22  public class LocalChannel
23      extends AbstractChannel
24  {
25      private String uri;
26      protected static final String SENDER_URI = "senderUri";
27      protected static final String OLD_CONTEXT = "urn:xfire:transport:local:oldContext";
28      
29      public LocalChannel(String uri, LocalTransport transport)
30      {
31          setUri(uri);
32          setTransport(transport);
33      }
34  
35      public void open()
36      {
37      }
38  
39      public void send(final MessageContext context, final OutMessage message) throws XFireException
40      {
41          if (message.getUri().equals(Channel.BACKCHANNEL_URI))
42          {
43              final OutputStream out = (OutputStream) context.getProperty(Channel.BACKCHANNEL_URI);
44              if (out != null)
45              {
46                  final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(out, message.getEncoding());
47  
48                  message.getSerializer().writeMessage(message, writer, context);
49              }
50              else
51              {
52                  MessageContext oldContext = (MessageContext) context.getProperty(OLD_CONTEXT);
53                  
54                  sendViaNewChannel(context, oldContext, message, (String) context.getProperty(SENDER_URI));
55              }
56          }
57          else
58          {
59              MessageContext receivingContext = new MessageContext();
60              receivingContext.setXFire(context.getXFire());
61              receivingContext.setService(getService(context.getXFire(), message.getUri()));
62              receivingContext.setProperty(OLD_CONTEXT, context);
63              receivingContext.setProperty(SENDER_URI, getUri());
64              
65              sendViaNewChannel(context, receivingContext, message, message.getUri());
66          }
67      }
68  
69      protected Service getService(XFire xfire, String uri)
70      {
71          int i = uri.indexOf("//");
72          
73          if (i == -1 || xfire == null) return null;
74          
75          return xfire.getServiceRegistry().getService(uri.substring(i+2));
76      }
77  
78      private void sendViaNewChannel(final MessageContext context,
79                                     final MessageContext receivingContext,
80                                     final OutMessage message,
81                                     final String uri) throws XFireException
82      {
83          try
84          {
85              final PipedInputStream stream = new PipedInputStream();
86              final PipedOutputStream outStream = new PipedOutputStream(stream);
87              
88              final Channel channel;
89              try
90              {
91                  channel = getTransport().createChannel(uri);
92              }
93              catch (Exception e)
94              {
95                  throw new XFireException("Couldn't create channel.", e);
96              }
97              
98              final Object readNotify = new Object();
99  
100             Thread writeThread = new Thread(new Runnable()
101             {
102                 public void run()
103                 {
104                     try
105                     {
106                         final XMLStreamWriter writer = 
107                             STAXUtils.createXMLStreamWriter(outStream, message.getEncoding());
108                         message.getSerializer().writeMessage(message, writer, context);
109 
110                         writer.close();
111                         outStream.close();
112                         
113                     }
114                     catch (Exception e)
115                     {
116                         throw new XFireRuntimeException("Couldn't write stream.", e);
117                     }
118                 };
119             });
120             
121             Thread readThread = new Thread(new Runnable() 
122             {
123                 public void run() 
124                 {
125                    try
126                     {
127                        final XMLStreamReader reader = STAXUtils.createXMLStreamReader(stream, message.getEncoding());
128                        final InMessage inMessage = new InMessage(reader, uri);
129                        inMessage.setEncoding(message.getEncoding());
130 
131                        channel.receive(receivingContext, inMessage);
132                        
133                        reader.close();
134                        stream.close();
135                     }
136                     catch (Exception e)
137                     {
138                         throw new XFireRuntimeException("Couldn't read stream.", e);
139                     }
140                     finally
141                     {
142                         synchronized (readNotify) { readNotify.notifyAll(); }
143                     }
144                 };
145             });
146 
147             writeThread.start();
148             readThread.start();
149             
150             synchronized (readNotify)
151             {
152                 try
153                 {
154                     readNotify.wait();
155                 }
156                 catch (InterruptedException e)
157                 {
158                 }
159             }
160         }
161         catch (IOException e)
162         {
163             throw new XFireRuntimeException("Couldn't create stream.", e);
164         }
165     }
166 
167     public void close()
168     {
169     }
170 }