001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.tcp; 020 021 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 022 import EDU.oswego.cs.dl.util.concurrent.BoundedChannel; 023 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 024 import EDU.oswego.cs.dl.util.concurrent.Executor; 025 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 026 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.activemq.io.WireFormat; 030 import org.activemq.io.WireFormatLoader; 031 import org.activemq.message.Packet; 032 import org.activemq.transport.TransportChannelSupport; 033 import org.activemq.transport.TransportStatusEvent; 034 import org.activemq.util.JMSExceptionHelper; 035 036 import javax.jms.JMSException; 037 import java.io.BufferedInputStream; 038 import java.io.DataInputStream; 039 import java.io.DataOutputStream; 040 import java.io.EOFException; 041 import java.io.IOException; 042 import java.io.InterruptedIOException; 043 import java.net.InetAddress; 044 import java.net.InetSocketAddress; 045 import java.net.Socket; 046 import java.net.SocketAddress; 047 import java.net.SocketException; 048 import java.net.SocketTimeoutException; 049 import java.net.URI; 050 import java.net.UnknownHostException; 051 052 /** 053 * A tcp implementation of a TransportChannel 054 * 055 * @version $Revision: 1.2 $ 056 */ 057 public class TcpTransportChannel extends TransportChannelSupport implements Runnable { 058 private static final int DEFAULT_SOCKET_BUFFER_SIZE = 64 * 1024; 059 private static final Log log = LogFactory.getLog(TcpTransportChannel.class); 060 protected Socket socket; 061 protected DataOutputStream dataOut; 062 protected DataInputStream dataIn; 063 064 private WireFormatLoader wireFormatLoader; 065 private SynchronizedBoolean closed; 066 private SynchronizedBoolean started; 067 private Object outboundLock; 068 private Executor executor; 069 private Thread thread; 070 private boolean useAsyncSend = false; 071 private int soTimeout = 10000; 072 private int socketBufferSize = DEFAULT_SOCKET_BUFFER_SIZE; 073 private BoundedChannel exceptionsList; 074 private TcpTransportServerChannel serverChannel; 075 076 /** 077 * Construct basic helpers 078 * 079 * @param wireFormat 080 */ 081 protected TcpTransportChannel(WireFormat wireFormat) { 082 super(wireFormat); 083 this.wireFormatLoader = new WireFormatLoader(wireFormat); 084 closed = new SynchronizedBoolean(false); 085 started = new SynchronizedBoolean(false); 086 // there's not much point logging all exceptions, lets just keep a few around 087 exceptionsList = new BoundedLinkedQueue(10); 088 outboundLock = new Object(); 089 setUseAsyncSend(useAsyncSend); 090 super.setCachingEnabled(true); 091 } 092 093 /** 094 * Connect to a remote Node - e.g. a Broker 095 * 096 * @param wireFormat 097 * @param remoteLocation 098 * @throws JMSException 099 */ 100 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException { 101 this(wireFormat); 102 try { 103 this.socket = createSocket(remoteLocation); 104 initializeStreams(); 105 } 106 catch (Exception ioe) { 107 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed. " + "URI was: " 108 + remoteLocation + " Reason: " + ioe, ioe); 109 } 110 } 111 112 /** 113 * Connect to a remote Node - e.g. a Broker 114 * 115 * @param wireFormat 116 * @param remoteLocation 117 * @param localLocation - e.g. local InetAddress and local port 118 * @throws JMSException 119 */ 120 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException { 121 this(wireFormat); 122 try { 123 this.socket = createSocket(remoteLocation, localLocation); 124 initializeStreams(); 125 } 126 catch (Exception ioe) { 127 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe); 128 } 129 } 130 131 /** 132 * Initialize from a ServerSocket 133 * @param serverChannel 134 * @param wireFormat 135 * @param socket 136 * @param executor 137 * @throws JMSException 138 */ 139 public TcpTransportChannel(TcpTransportServerChannel serverChannel,WireFormat wireFormat, Socket socket, Executor executor) throws JMSException { 140 this(wireFormat); 141 this.socket = socket; 142 this.executor = executor; 143 this.serverChannel = serverChannel; 144 setServerSide(true); 145 try { 146 initialiseSocket(socket); 147 initializeStreams(); 148 } 149 catch (IOException ioe) { 150 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe); 151 } 152 } 153 154 public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException { 155 this(wireFormat); 156 this.socket = socket; 157 this.executor = executor; 158 try { 159 initialiseSocket(socket); 160 initializeStreams(); 161 } 162 catch (IOException ioe) { 163 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe); 164 } 165 } 166 167 /** 168 * start listeneing for events 169 * 170 * @throws JMSException if an error occurs 171 */ 172 public void start() throws JMSException { 173 if (started.commit(false, true)) { 174 thread = new Thread(this, toString()); 175 try { 176 if (isServerSide()) { 177 thread.setDaemon(true); 178 WireFormat wf = wireFormatLoader.getWireFormat(dataIn); 179 if (wf != null) { 180 setWireFormat(wf); 181 } 182 getWireFormat().registerTransportStreams(dataOut, dataIn); 183 getWireFormat().initiateServerSideProtocol(); 184 } 185 else { 186 getWireFormat().registerTransportStreams(dataOut, dataIn); 187 thread.setPriority(Thread.NORM_PRIORITY + 2); 188 } 189 //enable caching on the wire format 190 currentWireFormat.setCachingEnabled(isCachingEnabled()); 191 thread.start(); 192 //send the wire format 193 if (!isServerSide()) { 194 getWireFormat().initiateClientSideProtocol(); 195 } 196 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.CONNECTED)); 197 } 198 catch (EOFException e) { 199 doClose(e); 200 } 201 catch (IOException e) { 202 JMSException jmsEx = new JMSException("start failed: " + e.getMessage()); 203 jmsEx.initCause(e); 204 jmsEx.setLinkedException(e); 205 throw jmsEx; 206 } 207 } 208 } 209 210 /** 211 * close the channel 212 */ 213 public void stop() { 214 if (closed.commit(false, true)) { 215 super.stop(); 216 try { 217 if (executor != null) { 218 stopExecutor(executor); 219 } 220 closeStreams(); 221 socket.close(); 222 } 223 catch (Exception e) { 224 log.warn("Caught while closing: " + e + ". Now Closed", e); 225 } 226 } 227 closed.set(true); 228 if (this.serverChannel != null){ 229 this.serverChannel.removeClient(this); 230 } 231 } 232 233 public void forceDisconnect() { 234 log.debug("Forcing disconnect"); 235 if (socket != null && socket.isConnected()) { 236 try { 237 socket.close(); 238 } 239 catch (IOException e) { 240 // Ignore 241 } 242 } 243 } 244 245 /** 246 * Asynchronously send a Packet 247 * 248 * @param packet 249 * @throws JMSException 250 */ 251 public void asyncSend(final Packet packet) throws JMSException { 252 if (executor != null) { 253 try { 254 executor.execute(new Runnable() { 255 public void run() { 256 try { 257 if (!isClosed()) { 258 doAsyncSend(packet); 259 } 260 } 261 catch (JMSException e) { 262 try { 263 exceptionsList.put(e); 264 } 265 catch (InterruptedException e1) { 266 log.warn("Failed to add element to exception list: " + e1); 267 } 268 } 269 } 270 }); 271 } 272 catch (InterruptedException e) { 273 log.info("Caught: " + e, e); 274 } 275 try { 276 JMSException e = (JMSException) exceptionsList.poll(0); 277 if (e != null) { 278 throw e; 279 } 280 } 281 catch (InterruptedException e1) { 282 log.warn("Failed to remove element to exception list: " + e1); 283 } 284 } 285 else { 286 doAsyncSend(packet); 287 } 288 } 289 290 /** 291 * @return false 292 */ 293 public boolean isMulticast() { 294 return false; 295 } 296 297 /** 298 * reads packets from a Socket 299 */ 300 public void run() { 301 log.trace("TCP consumer thread starting"); 302 int count = 0; 303 while (!isClosed()) { 304 if (isServerSide() && ++count > 500) { 305 count = 0; 306 Thread.yield(); 307 } 308 try { 309 Packet packet = getWireFormat().readPacket(dataIn); 310 if (packet != null) { 311 doConsumePacket(packet); 312 } 313 } 314 catch (SocketTimeoutException e) { 315 //onAsyncException(JMSExceptionHelper.newJMSException(e)); 316 } 317 catch (InterruptedIOException e) { 318 // TODO confirm that this really is a bug in the AS/400 JVM 319 // Patch for AS/400 JVM 320 // lets ignore these exceptions 321 // as they typically just indicate the thread was interupted 322 // while waiting for input, not that the socket is in error 323 //onAsyncException(JMSExceptionHelper.newJMSException(e)); 324 } 325 catch (IOException e) { 326 doClose(e); 327 } 328 } 329 } 330 331 public boolean isClosed() { 332 return closed.get(); 333 } 334 335 /** 336 * pretty print for object 337 * 338 * @return String representation of this object 339 */ 340 public String toString() { 341 return "TcpTransportChannel: " + socket; 342 } 343 344 /** 345 * @return the socket used by the TcpTransportChannel 346 */ 347 public Socket getSocket() { 348 return socket; 349 } 350 351 /** 352 * Can this wireformat process packets of this version 353 * 354 * @param version the version number to test 355 * @return true if can accept the version 356 */ 357 public boolean canProcessWireFormatVersion(int version) { 358 return getWireFormat().canProcessWireFormatVersion(version); 359 } 360 361 /** 362 * @return the current version of this wire format 363 */ 364 public int getCurrentWireFormatVersion() { 365 return getWireFormat().getCurrentWireFormatVersion(); 366 } 367 368 // Properties 369 //------------------------------------------------------------------------- 370 371 /** 372 * @return true if packets are enqueued to a separate queue before dispatching 373 */ 374 public boolean isUseAsyncSend() { 375 return useAsyncSend; 376 } 377 378 /** 379 * set the useAsync flag 380 * 381 * @param useAsyncSend 382 */ 383 public void setUseAsyncSend(boolean useAsyncSend) { 384 this.useAsyncSend = useAsyncSend; 385 try { 386 if (useAsyncSend && executor==null ) { 387 PooledExecutor pe = new PooledExecutor(new BoundedBuffer(10), 1); 388 pe.waitWhenBlocked(); 389 pe.setKeepAliveTime(1000); 390 executor = pe; 391 } 392 else if (!useAsyncSend && executor != null) { 393 stopExecutor(executor); 394 } 395 } 396 catch (Exception e) { 397 log.warn("problem closing executor", e); 398 } 399 } 400 401 402 403 /** 404 * @return the current so timeout used on the socket 405 */ 406 public int getSoTimeout() { 407 return soTimeout; 408 } 409 410 /** 411 * set the socket so timeout 412 * 413 * @param soTimeout 414 * @throws JMSException 415 */ 416 public void setSoTimeout(int soTimeout) throws JMSException { 417 this.soTimeout = soTimeout; 418 if (this.socket != null){ 419 try { 420 socket.setSoTimeout(soTimeout); 421 } 422 catch (SocketException e) { 423 JMSException jmsEx = new JMSException("Failed to set soTimeout: ", e.getMessage()); 424 jmsEx.setLinkedException(e); 425 throw jmsEx; 426 } 427 } 428 } 429 430 /** 431 * @param noDelay The noDelay to set. 432 */ 433 public void setNoDelay(boolean noDelay) { 434 super.setNoDelay(noDelay); 435 if (socket != null){ 436 try { 437 socket.setTcpNoDelay(noDelay); 438 } 439 catch (SocketException e) { 440 log.warn("failed to set noDelay on the socket");//should never happen 441 } 442 } 443 } 444 445 /** 446 * @return Returns the socketBufferSize. 447 */ 448 public int getSocketBufferSize() { 449 return socketBufferSize; 450 } 451 /** 452 * @param socketBufferSize The socketBufferSize to set. 453 */ 454 public void setSocketBufferSize(int socketBufferSize) { 455 this.socketBufferSize = socketBufferSize; 456 } 457 // Implementation methods 458 //------------------------------------------------------------------------- 459 /** 460 * Actually performs the async send of a packet 461 * 462 * @param packet 463 * @return a response or null 464 * @throws JMSException 465 */ 466 protected Packet doAsyncSend(Packet packet) throws JMSException { 467 Packet response = null; 468 try { 469 synchronized (outboundLock) { 470 response = getWireFormat().writePacket(packet, dataOut); 471 dataOut.flush(); 472 } 473 } 474 catch (IOException e) { 475 // if (closed.get()) { 476 // log.trace("Caught exception while closed: " + e, e); 477 // } 478 // else { 479 JMSException exception = JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e); 480 onAsyncException(exception); 481 throw exception; 482 // } 483 } 484 catch (JMSException e) { 485 if (isClosed()) { 486 log.trace("Caught exception while closed: " + e, e); 487 } 488 else { 489 throw e; 490 } 491 } 492 return response; 493 } 494 495 protected void doClose(Exception ex) { 496 if (!isClosed()) { 497 if (!pendingStop) { 498 setPendingStop(true); 499 setTransportConnected(false); 500 if (ex instanceof EOFException) { 501 if (!isServerSide() && !isUsedInternally()){ 502 log.warn("Peer closed connection", ex); 503 } 504 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED)); 505 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex)); 506 } 507 else { 508 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED)); 509 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex)); 510 } 511 } 512 stop(); 513 } 514 } 515 516 /** 517 * Configures the socket for use 518 * @param sock 519 * @throws SocketException 520 */ 521 protected void initialiseSocket(Socket sock) throws SocketException { 522 try { 523 sock.setReceiveBufferSize(socketBufferSize); 524 sock.setSendBufferSize(socketBufferSize); 525 } 526 catch (SocketException se) { 527 log.debug("Cannot set socket buffer size = " + socketBufferSize, se); 528 } 529 sock.setSoTimeout(soTimeout); 530 sock.setTcpNoDelay(isNoDelay()); 531 } 532 533 protected void initializeStreams() throws IOException{ 534 BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream(),8192); 535 this.dataIn = new DataInputStream(buffIn); 536 TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(),8192); 537 this.dataOut = new DataOutputStream(buffOut); 538 } 539 540 protected void closeStreams() throws IOException { 541 if (dataOut != null) { 542 dataOut.close(); 543 } 544 if (dataIn != null) { 545 dataIn.close(); 546 } 547 } 548 549 /** 550 * Factory method to create a new socket 551 * 552 * @param remoteLocation the URI to connect to 553 * @return the newly created socket 554 * @throws UnknownHostException 555 * @throws IOException 556 */ 557 protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException { 558 SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort()); 559 Socket sock = new Socket(); 560 initialiseSocket(sock); 561 sock.connect(sockAddress); 562 return sock; 563 } 564 565 /** 566 * Factory method to create a new socket 567 * 568 * @param remoteLocation 569 * @param localLocation 570 * @return @throws IOException 571 * @throws IOException 572 * @throws UnknownHostException 573 */ 574 protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException { 575 SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort()); 576 SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); 577 Socket sock = new Socket(); 578 initialiseSocket(sock); 579 sock.bind(localAddress); 580 sock.connect(sockAddress); 581 return sock; 582 } 583 584 }