1 package org.codehaus.xfire.transport.local; 2 3 import java.io.IOException; 4 import java.io.OutputStream; 5 import java.io.PipedInputStream; 6 import java.io.PipedOutputStream; 7 8 import javax.xml.stream.XMLStreamReader; 9 import javax.xml.stream.XMLStreamWriter; 10 11 import org.codehaus.xfire.MessageContext; 12 import org.codehaus.xfire.XFire; 13 import org.codehaus.xfire.XFireException; 14 import org.codehaus.xfire.XFireRuntimeException; 15 import org.codehaus.xfire.exchange.InMessage; 16 import org.codehaus.xfire.exchange.OutMessage; 17 import org.codehaus.xfire.service.Service; 18 import org.codehaus.xfire.transport.AbstractChannel; 19 import org.codehaus.xfire.transport.Channel; 20 import org.codehaus.xfire.util.STAXUtils; 21 22 public class LocalChannel 23 extends AbstractChannel 24 { 25 private String uri; 26 protected static final String SENDER_URI = "senderUri"; 27 protected static final String OLD_CONTEXT = "urn:xfire:transport:local:oldContext"; 28 29 public LocalChannel(String uri, LocalTransport transport) 30 { 31 setUri(uri); 32 setTransport(transport); 33 } 34 35 public void open() 36 { 37 } 38 39 public void send(final MessageContext context, final OutMessage message) throws XFireException 40 { 41 if (message.getUri().equals(Channel.BACKCHANNEL_URI)) 42 { 43 final OutputStream out = (OutputStream) context.getProperty(Channel.BACKCHANNEL_URI); 44 if (out != null) 45 { 46 final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(out, message.getEncoding()); 47 48 message.getSerializer().writeMessage(message, writer, context); 49 } 50 else 51 { 52 MessageContext oldContext = (MessageContext) context.getProperty(OLD_CONTEXT); 53 54 sendViaNewChannel(context, oldContext, message, (String) context.getProperty(SENDER_URI)); 55 } 56 } 57 else 58 { 59 MessageContext receivingContext = new MessageContext(); 60 receivingContext.setXFire(context.getXFire()); 61 receivingContext.setService(getService(context.getXFire(), message.getUri())); 62 receivingContext.setProperty(OLD_CONTEXT, context); 63 receivingContext.setProperty(SENDER_URI, getUri()); 64 65 sendViaNewChannel(context, receivingContext, message, message.getUri()); 66 } 67 } 68 69 protected Service getService(XFire xfire, String uri) 70 { 71 int i = uri.indexOf("//"); 72 73 if (i == -1 || xfire == null) return null; 74 75 return xfire.getServiceRegistry().getService(uri.substring(i+2)); 76 } 77 78 private void sendViaNewChannel(final MessageContext context, 79 final MessageContext receivingContext, 80 final OutMessage message, 81 final String uri) throws XFireException 82 { 83 try 84 { 85 final PipedInputStream stream = new PipedInputStream(); 86 final PipedOutputStream outStream = new PipedOutputStream(stream); 87 88 final Channel channel; 89 try 90 { 91 channel = getTransport().createChannel(uri); 92 } 93 catch (Exception e) 94 { 95 throw new XFireException("Couldn't create channel.", e); 96 } 97 98 final Object readNotify = new Object(); 99 100 Thread writeThread = new Thread(new Runnable() 101 { 102 public void run() 103 { 104 try 105 { 106 final XMLStreamWriter writer = 107 STAXUtils.createXMLStreamWriter(outStream, message.getEncoding()); 108 message.getSerializer().writeMessage(message, writer, context); 109 110 writer.close(); 111 outStream.close(); 112 113 } 114 catch (Exception e) 115 { 116 throw new XFireRuntimeException("Couldn't write stream.", e); 117 } 118 }; 119 }); 120 121 Thread readThread = new Thread(new Runnable() 122 { 123 public void run() 124 { 125 try 126 { 127 final XMLStreamReader reader = STAXUtils.createXMLStreamReader(stream, message.getEncoding()); 128 final InMessage inMessage = new InMessage(reader, uri); 129 inMessage.setEncoding(message.getEncoding()); 130 131 channel.receive(receivingContext, inMessage); 132 133 reader.close(); 134 stream.close(); 135 } 136 catch (Exception e) 137 { 138 throw new XFireRuntimeException("Couldn't read stream.", e); 139 } 140 finally 141 { 142 synchronized (readNotify) { readNotify.notifyAll(); } 143 } 144 }; 145 }); 146 147 writeThread.start(); 148 readThread.start(); 149 150 synchronized (readNotify) 151 { 152 try 153 { 154 readNotify.wait(); 155 } 156 catch (InterruptedException e) 157 { 158 } 159 } 160 } 161 catch (IOException e) 162 { 163 throw new XFireRuntimeException("Couldn't create stream.", e); 164 } 165 } 166 167 public void close() 168 { 169 } 170 }