View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport.tcp;
20  
21  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
22  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
23  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.transport.TransportServerChannelSupport;
28  
29  import javax.jms.JMSException;
30  import java.io.IOException;
31  import java.net.InetAddress;
32  import java.net.ServerSocket;
33  import java.net.Socket;
34  import java.net.URI;
35  import java.net.UnknownHostException;
36  
37  /***
38   * Binds to a well known port and listens for Sockets ...
39   *
40   * @version $Revision: 1.17 $
41   */
42  public class TcpTransportServerChannel extends TransportServerChannelSupport implements Runnable {
43      private static final Log log = LogFactory.getLog(TcpTransportServerChannel.class);
44      protected static final int DEFEAULT_BACKLOG = 500;
45  
46      private WireFormat wireFormat;
47      protected String bindAddressURI;
48      private Thread serverSocketThread;
49      private ServerSocket serverSocket;
50      private SynchronizedBoolean closed;
51      private SynchronizedBoolean started;
52  
53      private boolean useAsyncSend = false;
54      private int maxOutstandingMessages = 10;
55      private int backlog = DEFEAULT_BACKLOG;
56  
57      /***
58       * Default Constructor
59       *
60       * @param bindAddr
61       * @throws JMSException
62       */
63      public TcpTransportServerChannel(WireFormat wireFormat, URI bindAddr) throws JMSException {
64          this.wireFormat = wireFormat;
65          this.bindAddressURI = bindAddr.toString();
66          closed = new SynchronizedBoolean(false);
67          started = new SynchronizedBoolean(false);
68          try {
69              serverSocket = createServerSocket(bindAddr);
70              log.info("Listening for connections at: " + bindAddr);
71          }
72          catch (Exception se) {
73              System.out.println(se);
74              se.printStackTrace();
75              JMSException jmsEx = new JMSException("Bind to " + bindAddressURI + " failed: " + se.getMessage());
76              jmsEx.setLinkedException(se);
77              throw jmsEx;
78          }
79      }
80  
81      public TcpTransportServerChannel(WireFormat wireFormat, ServerSocket serverSocket) {
82          this.wireFormat = wireFormat;
83          this.serverSocket = serverSocket;
84          closed = new SynchronizedBoolean(false);
85          started = new SynchronizedBoolean(false);
86          this.bindAddressURI = serverSocket.getInetAddress().toString();
87          log.info("Listening for connections at: " + bindAddressURI);
88      }
89  
90      /***
91       * close the ServerChannel
92       */
93      public void stop() {
94          if (closed.commit(false, true)) {
95              super.stop();
96              try {
97                  if (serverSocket != null) {
98                      serverSocket.close();
99                      serverSocketThread.join();
100                     serverSocketThread = null;
101                 }
102             }
103             catch (Throwable e) {
104                 log.warn("Caught while closing: " + e + ". Now Closed", e);
105             }
106         }
107     }
108 
109     /***
110      * start listeneing for events
111      *
112      * @throws JMSException if an error occurs
113      */
114     public void start() throws JMSException {
115         super.start();
116         if (started.commit(false, true)) {
117             serverSocketThread = new Thread(this, toString());
118             serverSocketThread.setDaemon(true);
119             serverSocketThread.start();
120         }
121     }
122 
123     /***
124      * @return pretty print of this
125      */
126     public String toString() {
127         return "TcpTransportServerChannel@" + bindAddressURI;
128     }
129 
130     /***
131      * pull Sockets from the ServerSocket
132      */
133     public void run() {
134         while (!closed.get()) {
135             Socket socket = null;
136             try {
137                 socket = serverSocket.accept();
138                 if (socket != null) {
139                     // have thread per channel for sending messages and a thread for receiving them
140                     PooledExecutor executor = null;
141                     if (useAsyncSend) {
142                         executor = new PooledExecutor(new BoundedBuffer(maxOutstandingMessages), 1);
143                     }
144                     TcpTransportChannel channel = new TcpTransportChannel(wireFormat, socket, executor);
145                     addClient(channel);
146                 }
147             }
148             catch (Exception e) {
149                 if (!closed.get()) {
150                     log.warn("run()", e);
151                 }
152             }
153         }
154     }
155 
156     // Properties
157     //-------------------------------------------------------------------------
158     public boolean isUseAsyncSend() {
159         return useAsyncSend;
160     }
161 
162     public void setUseAsyncSend(boolean useAsyncSend) {
163         this.useAsyncSend = useAsyncSend;
164     }
165 
166     public int getMaxOutstandingMessages() {
167         return maxOutstandingMessages;
168     }
169 
170     public void setMaxOutstandingMessages(int maxOutstandingMessages) {
171         this.maxOutstandingMessages = maxOutstandingMessages;
172     }
173 
174     public int getBacklog() {
175         return backlog;
176     }
177 
178     public void setBacklog(int backlog) {
179         this.backlog = backlog;
180     }
181 
182     // Implementation methods
183     //-------------------------------------------------------------------------
184 
185     /***
186      * Factory method to create a new ServerSocket
187      *
188      * @throws UnknownHostException
189      * @throws IOException
190      */
191     protected ServerSocket createServerSocket(URI bind) throws UnknownHostException, IOException {
192         ServerSocket answer = null;
193         String host = bind.getHost();
194         host = (host == null || host.length() == 0) ? "localhost" : host;
195         InetAddress addr = InetAddress.getByName(host);
196         if (addr.equals(InetAddress.getLocalHost())) {
197             answer = new ServerSocket(bind.getPort(), backlog);
198         }
199         else {
200             answer = new ServerSocket(bind.getPort(), backlog, addr);
201         }
202         return answer;
203     }
204 }