001    /*
002     * Copyright (c) 2005 Your Corporation. All Rights Reserved.
003     */
004    package org.activemq.transport.stomp;
005    
006    import EDU.oswego.cs.dl.util.concurrent.Channel;
007    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
008    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
009    import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
010    import org.activemq.io.WireFormat;
011    import org.activemq.message.ActiveMQDestination;
012    import org.activemq.message.ActiveMQTextMessage;
013    import org.activemq.message.ConnectionInfo;
014    import org.activemq.message.ConsumerInfo;
015    import org.activemq.message.Packet;
016    import org.activemq.message.Receipt;
017    import org.activemq.message.SessionInfo;
018    import org.activemq.util.IdGenerator;
019    
020    import javax.jms.JMSException;
021    import javax.jms.Session;
022    import java.io.BufferedReader;
023    import java.io.DataInput;
024    import java.io.DataInputStream;
025    import java.io.DataOutput;
026    import java.io.DataOutputStream;
027    import java.io.IOException;
028    import java.io.InputStreamReader;
029    import java.net.DatagramPacket;
030    import java.net.ProtocolException;
031    import java.util.List;
032    import java.util.Map;
033    import java.util.Properties;
034    
035    /**
036     * Implements the TTMP protocol.
037     */
038    public class StompWireFormat implements WireFormat
039    {
040    
041        static final IdGenerator PACKET_IDS = new IdGenerator();
042        static final IdGenerator clientIds = new IdGenerator();
043    
044        private CommandParser commandParser = new CommandParser(this);
045        private HeaderParser headerParser = new HeaderParser();
046    
047        private DataInputStream in;
048    
049        private String clientId;
050    
051        private Channel pendingReadPackets = new LinkedQueue();
052        private Channel pendingWriteFrames = new LinkedQueue();
053        private List receiptListeners = new CopyOnWriteArrayList();
054        private String transactionId;
055        private short sessionId;
056        private Map subscriptions = new ConcurrentHashMap();
057    
058    
059        void addReceiptListener(ReceiptListener listener)
060        {
061            receiptListeners.add(listener);
062        }
063    
064    
065        public Packet readPacket(DataInput in) throws IOException
066        {
067            Packet pending = (Packet) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn()
068            {
069                public Object cycle() throws InterruptedException
070                {
071                    return pendingReadPackets.poll(0);
072                }
073            });
074            if (pending != null)
075            {
076                return pending;
077            }
078    
079            try
080            {
081                return commandParser.parse(in);
082            }
083            catch (ProtocolException e)
084            {
085                sendError(e.getMessage());
086                return null;
087            }
088        }
089    
090        public Packet writePacket(final Packet packet, final DataOutput out) throws IOException, JMSException
091        {
092            flushPendingFrames(out);
093    
094            if (packet.getPacketType() == Packet.RECEIPT_INFO)
095            {
096                assert(packet instanceof Receipt);
097                Receipt receipt = (Receipt) packet;
098                for (int i = 0; i < receiptListeners.size(); i++)
099                {
100                    ReceiptListener listener = (ReceiptListener) receiptListeners.get(i);
101                    if (listener.onReceipt(receipt, out))
102                    {
103                        receiptListeners.remove(listener);
104                        return null;
105                    }
106                }
107            }
108    
109            if (packet.getPacketType() == Packet.ACTIVEMQ_TEXT_MESSAGE)
110            {
111                assert(packet instanceof ActiveMQTextMessage);
112                ActiveMQTextMessage msg = (ActiveMQTextMessage) packet;
113                Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
114                sub.receive(msg, out);
115            }
116            return null;
117        }
118    
119        private void flushPendingFrames(final DataOutput out) throws IOException
120        {
121            boolean interrupted = false;
122            do
123            {
124                try
125                {
126                    String frame = (String) pendingWriteFrames.poll(0);
127                    if (frame == null) return;
128                    out.writeBytes(frame);
129                }
130                catch (InterruptedException e)
131                {
132                    interrupted = true;
133                }
134            }
135            while (interrupted);
136        }
137    
138        private void sendError(final String message)
139        {
140            AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
141            {
142                public void cycle() throws InterruptedException
143                {
144                    pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR)
145                            .addHeader(Stomp.Headers.Error.MESSAGE, message)
146                            .toFrame());
147                }
148            });
149        }
150    
151        /**
152         * some transports may register their streams (e.g. Tcp)
153         *
154         * @param dataOut
155         * @param dataIn
156         */
157        public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn)
158        {
159            this.in = dataIn;
160        }
161    
162        /**
163         * Some wire formats require a handshake at start-up
164         *
165         * @throws java.io.IOException
166         */
167        public void initiateServerSideProtocol() throws IOException
168        {
169            BufferedReader in = new BufferedReader(new InputStreamReader(this.in));
170            String first_line = in.readLine();
171            if (!first_line.startsWith(Stomp.Commands.CONNECT))
172            {
173                throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT);
174            }
175    
176            Properties headers = headerParser.parse(in);
177            //if (!headers.containsKey(TTMP.Headers.Connect.LOGIN))
178            //    System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN + "] missing");
179            //if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE))
180            //    System.err.println("Required header [" + TTMP.Headers.Connect.PASSCODE + "] missing");
181    
182            // allow anyone to login for now
183    
184            String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
185            String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
186    
187            // skip to end of the packet
188            while (in.read() != 0) {}
189            final ConnectionInfo info = new ConnectionInfo();
190            final Short packet_id = new Short(PACKET_IDS.getNextShortSequence());
191            clientId = clientIds.generateId();
192            commandParser.setClientId(clientId);
193    
194            info.setClientId(clientId);
195            info.setReceiptRequired(true);
196            info.setClientVersion(Integer.toString(getCurrentWireFormatVersion()));
197            info.setClosed(false);
198            info.setHostName("ttmp.fake.host.name");
199            info.setId(packet_id.shortValue());
200            info.setUserName(login);
201            info.setPassword(passcode);
202            info.setStartTime(System.currentTimeMillis());
203            info.setStarted(true);
204    
205            AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
206            {
207                public void cycle() throws InterruptedException
208                {
209                    pendingReadPackets.put(info);
210                }
211            });
212    
213            addReceiptListener(new ReceiptListener()
214            {
215                public boolean onReceipt(Receipt receipt, DataOutput out)
216                {
217                    if (receipt.getCorrelationId() != packet_id.shortValue()) return false;
218                    final Short session_packet_id = new Short(PACKET_IDS.getNextShortSequence());
219                    sessionId = clientIds.getNextShortSequence();
220                    commandParser.setSessionId(sessionId);
221    
222                    final SessionInfo info = new SessionInfo();
223                    info.setStartTime(System.currentTimeMillis());
224                    info.setId(session_packet_id.shortValue());
225                    info.setClientId(clientId);
226                    info.setSessionId(sessionId);
227                    info.setStarted(true);
228                    info.setSessionMode(Session.AUTO_ACKNOWLEDGE);
229                    info.setReceiptRequired(true);
230    
231                    AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
232                    {
233                        public void cycle() throws InterruptedException
234                        {
235                            pendingReadPackets.put(info);
236                        }
237                    });
238    
239                    addReceiptListener(new ReceiptListener()
240                    {
241                        public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException
242                        {
243                            if (receipt.getCorrelationId() != session_packet_id.shortValue()) return false;
244                            StringBuffer buffer = new StringBuffer();
245                            buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
246                            buffer.append(Stomp.Headers.Connected.SESSION)
247                                    .append(Stomp.Headers.SEPERATOR)
248                                    .append(clientId)
249                                    .append(Stomp.NEWLINE)
250                                    .append(Stomp.NEWLINE);
251                            buffer.append(Stomp.NULL);
252                            out.writeBytes(buffer.toString());
253                            return true;
254                        }
255                    });
256    
257                    return true;
258                }
259            });
260        }
261    
262        /**
263         * Creates a new copy of this wire format so it can be used in another thread/context
264         */
265        public WireFormat copy()
266        {
267            return new StompWireFormat();
268        }
269    
270        /* Stuff below here is leaky stuff we don't actually need */
271    
272        /**
273         * Some wire formats require a handshake at start-up
274         *
275         * @throws java.io.IOException
276         */
277        public void initiateClientSideProtocol() throws IOException
278        {
279            throw new UnsupportedOperationException("Not yet implemented!");
280        }
281    
282        /**
283         * Can this wireformat process packets of this version
284         *
285         * @param version the version number to test
286         * @return true if can accept the version
287         */
288        public boolean canProcessWireFormatVersion(int version)
289        {
290            return version == getCurrentWireFormatVersion();
291        }
292    
293        /**
294         * @return the current version of this wire format
295         */
296        public int getCurrentWireFormatVersion()
297        {
298            return 1;
299        }
300    
301        /**
302         * @return Returns the enableCaching.
303         */
304        public boolean isCachingEnabled()
305        {
306            return false;
307        }
308    
309        /**
310         * @param enableCaching The enableCaching to set.
311         */
312        public void setCachingEnabled(boolean enableCaching)
313        {
314            // never
315        }
316    
317        /**
318         * some wire formats will implement their own fragementation
319         *
320         * @return true unless a wire format supports it's own fragmentation
321         */
322        public boolean doesSupportMessageFragmentation()
323        {
324            return false;
325        }
326    
327        /**
328         * Some wire formats will not be able to understand compressed messages
329         *
330         * @return true unless a wire format cannot understand compression
331         */
332        public boolean doesSupportMessageCompression()
333        {
334            return false;
335        }
336    
337        /**
338         * Writes the given package to a new datagram
339         *
340         * @param channelID is the unique channel ID
341         * @param packet    is the packet to write
342         * @return
343         * @throws java.io.IOException
344         * @throws javax.jms.JMSException
345         */
346        public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException
347        {
348            throw new UnsupportedOperationException("Will not be implemented");
349        }
350    
351        /**
352         * Reads the packet from the given byte[]
353         *
354         * @param bytes
355         * @param offset
356         * @param length
357         * @return
358         * @throws java.io.IOException
359         */
360        public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException
361        {
362            throw new UnsupportedOperationException("Will not be implemented");
363        }
364    
365        /**
366         * Reads the packet from the given byte[]
367         *
368         * @param bytes
369         * @return
370         * @throws java.io.IOException
371         */
372        public Packet fromBytes(byte[] bytes) throws IOException
373        {
374            throw new UnsupportedOperationException("Will not be implemented");
375        }
376    
377        /**
378         * A helper method which converts a packet into a byte array
379         *
380         * @param packet
381         * @return a byte array representing the packet using some wire protocol
382         * @throws java.io.IOException
383         * @throws javax.jms.JMSException
384         */
385        public byte[] toBytes(Packet packet) throws IOException, JMSException
386        {
387            throw new UnsupportedOperationException("Will not be implemented");
388        }
389    
390        /**
391         * A helper method for working with sockets where the first byte is read
392         * first, then the rest of the message is read.
393         * <p/>
394         * Its common when dealing with sockets to have different timeout semantics
395         * until the first non-zero byte is read of a message, after which
396         * time a zero timeout is used.
397         *
398         * @param firstByte the first byte of the packet
399         * @param in        the rest of the packet
400         * @return
401         * @throws java.io.IOException
402         */
403        public Packet readPacket(int firstByte, DataInput in) throws IOException
404        {
405            throw new UnsupportedOperationException("Will not be implemented");
406        }
407    
408        /**
409         * Read a packet from a Datagram packet from the given channelID. If the
410         * packet is from the same channel ID as it was sent then we have a
411         * loop-back so discard the packet
412         *
413         * @param channelID is the unique channel ID
414         * @param dpacket
415         * @return the packet read from the datagram or null if it should be
416         *         discarded
417         * @throws java.io.IOException
418         */
419        public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException
420        {
421            throw new UnsupportedOperationException("Will not be implemented");
422        }
423    
424        boolean isInTransaction()
425        {
426            return transactionId != null;
427        }
428    
429        void setTransactionId(String transactionId)
430        {
431            this.transactionId = transactionId;
432        }
433    
434        String getTransactionId()
435        {
436            return transactionId;
437        }
438    
439        void clearTransactionId()
440        {
441            this.transactionId = null;
442        }
443    
444        String getClientId()
445        {
446            return this.clientId;
447        }
448    
449        public short getSessionId()
450        {
451            return sessionId;
452        }
453    
454        public void addSubscription(Subscription s)
455        {
456            if (subscriptions.containsKey(s.getDestination()))
457            {
458                Subscription old = (Subscription) subscriptions.get(s.getDestination());
459                ConsumerInfo p = old.close();
460                enqueuePacket(p);
461                subscriptions.put(s.getDestination(), s);
462            }
463            else
464            {
465                subscriptions.put(s.getDestination(), s);
466            }
467        }
468    
469        public void enqueuePacket(final Packet ack)
470        {
471            AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
472            {
473                public void cycle() throws InterruptedException
474                {
475                    pendingReadPackets.put(ack);
476                }
477            });
478        }
479    
480        public Subscription getSubscriptionFor(ActiveMQDestination destination)
481        {
482            return (Subscription) subscriptions.get(destination);
483        }
484    }