1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.transport.gnet;
20
21 import EDU.oswego.cs.dl.util.concurrent.Latch;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.geronimo.network.SelectorManager;
26 import org.apache.geronimo.network.protocol.AcceptableProtocol;
27 import org.apache.geronimo.network.protocol.ProtocolFactory;
28 import org.apache.geronimo.network.protocol.ProtocolFactory.AcceptedCallBack;
29 import org.apache.geronimo.network.protocol.ServerSocketAcceptor;
30 import org.apache.geronimo.network.protocol.SocketProtocol;
31 import org.apache.geronimo.pool.ClockPool;
32 import org.apache.geronimo.pool.ThreadPool;
33 import org.codehaus.activemq.message.WireFormat;
34 import org.codehaus.activemq.transport.TransportServerChannel;
35 import org.codehaus.activemq.transport.TransportServerChannelSupport;
36
37 import javax.jms.JMSException;
38 import java.net.URI;
39
40 /***
41 * An implementation of TransportServerChannel which uses
42 * the Geronimo network layer for connectivity.
43 *
44 * @version $Revision: 1.10 $
45 */
46 public class GTransportServerChannel extends TransportServerChannelSupport implements TransportServerChannel {
47 protected static final int BACKLOG = 500;
48 private static final Log log = LogFactory.getLog(GTransportServerChannel.class);
49
50 private WireFormat wireFormat;
51 protected String bindAddressURI;
52 private SynchronizedBoolean closed;
53 private ThreadPool tp;
54 private ClockPool cp;
55 private SelectorManager sm;
56 private ServerSocketAcceptor ssa;
57 private ProtocolFactory pf;
58 private Latch startLatch;
59
60 /***
61 * Default Constructor
62 *
63 * @param bindAddr
64 * @throws JMSException
65 */
66 public GTransportServerChannel(WireFormat wireFormat, URI bindAddr, SelectorManager selectorManager, ThreadPool threadPool, ClockPool clockPool) throws Exception {
67 this.wireFormat = wireFormat;
68 this.sm = selectorManager;
69 this.tp = threadPool;
70 this.cp = clockPool;
71
72 closed = new SynchronizedBoolean(false);
73 startLatch = new Latch();
74
75
76
77
78 SocketProtocol spt = new SocketProtocol();
79 spt.setTimeout(30 * 1000);
80 spt.setSelectorManager(sm);
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 pf = new ProtocolFactory();
102 pf.setClockPool(cp);
103 pf.setMaxAge(Long.MAX_VALUE);
104 pf.setMaxInactivity(Long.MAX_VALUE);
105
106 pf.setReclaimPeriod(10 * 1000);
107 pf.setTemplate(spt);
108
109 pf.setAcceptedCallBack(createAcceptedCallBack());
110
111 ssa = new ServerSocketAcceptor();
112 ssa.setSelectorManager(sm);
113 ssa.setTimeOut(5 * 1000);
114 ssa.setUri(bindAddr);
115 ssa.setAcceptorListener(pf);
116 }
117
118 /***
119 * @return
120 */
121 private AcceptedCallBack createAcceptedCallBack() {
122 return new AcceptedCallBack() {
123 public void accepted(AcceptableProtocol p) {
124 try {
125
126 startLatch.acquire();
127
128 if (p != null) {
129 GTransportChannel channel = new GTransportChannel(wireFormat, p, tp);
130 addClient(channel);
131 }
132
133 }
134 catch (Exception e) {
135 log.error("Caught while attempting to add new protocol: " + e, e);
136 }
137 }
138 };
139 }
140
141 /***
142 * close the ServerChannel
143 */
144 public void stop() {
145 if (closed.commit(false, true)) {
146 super.stop();
147 try {
148 ssa.drain();
149 pf.drain();
150 }
151 catch (Throwable e) {
152 log.trace("error closing GTransportServerChannel", e);
153 }
154 }
155 }
156
157 /***
158 * start listeneing for events
159 *
160 * @throws JMSException if an error occurs
161 */
162 public void start() throws JMSException {
163 super.start();
164 try {
165 ssa.startup();
166 }
167 catch (Exception e) {
168 JMSException jmsEx = new JMSException("Could not start ServerSocketAcceptor: " + e);
169 jmsEx.setLinkedException(e);
170 throw jmsEx;
171 }
172 startLatch.release();
173 }
174
175
176 /***
177 * @return pretty print of this
178 */
179 public String toString() {
180 return "GTransportServerChannel@" + bindAddressURI;
181 }
182
183 }