package org.codehaus.xfire.transport.local;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.stream.XMLStreamWriter;

import org.codehaus.xfire.MessageContext;
import org.codehaus.xfire.XFire;
import org.codehaus.xfire.XFireException;
import org.codehaus.xfire.XFireRuntimeException;
import org.codehaus.xfire.exchange.InMessage;
import org.codehaus.xfire.exchange.OutMessage;
import org.codehaus.xfire.service.Service;
import org.codehaus.xfire.transport.AbstractChannel;
import org.codehaus.xfire.transport.Channel;
import org.codehaus.xfire.util.STAXUtils;

/**
 * Channel that delivers messages to another channel created by the same
 * transport, by serializing the outgoing message into an in-memory pipe and
 * handing the other end of that pipe to the target channel's
 * <code>receive</code> method.
 * <p>
 * Each delivery starts two threads: one that serializes the message into the
 * pipe and one that parses it and invokes the receiving channel.
 * {@link #send(MessageContext, OutMessage)} returns as soon as both threads
 * have been started; it does not wait for delivery to finish.
 */
public class LocalChannel
    extends AbstractChannel
{
    // NOTE(review): this field is never read or assigned in this class; the
    // constructor passes its argument to setUri(), which is not defined here.
    private String uri;

    /** Context property under which the sending channel's URI is stored. */
    protected static final String SENDER_URI = "senderUri";

    /** Context property under which the sender's original context is stored. */
    protected static final String OLD_CONTEXT = "urn:xfire:transport:local:oldContext";

    /**
     * Creates a channel for the given URI.
     *
     * @param uri the URI passed to <code>setUri</code>
     * @param transport the transport passed to <code>setTransport</code>; it
     *        is later used to create the channels that messages are sent to
     */
    public LocalChannel(String uri, LocalTransport transport)
    {
        setUri(uri);
        setTransport(transport);
    }

    /** Does nothing; this channel has nothing to open. */
    public void open()
    {
    }

    /**
     * Sends a message.
     * <ul>
     * <li>If the message URI is {@link Channel#BACKCHANNEL_URI} and the context
     * holds an <code>OutputStream</code> under that same key, the message is
     * serialized directly to that stream.</li>
     * <li>If the message URI is the back-channel URI but no stream is present,
     * the message is delivered to the URI stored under {@link #SENDER_URI},
     * using the context stored under {@link #OLD_CONTEXT}.</li>
     * <li>Otherwise a fresh receiving context is created and the message is
     * delivered to the channel for the message's URI.</li>
     * </ul>
     *
     * @param context the context of the current invocation
     * @param message the message to send; its URI selects the branch above
     * @throws XFireException if the target channel cannot be created or the
     *         back-channel writer cannot be closed
     */
    public void send(final MessageContext context, final OutMessage message) throws XFireException
    {
        if (message.getUri().equals(Channel.BACKCHANNEL_URI))
        {
            final OutputStream out = (OutputStream) context.getProperty(Channel.BACKCHANNEL_URI);
            if (out != null)
            {
                final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(out, message.getEncoding());

                message.getSerializer().writeMessage(message, writer, context);

                // Close the writer, as the piped path does, so that output it
                // may still be buffering is pushed to the stream. Per the StAX
                // contract this does not close the underlying stream.
                try
                {
                    writer.close();
                }
                catch (XMLStreamException e)
                {
                    throw new XFireException("Couldn't write stream.", e);
                }
            }
            else
            {
                MessageContext oldContext = (MessageContext) context.getProperty(OLD_CONTEXT);

                sendViaNewChannel(context, oldContext, message, (String) context.getProperty(SENDER_URI));
            }
        }
        else
        {
            MessageContext receivingContext = new MessageContext();
            receivingContext.setXFire(context.getXFire());
            receivingContext.setService(getService(context.getXFire(), message.getUri()));
            // Remember where the message came from so that a later
            // back-channel send on the receiving side can find its way back.
            receivingContext.setProperty(OLD_CONTEXT, context);
            receivingContext.setProperty(SENDER_URI, getUri());

            sendViaNewChannel(context, receivingContext, message, message.getUri());
        }
    }

    /**
     * Looks up the service addressed by a URI. The service name is taken to be
     * everything after the first <code>"//"</code> in the URI.
     *
     * @param xfire the XFire instance whose service registry is consulted
     * @param uri the target URI
     * @return the registry's answer for that name, or <code>null</code> if
     *         <code>xfire</code> or <code>uri</code> is <code>null</code> or
     *         the URI contains no <code>"//"</code>
     */
    protected Service getService(XFire xfire, String uri)
    {
        if (xfire == null || uri == null) return null;

        int i = uri.indexOf("//");

        if (i == -1) return null;

        return xfire.getServiceRegistry().getService(uri.substring(i + 2));
    }

    /**
     * Delivers a message to the channel for <code>uri</code> through an
     * in-memory pipe. Starts one thread that serializes the message into the
     * pipe and one that reads it back and passes it to the channel's
     * <code>receive</code> method, then returns without waiting for them.
     *
     * @param context the sender's context, used while serializing
     * @param receivingContext the context handed to the receiving channel
     * @param message the message to deliver
     * @param uri the URI of the channel to deliver to
     * @throws XFireException if the channel cannot be created
     * @throws XFireRuntimeException if the pipe cannot be created
     */
    private void sendViaNewChannel(final MessageContext context,
                                   final MessageContext receivingContext,
                                   final OutMessage message,
                                   final String uri) throws XFireException
    {
        // Create the channel first so that no pipe is left dangling when
        // channel creation fails.
        final Channel channel;
        try
        {
            channel = getTransport().createChannel(uri);
        }
        catch (Exception e)
        {
            throw new XFireException("Couldn't create channel.", e);
        }

        final PipedInputStream stream;
        final PipedOutputStream outStream;
        try
        {
            stream = new PipedInputStream();
            outStream = new PipedOutputStream(stream);
        }
        catch (IOException e)
        {
            throw new XFireRuntimeException("Couldn't create stream.", e);
        }

        Thread writeThread = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    final XMLStreamWriter writer =
                        STAXUtils.createXMLStreamWriter(outStream, message.getEncoding());
                    message.getSerializer().writeMessage(message, writer, context);

                    writer.close();
                    outStream.close();
                }
                catch (Exception e)
                {
                    // Close our end so the reader is not left waiting on a
                    // pipe that will never be completed.
                    closeQuietly(outStream);
                    throw new XFireRuntimeException("Couldn't write stream.", e);
                }
            }
        });

        Thread readThread = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    final XMLStreamReader reader = STAXUtils.createXMLStreamReader(stream, message.getEncoding());
                    final InMessage inMessage = new InMessage(reader, uri);
                    inMessage.setEncoding(message.getEncoding());

                    channel.receive(receivingContext, inMessage);

                    reader.close();
                    stream.close();
                }
                catch (Exception e)
                {
                    // Close our end so the writer is not left waiting on a
                    // pipe that nobody is draining.
                    closeQuietly(stream);
                    throw new XFireRuntimeException("Couldn't read stream.", e);
                }
            }
        });

        writeThread.start();
        readThread.start();
    }

    /** Closes the write end of a pipe on a failure path, ignoring close errors. */
    private static void closeQuietly(PipedOutputStream out)
    {
        try
        {
            out.close();
        }
        catch (IOException ignored)
        {
            // The original failure is already being reported by the caller.
        }
    }

    /** Closes the read end of a pipe on a failure path, ignoring close errors. */
    private static void closeQuietly(PipedInputStream in)
    {
        try
        {
            in.close();
        }
        catch (IOException ignored)
        {
            // The original failure is already being reported by the caller.
        }
    }

    /** Does nothing; this channel holds no resources between sends. */
    public void close()
    {
    }
}