View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   *
18   **/
19  package org.codehaus.activemq.transport.gnet;
20  
21  import EDU.oswego.cs.dl.util.concurrent.Latch;
22  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.geronimo.network.SelectorManager;
26  import org.apache.geronimo.network.protocol.AcceptableProtocol;
27  import org.apache.geronimo.network.protocol.ProtocolFactory;
28  import org.apache.geronimo.network.protocol.ProtocolFactory.AcceptedCallBack;
29  import org.apache.geronimo.network.protocol.ServerSocketAcceptor;
30  import org.apache.geronimo.network.protocol.SocketProtocol;
31  import org.apache.geronimo.pool.ClockPool;
32  import org.apache.geronimo.pool.ThreadPool;
33  import org.codehaus.activemq.message.WireFormat;
34  import org.codehaus.activemq.transport.TransportServerChannel;
35  import org.codehaus.activemq.transport.TransportServerChannelSupport;
36  
37  import javax.jms.JMSException;
38  import java.net.URI;
39  
40  /***
41   * An implementation of TransportServerChannel which uses
42   * the Geronimo network layer for connectivity.
43   *
44   * @version $Revision: 1.10 $
45   */
46  public class GTransportServerChannel extends TransportServerChannelSupport implements TransportServerChannel {
47      protected static final int BACKLOG = 500;
48      private static final Log log = LogFactory.getLog(GTransportServerChannel.class);
49  
50      private WireFormat wireFormat;
51      protected String bindAddressURI;
52      private SynchronizedBoolean closed;
53      private ThreadPool tp;
54      private ClockPool cp;
55      private SelectorManager sm;
56      private ServerSocketAcceptor ssa;
57      private ProtocolFactory pf;
58      private Latch startLatch;
59  
60      /***
61       * Default Constructor
62       *
63       * @param bindAddr
64       * @throws JMSException
65       */
66      public GTransportServerChannel(WireFormat wireFormat, URI bindAddr, SelectorManager selectorManager, ThreadPool threadPool, ClockPool clockPool) throws Exception {
67          this.wireFormat = wireFormat;
68          this.sm = selectorManager;
69          this.tp = threadPool;
70          this.cp = clockPool;
71  
72          closed = new SynchronizedBoolean(false);
73          startLatch = new Latch();
74  
75  /*
76          ControlServerProtocolStack templateStack = new ControlServerProtocolStack();
77  */
78          SocketProtocol spt = new SocketProtocol();
79          spt.setTimeout(30 * 1000);
80          spt.setSelectorManager(sm);
81  /*
82          templateStack.push(spt);
83  
84          ControlServerProtocol csp = new ControlServerProtocol();
85          csp.setTimeout(30 * 1000);
86          csp.setThreadPool(tp);
87          csp.setClockPool(cp);
88          csp.setSelectorManager(sm);
89          csp.setControlServerListener(new ControlServerListener() {
90              public void shutdown() {
91                  log.trace("SERVER SIDE SHUTDOWN");
92              }
93          });
94  
95          templateStack.push(csp);
96          ControlServerProtocolWaiter waiter = new ControlServerProtocolWaiter();
97          waiter.push(new CountingProtocol());
98          templateStack.push(waiter);
99  */
100 
101         pf = new ProtocolFactory();
102         pf.setClockPool(cp);
103         pf.setMaxAge(Long.MAX_VALUE);
104         pf.setMaxInactivity(Long.MAX_VALUE);
105         //pf.setReclaimPeriod(Long.MAX_VALUE);
106         pf.setReclaimPeriod(10 * 1000);
107         pf.setTemplate(spt);
108 //        pf.setTemplate(templateStack);
109         pf.setAcceptedCallBack(createAcceptedCallBack());
110 
111         ssa = new ServerSocketAcceptor();
112         ssa.setSelectorManager(sm);
113         ssa.setTimeOut(5 * 1000);
114         ssa.setUri(bindAddr);
115         ssa.setAcceptorListener(pf);
116     }
117 
118     /***
119      * @return
120      */
121     private AcceptedCallBack createAcceptedCallBack() {
122         return new AcceptedCallBack() {
123             public void accepted(AcceptableProtocol p) {
124                 try {
125                     // Wait for start to be called before accepting connections..
126                     startLatch.acquire();
127 
128                     if (p != null) {
129                         GTransportChannel channel = new GTransportChannel(wireFormat, p, tp);
130                         addClient(channel);
131                     }
132 
133                 }
134                 catch (Exception e) {
135                     log.error("Caught while attempting to add new protocol: " + e, e);
136                 }
137             }
138         };
139     }
140 
141     /***
142      * close the ServerChannel
143      */
144     public void stop() {
145         if (closed.commit(false, true)) {
146             super.stop();
147             try {
148                 ssa.drain();
149                 pf.drain();
150             }
151             catch (Throwable e) {
152                 log.trace("error closing GTransportServerChannel", e);
153             }
154         }
155     }
156 
157     /***
158      * start listeneing for events
159      *
160      * @throws JMSException if an error occurs
161      */
162     public void start() throws JMSException {
163         super.start();
164         try {
165             ssa.startup();
166         }
167         catch (Exception e) {
168             JMSException jmsEx = new JMSException("Could not start ServerSocketAcceptor: " + e);
169             jmsEx.setLinkedException(e);
170             throw jmsEx;
171         }
172         startLatch.release();
173     }
174 
175 
176     /***
177      * @return pretty print of this
178      */
179     public String toString() {
180         return "GTransportServerChannel@" + bindAddressURI;
181     }
182 
183 }