View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import org.codehaus.activemq.management.JMSProducerStatsImpl;
21  import org.codehaus.activemq.management.StatsCapable;
22  import org.codehaus.activemq.message.ActiveMQDestination;
23  import org.codehaus.activemq.util.IdGenerator;
24  
25  import javax.jms.DeliveryMode;
26  import javax.jms.Destination;
27  import javax.jms.IllegalStateException;
28  import javax.jms.InvalidDestinationException;
29  import javax.jms.JMSException;
30  import javax.jms.Message;
31  import javax.jms.MessageFormatException;
32  import javax.jms.MessageProducer;
33  import javax.management.j2ee.statistics.Stats;
34  
35  /***
36   * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
37   * destination. A <CODE>MessageProducer</CODE> object is created by passing a
38   * <CODE>Destination</CODE> object to a message-producer creation method
39   * supplied by a session.
40   * <P>
41   * <CODE>MessageProducer</CODE> is the parent interface for all message
42   * producers.
43   * <P>
44   * A client also has the option of creating a message producer without
45   * supplying a destination. In this case, a destination must be provided with
46   * every send operation. A typical use for this kind of message producer is to
47   * send replies to requests using the request's <CODE>JMSReplyTo</CODE>
48   * destination.
49   * <P>
50   * A client can specify a default delivery mode, priority, and time to live for
51   * messages sent by a message producer. It can also specify the delivery mode,
52   * priority, and time to live for an individual message.
53   * <P>
54   * A client can specify a time-to-live value in milliseconds for each message
55   * it sends. This value defines a message expiration time that is the sum of
56   * the message's time-to-live and the GMT when it is sent (for transacted
57   * sends, this is the time the client sends the message, not the time the
58   * transaction is committed).
59   * <P>
60   * A JMS provider should do its best to expire messages accurately; however,
61   * the JMS API does not define the accuracy provided.
62   *
63   * @version $Revision: 1.18 $
64   * @see javax.jms.TopicPublisher
65   * @see javax.jms.QueueSender
66   * @see javax.jms.Session#createProducer
67   */
68  public class ActiveMQMessageProducer implements MessageProducer, StatsCapable {
69      protected ActiveMQSession session;
70      protected String producerId;
71      private IdGenerator idGenerator;
72      protected boolean closed;
73      private boolean disableMessageID;
74      private boolean disableMessageTimestamp;
75      private int defaultDeliveryMode;
76      private int defaultPriority;
77      private long defaultTimeToLive;
78      protected ActiveMQDestination defaultDestination;
79      private long startTime;
80      private JMSProducerStatsImpl stats;
81  
82      protected ActiveMQMessageProducer(ActiveMQSession theSession, ActiveMQDestination destination) throws JMSException {
83          this.session = theSession;
84          this.defaultDestination = destination;
85          this.idGenerator = new IdGenerator();
86          this.disableMessageID = false;
87          this.disableMessageTimestamp = false;
88          this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
89          this.defaultPriority = Message.DEFAULT_PRIORITY;
90          this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
91          this.startTime = System.currentTimeMillis();
92          this.session.addProducer(this);
93          this.stats = new JMSProducerStatsImpl(theSession.getSessionStats(), destination);
94      }
95  
96      public Stats getStats() {
97          return stats;
98      }
99  
100     public JMSProducerStatsImpl getProducerStats() {
101         return stats;
102     }
103 
104     /***
105      * Sets whether message IDs are disabled.
106      * <P>
107      * Since message IDs take some effort to create and increase a message's
108      * size, some JMS providers may be able to optimize message overhead if
109      * they are given a hint that the message ID is not used by an application.
110      * By calling the <CODE>setDisableMessageID</CODE> method on this message
111      * producer, a JMS client enables this potential optimization for all
112      * messages sent by this message producer. If the JMS provider accepts this
113      * hint, these messages must have the message ID set to null; if the
114      * provider ignores the hint, the message ID must be set to its normal
115      * unique value.
116      * <P>
117      * Message IDs are enabled by default.
118      *
119      * @param value indicates if message IDs are disabled
120      * @throws JMSException if the JMS provider fails to close the producer due to
121      *                      some internal error.
122      */
123     public void setDisableMessageID(boolean value) throws JMSException {
124         checkClosed();
125         this.disableMessageID = value;
126     }
127 
128     /***
129      * Gets an indication of whether message IDs are disabled.
130      *
131      * @return an indication of whether message IDs are disabled
132      * @throws JMSException if the JMS provider fails to determine if message IDs are
133      *                      disabled due to some internal error.
134      */
135     public boolean getDisableMessageID() throws JMSException {
136         checkClosed();
137         return this.disableMessageID;
138     }
139 
140     /***
141      * Sets whether message timestamps are disabled.
142      * <P>
143      * Since timestamps take some effort to create and increase a message's
144      * size, some JMS providers may be able to optimize message overhead if
145      * they are given a hint that the timestamp is not used by an application.
146      * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
147      * message producer, a JMS client enables this potential optimization for
148      * all messages sent by this message producer. If the JMS provider accepts
149      * this hint, these messages must have the timestamp set to zero; if the
150      * provider ignores the hint, the timestamp must be set to its normal
151      * value.
152      * <P>
153      * Message timestamps are enabled by default.
154      *
155      * @param value indicates if message timestamps are disabled
156      * @throws JMSException if the JMS provider fails to close the producer due to
157      *                      some internal error.
158      */
159     public void setDisableMessageTimestamp(boolean value) throws JMSException {
160         checkClosed();
161         this.disableMessageTimestamp = value;
162     }
163 
164     /***
165      * Gets an indication of whether message timestamps are disabled.
166      *
167      * @return an indication of whether message timestamps are disabled
168      * @throws JMSException if the JMS provider fails to close the producer due to
169      *                      some internal error.
170      */
171     public boolean getDisableMessageTimestamp() throws JMSException {
172         checkClosed();
173         return this.disableMessageTimestamp;
174     }
175 
176     /***
177      * Sets the producer's default delivery mode.
178      * <P>
179      * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
180      *
181      * @param newDeliveryMode the message delivery mode for this message producer; legal
182      *                        values are <code>DeliveryMode.NON_PERSISTENT</code> and
183      *                        <code>DeliveryMode.PERSISTENT</code>
184      * @throws JMSException if the JMS provider fails to set the delivery mode due to
185      *                      some internal error.
186      * @see javax.jms.MessageProducer#getDeliveryMode
187      * @see javax.jms.DeliveryMode#NON_PERSISTENT
188      * @see javax.jms.DeliveryMode#PERSISTENT
189      * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
190      */
191     public void setDeliveryMode(int newDeliveryMode) throws JMSException {
192         if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
193             throw new IllegalStateException("unkown delivery mode: " + newDeliveryMode);
194         }
195         checkClosed();
196         this.defaultDeliveryMode = newDeliveryMode;
197     }
198 
199     /***
200      * Gets the producer's default delivery mode.
201      *
202      * @return the message delivery mode for this message producer
203      * @throws JMSException if the JMS provider fails to close the producer due to
204      *                      some internal error.
205      */
206     public int getDeliveryMode() throws JMSException {
207         checkClosed();
208         return this.defaultDeliveryMode;
209     }
210 
211     /***
212      * Sets the producer's default priority.
213      * <P>
214      * The JMS API defines ten levels of priority value, with 0 as the lowest
215      * priority and 9 as the highest. Clients should consider priorities 0-4 as
216      * gradations of normal priority and priorities 5-9 as gradations of
217      * expedited priority. Priority is set to 4 by default.
218      *
219      * @param newDefaultPriority the message priority for this message producer; must be a
220      *                           value between 0 and 9
221      * @throws JMSException if the JMS provider fails to set the delivery mode due to
222      *                      some internal error.
223      * @see javax.jms.MessageProducer#getPriority
224      * @see javax.jms.Message#DEFAULT_PRIORITY
225      */
226     public void setPriority(int newDefaultPriority) throws JMSException {
227         if (newDefaultPriority < 0 || newDefaultPriority > 9) {
228             throw new IllegalStateException("default priority must be a value between 0 and 9");
229         }
230         checkClosed();
231         this.defaultPriority = newDefaultPriority;
232     }
233 
234     /***
235      * Gets the producer's default priority.
236      *
237      * @return the message priority for this message producer
238      * @throws JMSException if the JMS provider fails to close the producer due to
239      *                      some internal error.
240      * @see javax.jms.MessageProducer#setPriority
241      */
242     public int getPriority() throws JMSException {
243         checkClosed();
244         return this.defaultPriority;
245     }
246 
247     /***
248      * Sets the default length of time in milliseconds from its dispatch time
249      * that a produced message should be retained by the message system.
250      * <P>
251      * Time to live is set to zero by default.
252      *
253      * @param timeToLive the message time to live in milliseconds; zero is unlimited
254      * @throws JMSException if the JMS provider fails to set the time to live due to
255      *                      some internal error.
256      * @see javax.jms.MessageProducer#getTimeToLive
257      * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
258      */
259     public void setTimeToLive(long timeToLive) throws JMSException {
260         if (timeToLive < 0l) {
261             throw new IllegalStateException("cannot set a negative timeToLive");
262         }
263         checkClosed();
264         this.defaultTimeToLive = timeToLive;
265     }
266 
267     /***
268      * Gets the default length of time in milliseconds from its dispatch time
269      * that a produced message should be retained by the message system.
270      *
271      * @return the message time to live in milliseconds; zero is unlimited
272      * @throws JMSException if the JMS provider fails to get the time to live due to
273      *                      some internal error.
274      * @see javax.jms.MessageProducer#setTimeToLive
275      */
276     public long getTimeToLive() throws JMSException {
277         checkClosed();
278         return this.defaultTimeToLive;
279     }
280 
281     /***
282      * Gets the destination associated with this <CODE>MessageProducer</CODE>.
283      *
284      * @return this producer's <CODE>Destination/ <CODE>
285      * @throws JMSException if the JMS provider fails to close the producer due to
286      *                      some internal error.
287      * @since 1.1
288      */
289     public Destination getDestination() throws JMSException {
290         checkClosed();
291         return this.defaultDestination;
292     }
293 
294     /***
295      * Closes the message producer.
296      * <P>
297      * Since a provider may allocate some resources on behalf of a <CODE>
298      * MessageProducer</CODE> outside the Java virtual machine, clients should
299      * close them when they are not needed. Relying on garbage collection to
300      * eventually reclaim these resources may not be timely enough.
301      *
302      * @throws JMSException if the JMS provider fails to close the producer due to
303      *                      some internal error.
304      */
305     public void close() throws JMSException {
306         this.session.removeProducer(this);
307         closed = true;
308     }
309 
310     protected void checkClosed() throws IllegalStateException {
311         if (closed) {
312             throw new IllegalStateException("The producer is closed");
313         }
314     }
315 
316     /***
317      * Sends a message using the <CODE>MessageProducer</CODE>'s default
318      * delivery mode, priority, and time to live.
319      *
320      * @param message the message to send
321      * @throws JMSException                if the JMS provider fails to send the message due to some
322      *                                     internal error.
323      * @throws MessageFormatException      if an invalid message is specified.
324      * @throws InvalidDestinationException if a client uses this method with a <CODE>
325      *                                     MessageProducer</CODE> with an invalid destination.
326      * @throws java.lang.UnsupportedOperationException
327      *                                     if a client uses this method with a <CODE>
328      *                                     MessageProducer</CODE> that did not specify a
329      *                                     destination at creation time.
330      * @see javax.jms.Session#createProducer
331      * @see javax.jms.MessageProducer
332      * @since 1.1
333      */
334     public void send(Message message) throws JMSException {
335         this.send(this.defaultDestination, message, this.defaultDeliveryMode, this.defaultPriority,
336                 this.defaultTimeToLive);
337     }
338 
339     /***
340      * Sends a message to the destination, specifying delivery mode, priority,
341      * and time to live.
342      *
343      * @param message      the message to send
344      * @param deliveryMode the delivery mode to use
345      * @param priority     the priority for this message
346      * @param timeToLive   the message's lifetime (in milliseconds)
347      * @throws JMSException                if the JMS provider fails to send the message due to some
348      *                                     internal error.
349      * @throws MessageFormatException      if an invalid message is specified.
350      * @throws InvalidDestinationException if a client uses this method with a <CODE>
351      *                                     MessageProducer</CODE> with an invalid destination.
352      * @throws java.lang.UnsupportedOperationException
353      *                                     if a client uses this method with a <CODE>
354      *                                     MessageProducer</CODE> that did not specify a
355      *                                     destination at creation time.
356      * @see javax.jms.Session#createProducer
357      * @since 1.1
358      */
359     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
360         this.send(this.defaultDestination, message, deliveryMode, priority, timeToLive);
361     }
362 
363     /***
364      * Sends a message to a destination for an unidentified message producer.
365      * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
366      * priority, and time to live.
367      * <P>
368      * Typically, a message producer is assigned a destination at creation
369      * time; however, the JMS API also supports unidentified message producers,
370      * which require that the destination be supplied every time a message is
371      * sent.
372      *
373      * @param destination the destination to send this message to
374      * @param message     the message to send
375      * @throws JMSException                if the JMS provider fails to send the message due to some
376      *                                     internal error.
377      * @throws MessageFormatException      if an invalid message is specified.
378      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
379      * @throws java.lang.UnsupportedOperationException
380      *                                     if a client uses this method with a <CODE>
381      *                                     MessageProducer</CODE> that specified a destination at
382      *                                     creation time.
383      * @see javax.jms.Session#createProducer
384      * @see javax.jms.MessageProducer
385      * @since 1.1
386      */
387     public void send(Destination destination, Message message) throws JMSException {
388         this.send(destination, message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
389     }
390 
391     /***
392      * Sends a message to a destination for an unidentified message producer,
393      * specifying delivery mode, priority and time to live.
394      * <P>
395      * Typically, a message producer is assigned a destination at creation
396      * time; however, the JMS API also supports unidentified message producers,
397      * which require that the destination be supplied every time a message is
398      * sent.
399      *
400      * @param destination  the destination to send this message to
401      * @param message      the message to send
402      * @param deliveryMode the delivery mode to use
403      * @param priority     the priority for this message
404      * @param timeToLive   the message's lifetime (in milliseconds)
405      * @throws JMSException                if the JMS provider fails to send the message due to some
406      *                                     internal error.
407      * @throws MessageFormatException      if an invalid message is specified.
408      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
409      * @see javax.jms.Session#createProducer
410      * @since 1.1
411      */
412     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
413             throws JMSException {
414         checkClosed();
415         if (destination == null) {
416             throw new InvalidDestinationException("Dont understand null destinations");
417         }
418         this.session.send(this, destination, message, deliveryMode, priority, timeToLive);
419         stats.onMessage(message);
420     }
421 
422 
423     /***
424      * @return Returns the producerId.
425      */
426     protected String getProducerId() {
427         return producerId;
428     }
429 
430     /***
431      * @param producerId The producerId to set.
432      */
433     protected void setProducerId(String producerId) {
434         this.producerId = producerId;
435     }
436 
437     protected long getStartTime() {
438         return this.startTime;
439     }
440 
441     protected IdGenerator getIdGenerator() {
442         return this.idGenerator;
443     }
444 
445 }