001 /* 002 * Copyright (c) 2005 Your Corporation. All Rights Reserved. 003 */ 004 package org.activemq.transport.stomp; 005 006 import EDU.oswego.cs.dl.util.concurrent.Channel; 007 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 008 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 009 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 010 import org.activemq.io.WireFormat; 011 import org.activemq.message.ActiveMQDestination; 012 import org.activemq.message.ActiveMQTextMessage; 013 import org.activemq.message.ConnectionInfo; 014 import org.activemq.message.ConsumerInfo; 015 import org.activemq.message.Packet; 016 import org.activemq.message.Receipt; 017 import org.activemq.message.SessionInfo; 018 import org.activemq.util.IdGenerator; 019 020 import javax.jms.JMSException; 021 import javax.jms.Session; 022 import java.io.BufferedReader; 023 import java.io.DataInput; 024 import java.io.DataInputStream; 025 import java.io.DataOutput; 026 import java.io.DataOutputStream; 027 import java.io.IOException; 028 import java.io.InputStreamReader; 029 import java.net.DatagramPacket; 030 import java.net.ProtocolException; 031 import java.util.List; 032 import java.util.Map; 033 import java.util.Properties; 034 035 /** 036 * Implements the TTMP protocol. 037 */ 038 public class StompWireFormat implements WireFormat 039 { 040 041 static final IdGenerator PACKET_IDS = new IdGenerator(); 042 static final IdGenerator clientIds = new IdGenerator(); 043 044 private CommandParser commandParser = new CommandParser(this); 045 private HeaderParser headerParser = new HeaderParser(); 046 047 private DataInputStream in; 048 049 private String clientId; 050 051 private Channel pendingReadPackets = new LinkedQueue(); 052 private Channel pendingWriteFrames = new LinkedQueue(); 053 private List receiptListeners = new CopyOnWriteArrayList(); 054 private String transactionId; 055 private short sessionId; 056 private Map subscriptions = new ConcurrentHashMap(); 057 058 059 void addReceiptListener(ReceiptListener listener) 060 { 061 receiptListeners.add(listener); 062 } 063 064 065 public Packet readPacket(DataInput in) throws IOException 066 { 067 Packet pending = (Packet) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() 068 { 069 public Object cycle() throws InterruptedException 070 { 071 return pendingReadPackets.poll(0); 072 } 073 }); 074 if (pending != null) 075 { 076 return pending; 077 } 078 079 try 080 { 081 return commandParser.parse(in); 082 } 083 catch (ProtocolException e) 084 { 085 sendError(e.getMessage()); 086 return null; 087 } 088 } 089 090 public Packet writePacket(final Packet packet, final DataOutput out) throws IOException, JMSException 091 { 092 flushPendingFrames(out); 093 094 if (packet.getPacketType() == Packet.RECEIPT_INFO) 095 { 096 assert(packet instanceof Receipt); 097 Receipt receipt = (Receipt) packet; 098 for (int i = 0; i < receiptListeners.size(); i++) 099 { 100 ReceiptListener listener = (ReceiptListener) receiptListeners.get(i); 101 if (listener.onReceipt(receipt, out)) 102 { 103 receiptListeners.remove(listener); 104 return null; 105 } 106 } 107 } 108 109 if (packet.getPacketType() == Packet.ACTIVEMQ_TEXT_MESSAGE) 110 { 111 assert(packet instanceof ActiveMQTextMessage); 112 ActiveMQTextMessage msg = (ActiveMQTextMessage) packet; 113 Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination()); 114 sub.receive(msg, out); 115 } 116 return null; 117 } 118 119 private void flushPendingFrames(final DataOutput out) throws IOException 120 { 121 boolean interrupted = false; 122 do 123 { 124 try 125 { 126 String frame = (String) pendingWriteFrames.poll(0); 127 if (frame == null) return; 128 out.writeBytes(frame); 129 } 130 catch (InterruptedException e) 131 { 132 interrupted = true; 133 } 134 } 135 while (interrupted); 136 } 137 138 private void sendError(final String message) 139 { 140 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 141 { 142 public void cycle() throws InterruptedException 143 { 144 pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR) 145 .addHeader(Stomp.Headers.Error.MESSAGE, message) 146 .toFrame()); 147 } 148 }); 149 } 150 151 /** 152 * some transports may register their streams (e.g. Tcp) 153 * 154 * @param dataOut 155 * @param dataIn 156 */ 157 public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) 158 { 159 this.in = dataIn; 160 } 161 162 /** 163 * Some wire formats require a handshake at start-up 164 * 165 * @throws java.io.IOException 166 */ 167 public void initiateServerSideProtocol() throws IOException 168 { 169 BufferedReader in = new BufferedReader(new InputStreamReader(this.in)); 170 String first_line = in.readLine(); 171 if (!first_line.startsWith(Stomp.Commands.CONNECT)) 172 { 173 throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT); 174 } 175 176 Properties headers = headerParser.parse(in); 177 //if (!headers.containsKey(TTMP.Headers.Connect.LOGIN)) 178 // System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN + "] missing"); 179 //if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE)) 180 // System.err.println("Required header [" + TTMP.Headers.Connect.PASSCODE + "] missing"); 181 182 // allow anyone to login for now 183 184 String login = headers.getProperty(Stomp.Headers.Connect.LOGIN); 185 String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE); 186 187 // skip to end of the packet 188 while (in.read() != 0) {} 189 final ConnectionInfo info = new ConnectionInfo(); 190 final Short packet_id = new Short(PACKET_IDS.getNextShortSequence()); 191 clientId = clientIds.generateId(); 192 commandParser.setClientId(clientId); 193 194 info.setClientId(clientId); 195 info.setReceiptRequired(true); 196 info.setClientVersion(Integer.toString(getCurrentWireFormatVersion())); 197 info.setClosed(false); 198 info.setHostName("ttmp.fake.host.name"); 199 info.setId(packet_id.shortValue()); 200 info.setUserName(login); 201 info.setPassword(passcode); 202 info.setStartTime(System.currentTimeMillis()); 203 info.setStarted(true); 204 205 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 206 { 207 public void cycle() throws InterruptedException 208 { 209 pendingReadPackets.put(info); 210 } 211 }); 212 213 addReceiptListener(new ReceiptListener() 214 { 215 public boolean onReceipt(Receipt receipt, DataOutput out) 216 { 217 if (receipt.getCorrelationId() != packet_id.shortValue()) return false; 218 final Short session_packet_id = new Short(PACKET_IDS.getNextShortSequence()); 219 sessionId = clientIds.getNextShortSequence(); 220 commandParser.setSessionId(sessionId); 221 222 final SessionInfo info = new SessionInfo(); 223 info.setStartTime(System.currentTimeMillis()); 224 info.setId(session_packet_id.shortValue()); 225 info.setClientId(clientId); 226 info.setSessionId(sessionId); 227 info.setStarted(true); 228 info.setSessionMode(Session.AUTO_ACKNOWLEDGE); 229 info.setReceiptRequired(true); 230 231 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 232 { 233 public void cycle() throws InterruptedException 234 { 235 pendingReadPackets.put(info); 236 } 237 }); 238 239 addReceiptListener(new ReceiptListener() 240 { 241 public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException 242 { 243 if (receipt.getCorrelationId() != session_packet_id.shortValue()) return false; 244 StringBuffer buffer = new StringBuffer(); 245 buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE); 246 buffer.append(Stomp.Headers.Connected.SESSION) 247 .append(Stomp.Headers.SEPERATOR) 248 .append(clientId) 249 .append(Stomp.NEWLINE) 250 .append(Stomp.NEWLINE); 251 buffer.append(Stomp.NULL); 252 out.writeBytes(buffer.toString()); 253 return true; 254 } 255 }); 256 257 return true; 258 } 259 }); 260 } 261 262 /** 263 * Creates a new copy of this wire format so it can be used in another thread/context 264 */ 265 public WireFormat copy() 266 { 267 return new StompWireFormat(); 268 } 269 270 /* Stuff below here is leaky stuff we don't actually need */ 271 272 /** 273 * Some wire formats require a handshake at start-up 274 * 275 * @throws java.io.IOException 276 */ 277 public void initiateClientSideProtocol() throws IOException 278 { 279 throw new UnsupportedOperationException("Not yet implemented!"); 280 } 281 282 /** 283 * Can this wireformat process packets of this version 284 * 285 * @param version the version number to test 286 * @return true if can accept the version 287 */ 288 public boolean canProcessWireFormatVersion(int version) 289 { 290 return version == getCurrentWireFormatVersion(); 291 } 292 293 /** 294 * @return the current version of this wire format 295 */ 296 public int getCurrentWireFormatVersion() 297 { 298 return 1; 299 } 300 301 /** 302 * @return Returns the enableCaching. 303 */ 304 public boolean isCachingEnabled() 305 { 306 return false; 307 } 308 309 /** 310 * @param enableCaching The enableCaching to set. 311 */ 312 public void setCachingEnabled(boolean enableCaching) 313 { 314 // never 315 } 316 317 /** 318 * some wire formats will implement their own fragementation 319 * 320 * @return true unless a wire format supports it's own fragmentation 321 */ 322 public boolean doesSupportMessageFragmentation() 323 { 324 return false; 325 } 326 327 /** 328 * Some wire formats will not be able to understand compressed messages 329 * 330 * @return true unless a wire format cannot understand compression 331 */ 332 public boolean doesSupportMessageCompression() 333 { 334 return false; 335 } 336 337 /** 338 * Writes the given package to a new datagram 339 * 340 * @param channelID is the unique channel ID 341 * @param packet is the packet to write 342 * @return 343 * @throws java.io.IOException 344 * @throws javax.jms.JMSException 345 */ 346 public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException 347 { 348 throw new UnsupportedOperationException("Will not be implemented"); 349 } 350 351 /** 352 * Reads the packet from the given byte[] 353 * 354 * @param bytes 355 * @param offset 356 * @param length 357 * @return 358 * @throws java.io.IOException 359 */ 360 public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException 361 { 362 throw new UnsupportedOperationException("Will not be implemented"); 363 } 364 365 /** 366 * Reads the packet from the given byte[] 367 * 368 * @param bytes 369 * @return 370 * @throws java.io.IOException 371 */ 372 public Packet fromBytes(byte[] bytes) throws IOException 373 { 374 throw new UnsupportedOperationException("Will not be implemented"); 375 } 376 377 /** 378 * A helper method which converts a packet into a byte array 379 * 380 * @param packet 381 * @return a byte array representing the packet using some wire protocol 382 * @throws java.io.IOException 383 * @throws javax.jms.JMSException 384 */ 385 public byte[] toBytes(Packet packet) throws IOException, JMSException 386 { 387 throw new UnsupportedOperationException("Will not be implemented"); 388 } 389 390 /** 391 * A helper method for working with sockets where the first byte is read 392 * first, then the rest of the message is read. 393 * <p/> 394 * Its common when dealing with sockets to have different timeout semantics 395 * until the first non-zero byte is read of a message, after which 396 * time a zero timeout is used. 397 * 398 * @param firstByte the first byte of the packet 399 * @param in the rest of the packet 400 * @return 401 * @throws java.io.IOException 402 */ 403 public Packet readPacket(int firstByte, DataInput in) throws IOException 404 { 405 throw new UnsupportedOperationException("Will not be implemented"); 406 } 407 408 /** 409 * Read a packet from a Datagram packet from the given channelID. If the 410 * packet is from the same channel ID as it was sent then we have a 411 * loop-back so discard the packet 412 * 413 * @param channelID is the unique channel ID 414 * @param dpacket 415 * @return the packet read from the datagram or null if it should be 416 * discarded 417 * @throws java.io.IOException 418 */ 419 public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException 420 { 421 throw new UnsupportedOperationException("Will not be implemented"); 422 } 423 424 boolean isInTransaction() 425 { 426 return transactionId != null; 427 } 428 429 void setTransactionId(String transactionId) 430 { 431 this.transactionId = transactionId; 432 } 433 434 String getTransactionId() 435 { 436 return transactionId; 437 } 438 439 void clearTransactionId() 440 { 441 this.transactionId = null; 442 } 443 444 String getClientId() 445 { 446 return this.clientId; 447 } 448 449 public short getSessionId() 450 { 451 return sessionId; 452 } 453 454 public void addSubscription(Subscription s) 455 { 456 if (subscriptions.containsKey(s.getDestination())) 457 { 458 Subscription old = (Subscription) subscriptions.get(s.getDestination()); 459 ConsumerInfo p = old.close(); 460 enqueuePacket(p); 461 subscriptions.put(s.getDestination(), s); 462 } 463 else 464 { 465 subscriptions.put(s.getDestination(), s); 466 } 467 } 468 469 public void enqueuePacket(final Packet ack) 470 { 471 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 472 { 473 public void cycle() throws InterruptedException 474 { 475 pendingReadPackets.put(ack); 476 } 477 }); 478 } 479 480 public Subscription getSubscriptionFor(ActiveMQDestination destination) 481 { 482 return (Subscription) subscriptions.get(destination); 483 } 484 }