1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.tool;
19  
20  import javax.jms.JMSException;
21  import javax.jms.Message;
22  import javax.jms.MessageConsumer;
23  import javax.jms.MessageListener;
24  import javax.jms.TextMessage;
25  import javax.jms.Topic;
26  import java.io.IOException;
27  
28  /***
29   * A simple tool for consuming messages
30   *
31   * @version $Revision: 1.7 $
32   */
33  public class ConsumerTool extends ToolSupport implements MessageListener {
34  
35      protected int count = 0;
36      protected int dumpCount = 10;
37      protected boolean verbose = true;
38      protected int maxiumMessages = 0;
39      private MessageConsumer consumer;
40      private boolean pauseBeforeShutdown;
41  
42  
43      public static void main(String[] args) {
44          ConsumerTool tool = new ConsumerTool();
45          if (args.length > 0) {
46              tool.url = args[0];
47          }
48          if (args.length > 1) {
49              tool.topic = args[1].equalsIgnoreCase("true");
50          }
51          if (args.length > 2) {
52              tool.subject = args[2];
53          }
54          if (args.length > 3) {
55              tool.durable = args[3].equalsIgnoreCase("true");
56          }
57          if (args.length > 4) {
58              tool.maxiumMessages = Integer.parseInt(args[4]);
59          }
60          tool.run();
61      }
62  
63      public void run() {
64          try {
65              System.out.println("Connecting to URL: " + url);
66              System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
67              System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
68  
69              createSession();
70              if (durable && topic) {
71                  consumer = session.createDurableSubscriber((Topic) destination, consumerName);
72              }
73              else {
74                  consumer = session.createConsumer(destination);
75              }
76              if (maxiumMessages <= 0) {
77                  consumer.setMessageListener(this);
78              }
79              connection.start();
80  
81              if (maxiumMessages > 0) {
82                  consumeMessagesAndClose();
83              }
84          }
85          catch (Exception e) {
86              System.out.println("Caught: " + e);
87              e.printStackTrace();
88          }
89      }
90  
91      public void onMessage(Message message) {
92          try {
93              if (message instanceof TextMessage) {
94                  TextMessage txtMsg = (TextMessage) message;
95                  if (verbose) {
96                      System.out.println("Received: " + txtMsg.getText());
97                  }
98              }
99              else {
100                 if (verbose) {
101                     System.out.println("Received: " + message);
102                 }
103             }
104             if (++count % dumpCount == 0) {
105                 dumpStats();
106             }
107         }
108         catch (JMSException e) {
109             System.out.println("Caught: " + e);
110             e.printStackTrace();
111         }
112     }
113 
114 
115     protected void consumeMessagesAndClose() throws JMSException, IOException {
116         System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
117 
118         for (int i = 0; i < maxiumMessages; i++) {
119             Message message = consumer.receive();
120             onMessage(message);
121         }
122         System.out.println("Closing connection");
123         consumer.close();
124         session.close();
125         connection.close();
126         if (pauseBeforeShutdown) {
127             System.out.println("Press return to shut down");
128             System.in.read();
129         }
130     }
131 }