001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service;
020    import javax.jms.JMSException;
021    import org.apache.commons.logging.*;
022    import org.activemq.broker.Broker;
023    import org.activemq.message.ActiveMQDestination;
024    import org.activemq.message.ActiveMQMessage;
025    
026    /**
027     * Determines how messages are stored in a dead letter queue
028     * 
029     * @version $Revision: 1.1.1.1 $
030     */
031    public class DeadLetterPolicy {
032        /**
033         * Prefix used by dead letter queues
034         */
035        public static final String DEAD_LETTER_PREFIX = "org.activemq.deadletter.";
036        private static final String DEFAULT_DEAD_LETTER_NAME = "DLQ";
037        private static final Log log = LogFactory.getLog(DeadLetterPolicy.class);
038        private Broker broker;
039        private String deadLetterPrefix = DEAD_LETTER_PREFIX;
040        private String deadLetterName = DEFAULT_DEAD_LETTER_NAME;
041        private boolean deadLetterEnabled = true;
042        private boolean deadLetterPerDestinationName = true;
043        private boolean storeNonPersistentMessages = true;
044        private boolean noTopicConsumerEnabled = true;
045    
046       
047        /**
048         * Construct a dead letter policy
049         * 
050         * @param broker
051         */
052        public DeadLetterPolicy(Broker broker) {
053            this.broker = broker;
054        }
055    
056        /**
057         * Default constructor
058         */
059        public DeadLetterPolicy() {
060        }
061    
062        /**
063         * @return Returns the broker.
064         */
065        public Broker getBroker() {
066            return broker;
067        }
068    
069        /**
070         * @param broker The broker to set.
071         */
072        public void setBroker(Broker broker) {
073            this.broker = broker;
074        }
075    
076        /**
077         * @return Returns the deadLetterEnabled.
078         */
079        public boolean isDeadLetterEnabled() {
080            return deadLetterEnabled;
081        }
082    
083        /**
084         * @param deadLetterEnabled The deadLetterEnabled to set.
085         */
086        public void setDeadLetterEnabled(boolean deadLetterEnabled) {
087            this.deadLetterEnabled = deadLetterEnabled;
088        }
089    
090        /**
091         * @return Returns the deadLetterPerDestinationName.
092         */
093        public boolean isDeadLetterPerDestinationName() {
094            return deadLetterPerDestinationName;
095        }
096    
097        /**
098         * @param deadLetterPerDestinationName The deadLetterPerDestinationName to set.
099         */
100        public void setDeadLetterPerDestinationName(boolean deadLetterPerDestinationName) {
101            this.deadLetterPerDestinationName = deadLetterPerDestinationName;
102        }
103    
104        /**
105         * @return Returns the deadLetterName.
106         */
107        public String getDeadLetterName() {
108            return deadLetterName;
109        }
110    
111        /**
112         * @param deadLetterName The deadLetterName to set.
113         */
114        public void setDeadLetterName(String deadLetterName) {
115            this.deadLetterName = deadLetterName;
116        }
117    
118        /**
119         * @return Returns the deadLetterPrefix.
120         */
121        public String getDeadLetterPrefix() {
122            return deadLetterPrefix;
123        }
124    
125        /**
126         * @param deadLetterPrefix The deadLetterPrefix to set.
127         */
128        public void setDeadLetterPrefix(String deadLetterPrefix) {
129            this.deadLetterPrefix = deadLetterPrefix;
130        }
131    
132        /**
133         * @return Returns the storeNonPersistentMessages.
134         */
135        public boolean isStoreNonPersistentMessages() {
136            return storeNonPersistentMessages;
137        }
138    
139        /**
140         * @param storeNonPersistentMessages The storeNonPersistentMessages to set.
141         */
142        public void setStoreNonPersistentMessages(boolean storeNonPersistentMessages) {
143            this.storeNonPersistentMessages = storeNonPersistentMessages;
144        }
145        
146        /**
147         * @return Returns the noTopicConsumerEnabled.
148         */
149        public boolean isNoTopicConsumerEnabled() {
150            return noTopicConsumerEnabled;
151        }
152        /**
153         * @param noTopicConsumerEnabled The noTopicConsumerEnabled to set.
154         */
155        public void setNoTopicConsumerEnabled(boolean noTopicConsumerEnabled) {
156            this.noTopicConsumerEnabled = noTopicConsumerEnabled;
157        }
158        
159        /**
160         * Get the name of the DLQ from the destination provided
161         * @param destination
162         * @return the name of the DLQ for this Destination
163         */
164        public String getDeadLetterNameFromDestination(ActiveMQDestination destination){
165            String deadLetterName = deadLetterPrefix;
166            if (deadLetterPerDestinationName) {
167                deadLetterName += destination.getPhysicalName();
168            }
169            else {
170                deadLetterName += deadLetterName;
171            }
172            return deadLetterName;
173        }
174    
175        /**
176         * Send a message to a dead letter queue
177         * 
178         * @param message
179         * @throws JMSException
180         */
181        public void sendToDeadLetter(ActiveMQMessage message) throws JMSException {
182            if (deadLetterEnabled && message != null && (message.isPersistent() || storeNonPersistentMessages)) {
183                if (broker != null) {
184                    String dlqName = getDeadLetterNameFromDestination(message.getJMSActiveMQDestination());
185                    message.setDispatchedFromDLQ(true);
186                    broker.sendToDeadLetterQueue(dlqName, message);
187                    if (log.isDebugEnabled()) log.debug("Passed message: " + message + " to DLQ: " + dlqName);
188                }
189                else {
190                    log.warn("Broker is not initialized - cannot add to DLQ: " + message);
191                }
192            }else if (log.isDebugEnabled()){
193                log.debug("DLQ not storing message: " + message);
194            }
195        }
196    }