001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * Copyright 2004 Protique Ltd
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    package org.activemq.store.journal;
020    
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.File;
024    import java.io.IOException;
025    import java.util.ArrayList;
026    import java.util.Iterator;
027    import java.util.Map;
028    
029    import javax.jms.JMSException;
030    import javax.transaction.xa.XAException;
031    
032    import org.activeio.adapter.PacketByteArrayOutputStream;
033    import org.activeio.adapter.PacketInputStream;
034    import org.activeio.journal.InvalidRecordLocationException;
035    import org.activeio.journal.Journal;
036    import org.activeio.journal.JournalEventListener;
037    import org.activeio.journal.RecordLocation;
038    import org.activeio.journal.active.JournalImpl;
039    import org.activeio.journal.howl.HowlJournal;
040    import org.activemq.io.WireFormat;
041    import org.activemq.io.impl.StatelessDefaultWireFormat;
042    import org.activemq.message.ActiveMQMessage;
043    import org.activemq.message.ActiveMQXid;
044    import org.activemq.message.MessageAck;
045    import org.activemq.message.Packet;
046    import org.activemq.service.MessageIdentity;
047    import org.activemq.store.MessageStore;
048    import org.activemq.store.PersistenceAdapter;
049    import org.activemq.store.TopicMessageStore;
050    import org.activemq.store.TransactionStore;
051    import org.activemq.store.jdbc.JDBCPersistenceAdapter;
052    import org.activemq.store.journal.JournalTransactionStore.Tx;
053    import org.activemq.store.journal.JournalTransactionStore.TxOperation;
054    import org.activemq.util.JMSExceptionHelper;
055    import org.apache.commons.logging.Log;
056    import org.apache.commons.logging.LogFactory;
057    import org.objectweb.howl.log.Configuration;
058    
059    import EDU.oswego.cs.dl.util.concurrent.Channel;
060    import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
061    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
062    import EDU.oswego.cs.dl.util.concurrent.Latch;
063    import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
064    import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
065    import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
066    
067    /**
068     * An implementation of {@link PersistenceAdapter} designed for
069     * use with a {@link Journal} and then checkpointing asynchronously
070     * on a timeout with some other long term persistent storage.
071     *
072     * @version $Revision: 1.1 $
073     */
074    public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener {
075    
076        private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
077        public static final String DEFAULT_JOURNAL_TYPE = "default";
078        public static final String HOWL_JOURNAL_TYPE = "howl";
079        
080        private Journal journal;
081        private String journalType = DEFAULT_JOURNAL_TYPE;
082        private PersistenceAdapter longTermPersistence;
083        private File directory = new File("logs");
084        private final StatelessDefaultWireFormat wireFormat = new StatelessDefaultWireFormat();
085        private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
086        private final ConcurrentHashMap topicMessageStores = new ConcurrentHashMap();
087        
088        private static final int PACKET_RECORD_TYPE = 0;
089        private static final int COMMAND_RECORD_TYPE = 1;
090        private static final int TX_COMMAND_RECORD_TYPE = 2;
091        private static final int ACK_RECORD_TYPE = 3;
092    
093        private Channel checkpointRequests = new LinkedQueue();
094        private QueuedExecutor checkpointExecutor;
095        ClockDaemon clockDaemon;
096        private Object clockTicket;
097        private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
098        private int logFileSize=1024*1024*20;
099        private int logFileCount=2;
100        private long checkpointInterval = 1000 * 60 * 5;
101    
102        public JournalPersistenceAdapter() {
103            checkpointExecutor = new QueuedExecutor(new LinkedQueue());
104            checkpointExecutor.setThreadFactory(new ThreadFactory() {
105                public Thread newThread(Runnable runnable) {
106                    Thread answer = new Thread(runnable, "Checkpoint Worker");
107                    answer.setDaemon(true);
108                    answer.setPriority(Thread.MAX_PRIORITY);
109                    return answer;
110                }
111            });
112        }
113    
114        public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) throws IOException {
115            this();
116            this.directory = directory;
117            this.longTermPersistence = longTermPersistence;
118        }
119    
120        public Map getInitialDestinations() {
121            return longTermPersistence.getInitialDestinations();
122        }
123        
124        private MessageStore createMessageStore(String destination, boolean isQueue) throws JMSException {
125            if(isQueue) {
126                return createQueueMessageStore(destination);
127            } else {
128                return createTopicMessageStore(destination);
129            }
130        }
131    
132        public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
133            JournalMessageStore store = (JournalMessageStore) messageStores.get(destinationName);
134            if( store == null ) {
135                    MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
136                    store = new JournalMessageStore(this, checkpointStore, destinationName);
137                    messageStores.put(destinationName, store);
138            }
139            return store;
140        }
141    
142        public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
143            JournalTopicMessageStore store = (JournalTopicMessageStore) topicMessageStores.get(destinationName);
144            if( store == null ) {
145                TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
146                    store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
147                    topicMessageStores.put(destinationName, store);
148            }
149            return store;
150        }
151    
152        public TransactionStore createTransactionStore() throws JMSException {
153            return transactionStore;
154        }
155    
156        public void beginTransaction() throws JMSException {
157            longTermPersistence.beginTransaction();
158        }
159    
160        public void commitTransaction() throws JMSException {
161            longTermPersistence.commitTransaction();
162        }
163    
164        public void rollbackTransaction() {
165            longTermPersistence.rollbackTransaction();
166        }
167    
168        public synchronized void start() throws JMSException {
169            
170            if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
171                // Disabled periodic clean up as it deadlocks with the checkpoint operations.
172                ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
173            }
174            
175            longTermPersistence.start();
176            createTransactionStore();
177            if (journal == null) {
178                try {
179                    log.info("Opening journal.");
180                    journal = createJournal();
181                    log.info("Opened journal: " + journal);
182                    journal.setJournalEventListener(this);
183                }
184                catch (Exception e) {
185                    throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
186                }
187                try {
188                    recover();
189                }
190                catch (Exception e) {
191                    throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
192                }
193            }
194    
195            // Do a checkpoint periodically.
196            clockTicket = getClockDaemon().executePeriodically(checkpointInterval, new Runnable() {
197                public void run() {
198                    checkpoint(false);
199                }
200            }, false);
201    
202        }
203    
204        public synchronized void stop() throws JMSException {
205    
206            if (clockTicket != null) {
207                // Stop the periodical checkpoint.
208                ClockDaemon.cancel(clockTicket);
209                clockTicket=null;
210                clockDaemon.shutDown();
211            }
212    
213            // Take one final checkpoint and stop checkpoint processing.
214            checkpoint(true);
215            checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
216    
217            JMSException firstException = null;
218            if (journal != null) {
219                try {
220                    journal.close();
221                    journal = null;
222                }
223                catch (Exception e) {
224                    firstException = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e);
225                }
226            }
227            longTermPersistence.stop();
228    
229            if (firstException != null) {
230                throw firstException;
231            }
232        }
233    
234        // Properties
235        //-------------------------------------------------------------------------
236        public PersistenceAdapter getLongTermPersistence() {
237            return longTermPersistence;
238        }
239    
240        public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
241            this.longTermPersistence = longTermPersistence;
242        }
243    
244        /**
245         * @return Returns the directory.
246         */
247        public File getDirectory() {
248            return directory;
249        }
250    
251        /**
252         * @param directory The directory to set.
253         */
254        public void setDirectory(File directory) {
255            this.directory = directory;
256        }
257    
258        /**
259         * @return Returns the wireFormat.
260         */
261        public WireFormat getWireFormat() {
262            return wireFormat;
263        }
264    
265        public String getJournalType() {
266            return journalType;
267        }
268    
269        public void setJournalType(String journalType) {
270            this.journalType = journalType;
271        }
272    
273        protected Journal createJournal() throws IOException {
274            if( DEFAULT_JOURNAL_TYPE.equals(journalType) ) {
275                return new JournalImpl(directory,logFileCount,logFileSize);
276            }
277            
278            if( HOWL_JOURNAL_TYPE.equals(journalType) ) {
279                try {
280                    Configuration config = new Configuration();
281                    config.setLogFileDir(directory.getCanonicalPath());
282                    return new HowlJournal(config);
283                } catch (IOException e) {
284                    throw e;
285                } catch (Exception e) {
286                    throw (IOException)new IOException("Could not open HOWL journal: "+e.getMessage()).initCause(e);
287                }
288            }
289            
290            throw new IllegalStateException("Unsupported valued for journalType attribute: "+journalType);
291        }
292    
293        // Implementation methods
294        //-------------------------------------------------------------------------
295    
296        /**
297         * The Journal give us a call back so that we can move old data out of the journal.
298         * Taking a checkpoint does this for us.
299         *
300         * @see org.activemq.journal.JournalEventListener#overflowNotification(org.activemq.journal.RecordLocation)
301         */
302        public void overflowNotification(RecordLocation safeLocation) {
303            checkpoint(false);
304        }
305    
306        /**
307         * When we checkpoint we move all the journaled data to long term storage.
308         * @param b 
309         */
310        public void checkpoint(boolean sync) {
311            try {
312                
313                if( journal == null )
314                    throw new IllegalStateException("Journal is closed.");
315                
316                // Do the checkpoint asynchronously?
317                Latch latch=null;
318                if( sync ) {
319                    latch = new Latch();
320                    checkpointRequests.put(latch);
321                } else {
322                    checkpointRequests.put(Boolean.TRUE);
323                }
324                
325                checkpointExecutor.execute(new Runnable() {
326                    public void run() {
327    
328                        ArrayList listners = new ArrayList();
329                        
330                        try { 
331                            // Avoid running a checkpoint too many times in a row.
332                            // Consume any queued up checkpoint requests.
333                            try {
334                                boolean requested = false;
335                                Object t;
336                                while ((t=checkpointRequests.poll(0)) != null) {
337                                    if( t.getClass()==Latch.class )
338                                        listners.add(t);
339                                    requested = true;
340                                }
341                                if (!requested) {
342                                    return;
343                                }
344                            }
345                            catch (InterruptedException e1) {
346                                return;
347                            }
348        
349                            log.debug("Checkpoint started.");
350                            RecordLocation newMark = null;
351        
352                            Iterator iterator = messageStores.values().iterator();
353                            while (iterator.hasNext()) {
354                                try {
355                                    JournalMessageStore ms = (JournalMessageStore) iterator.next();
356                                    RecordLocation mark = ms.checkpoint();
357                                    if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
358                                        newMark = mark;
359                                    }
360                                }
361                                catch (Exception e) {
362                                    log.error("Failed to checkpoint a message store: " + e, e);
363                                }
364                            }
365                            
366                            iterator = topicMessageStores.values().iterator();
367                            while (iterator.hasNext()) {
368                                try {
369                                    JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
370                                    RecordLocation mark = ms.checkpoint();
371                                    if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
372                                        newMark = mark;
373                                    }
374                                }
375                                catch (Exception e) {
376                                    log.error("Failed to checkpoint a message store: " + e, e);
377                                }
378                            }
379                            
380                            try {
381                                if (newMark != null) {
382                                    if( log.isDebugEnabled() )
383                                        log.debug("Marking journal: "+newMark);
384                                    journal.setMark(newMark, true);
385                                }
386                            }
387                            catch (Exception e) {
388                                log.error("Failed to mark the Journal: " + e, e);
389                            }
390                            
391                            // Clean up the DB if it's a JDBC store.
392                            if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
393                                // Disabled periodic clean up as it deadlocks with the checkpoint operations.
394                                ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
395                            }
396    
397                            log.debug("Checkpoint done.");
398                        } finally {
399                            for (Iterator iter = listners.iterator(); iter.hasNext();) {
400                                Latch latch = (Latch) iter.next();
401                                latch.release();
402                            }
403                        }
404                    }
405                });
406    
407                if( sync ) {
408                    latch.acquire();
409                }
410            }
411            catch (InterruptedException e) {
412                log.warn("Request to start checkpoint failed: " + e, e);
413            }
414        }
415    
416        /**
417         * @param destinationName
418         * @param message
419         * @param sync
420         * @throws JMSException
421         */
422        public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
423            try {
424    
425                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
426                DataOutputStream os = new DataOutputStream(pos);
427                os.writeByte(PACKET_RECORD_TYPE);
428                os.writeUTF(destination);
429                os.close();
430                org.activeio.Packet p = wireFormat.writePacket(packet, pos);
431                return journal.write(p, sync);
432            }
433            catch (IOException e) {
434                throw createWriteException(packet, e);
435            }
436        }
437    
438        /**
439         * @param destinationName
440         * @param message
441         * @param sync
442         * @throws JMSException
443         */
444        public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
445            try {
446    
447                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
448                DataOutputStream os = new DataOutputStream(pos);
449                os.writeByte(COMMAND_RECORD_TYPE);
450                os.writeUTF(command);
451                os.close();
452                return journal.write(pos.getPacket(), sync);
453    
454            }
455            catch (IOException e) {
456                throw createWriteException(command, e);
457            }
458        }
459    
460        /**
461         * @param location
462         * @return
463         * @throws JMSException
464         */
465        public Packet readPacket(RecordLocation location) throws JMSException {
466            try {
467                org.activeio.Packet data = journal.read(location);
468                DataInputStream is = new DataInputStream(new PacketInputStream(data));
469                byte type = is.readByte();
470                if (type != PACKET_RECORD_TYPE) {
471                    throw new IOException("Record is not a packet type.");
472                }
473                String destination = is.readUTF();
474                Packet packet = wireFormat.readPacket(data);
475                is.close();
476                return packet;
477    
478            }
479            catch (InvalidRecordLocationException e) {
480                throw createReadException(location, e);
481            }
482            catch (IOException e) {
483                throw createReadException(location, e);
484            }
485        }
486    
487    
488        /**
489         * Move all the messages that were in the journal into long term storeage.  We just replay and do a checkpoint.
490         *
491         * @throws JMSException
492         * @throws IOException
493         * @throws InvalidRecordLocationException
494         * @throws IllegalStateException
495         */
496        private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
497    
498            RecordLocation pos = null;
499            int transactionCounter = 0;
500    
501            log.info("Journal Recovery Started.");
502    
503            // While we have records in the journal.
504            while ((pos = journal.getNextRecordLocation(pos)) != null) {
505                org.activeio.Packet data = journal.read(pos);
506                DataInputStream is = new DataInputStream(new PacketInputStream(data));
507    
508                // Read the destination and packate from the record.
509                String destination = null;
510                Packet packet = null;
511                try {
512                    byte type = is.readByte();
513                    switch (type) {
514                        case PACKET_RECORD_TYPE:
515    
516                            // Is the current packet part of the destination?
517                            destination = is.readUTF();
518                            packet = wireFormat.readPacket(data);
519    
520                            // Try to replay the packet.
521                            if (packet instanceof ActiveMQMessage) {
522                                ActiveMQMessage msg = (ActiveMQMessage) packet;
523                                
524                                JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, msg.getJMSActiveMQDestination().isQueue());
525                                if( msg.getTransactionId()!=null ) {
526                                    transactionStore.addMessage(store, msg, pos);
527                                } else {
528                                    store.replayAddMessage(msg);
529                                    transactionCounter++;
530                                }
531                            }
532                            else if (packet instanceof MessageAck) {
533                                MessageAck ack = (MessageAck) packet;
534                                JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, ack.getDestination().isQueue());
535                                if( ack.getTransactionId()!=null ) {
536                                    transactionStore.removeMessage(store, ack, pos);
537                                } else {
538                                    store.replayRemoveMessage(ack);
539                                    transactionCounter++;
540                                }
541                            }
542                            else {
543                                log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
544                            }
545    
546                            break;
547                        case TX_COMMAND_RECORD_TYPE:
548                            
549                            TxCommand command = new TxCommand();
550                            command.setType(is.readByte());
551                            command.setWasPrepared(is.readBoolean());
552                            switch(command.getType()) {
553                                    case TxCommand.LOCAL_COMMIT:
554                                    case TxCommand.LOCAL_ROLLBACK:
555                                        command.setTransactionId(is.readUTF());
556                                        break;
557                                    default:
558                                        command.setTransactionId(ActiveMQXid.read(is));
559                                            break;
560                            }
561                            
562                            // Try to replay the packet.
563                            switch(command.getType()) {
564                                    case TxCommand.XA_PREPARE:
565                                  transactionStore.replayPrepare(command.getTransactionId());
566                                        break;
567                                    case TxCommand.XA_COMMIT:
568                                    case TxCommand.LOCAL_COMMIT:
569                                  Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
570                                  // Replay the committed operations.
571                                  if( tx!=null) {
572                                      tx.getOperations();
573                                      for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
574                                          TxOperation op = (TxOperation) iter.next();
575                                          if( op.operationType == TxOperation.ADD_OPERATION_TYPE ) {
576                                              op.store.replayAddMessage((ActiveMQMessage) op.data);
577                                          }
578                                          if( op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
579                                              op.store.replayRemoveMessage((MessageAck) op.data);
580                                          }
581                                          if( op.operationType == TxOperation.ACK_OPERATION_TYPE) {
582                                              JournalAck ack = (JournalAck) op.data;
583                                              ((JournalTopicMessageStore)op.store).replayAcknowledge(ack.getSubscription(), new MessageIdentity(ack.getMessageId()));
584                                          }
585                                      }
586                                      transactionCounter++;
587                                  }
588                                        break;
589                                    case TxCommand.LOCAL_ROLLBACK:
590                                    case TxCommand.XA_ROLLBACK:
591                                  transactionStore.replayRollback(command.getTransactionId());
592                                        break;
593                            }
594                            
595                            break;
596                            
597                        case ACK_RECORD_TYPE:
598                            
599                            destination = is.readUTF();
600                            String subscription = is.readUTF();
601                            String messageId = is.readUTF();
602                            Object transactionId=null;
603                            
604                            JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destination, false);
605                            if( transactionId!=null ) {
606                                JournalAck ack = new JournalAck(destination, subscription, messageId, transactionId);
607                                transactionStore.acknowledge(store, ack, pos);
608                            } else {
609                                store.replayAcknowledge(subscription, new MessageIdentity(messageId));
610                                transactionCounter++;
611                            }
612                            
613                        case COMMAND_RECORD_TYPE:
614    
615                            break;
616                        default:
617                            log.error("Unknown type of record in transaction log which will be discarded: " + type);
618                            break;
619                    }
620                }
621                finally {
622                    is.close();
623                }
624            }
625    
626            RecordLocation location = writeCommand("RECOVERED", true);
627            journal.setMark(location, true);
628    
629            log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
630        }
631    
632        private JMSException createReadException(RecordLocation location, Exception e) {
633            return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
634        }
635    
636        protected JMSException createWriteException(Packet packet, Exception e) {
637            return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
638        }
639        
640        private XAException createWriteException(TxCommand command, Exception e) {
641            return (XAException)new XAException("Failed to write to journal for: " + command + ". Reason: " + e).initCause(e);
642        }
643    
644    
645        protected JMSException createWriteException(String command, Exception e) {
646            return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
647        }
648    
649        protected JMSException createRecoveryFailedException(Exception e) {
650            return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
651        }
652    
653        public ClockDaemon getClockDaemon() {
654            if (clockDaemon == null) {
655                clockDaemon = new ClockDaemon();
656                clockDaemon.setThreadFactory(new ThreadFactory() {
657                    public Thread newThread(Runnable runnable) {
658                        Thread thread = new Thread(runnable, "Checkpoint Timmer");
659                        thread.setDaemon(true);
660                        return thread;
661                    }
662                });
663            }
664            return clockDaemon;
665        }
666    
667        public void setClockDaemon(ClockDaemon clockDaemon) {
668            this.clockDaemon = clockDaemon;
669        }
670    
671        /**
672         * @param xid
673         * @return
674         */
675        public RecordLocation writeTxCommand(TxCommand command, boolean sync) throws XAException {
676            try {
677    
678                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
679                DataOutputStream os = new DataOutputStream(pos);
680                os.writeByte(TX_COMMAND_RECORD_TYPE);
681                os.writeByte(command.getType());
682                os.writeBoolean(command.getWasPrepared());
683                switch(command.getType()) {
684                    case TxCommand.LOCAL_COMMIT:
685                    case TxCommand.LOCAL_ROLLBACK:
686                        os.writeUTF( (String) command.getTransactionId() );
687                        break;
688                    default:
689                        ActiveMQXid xid = (ActiveMQXid) command.getTransactionId();
690                            xid.write(os);
691                            break;
692                }
693                os.close();
694                return journal.write(pos.getPacket(), sync);
695            }
696            catch (IOException e) {
697                throw createWriteException(command, e);
698            }
699        }
700    
701        /**
702         * @param destinationName
703         * @param persistentKey
704         * @param messageIdentity
705         * @param b
706         * @return
707         */
708        public RecordLocation writePacket(String destinationName, String subscription, MessageIdentity messageIdentity, boolean sync) throws JMSException{
709            try {
710    
711                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
712                DataOutputStream os = new DataOutputStream(pos);
713                os.writeByte(ACK_RECORD_TYPE);
714                os.writeUTF(destinationName);
715                os.writeUTF(subscription);
716                os.writeUTF(messageIdentity.getMessageID());
717                os.close();
718                return journal.write(pos.getPacket(), sync);
719    
720            }
721            catch (IOException e) {
722                throw createWriteException("Ack for message: "+messageIdentity, e);
723            }
724        }
725    
726        public JournalTransactionStore getTransactionStore() {
727            return transactionStore;
728        }
729    
730        public int getLogFileCount() {
731            return logFileCount;
732        }
733    
734        public void setLogFileCount(int logFileCount) {
735            this.logFileCount = logFileCount;
736        }
737    
738        public int getLogFileSize() {
739            return logFileSize;
740        }
741    
742        public void setLogFileSize(int logFileSize) {
743            this.logFileSize = logFileSize;
744        }
745    
746        public long getCheckpointInterval() {
747            return checkpointInterval;
748        }
749        public void setCheckpointInterval(long checkpointInterval) {
750            this.checkpointInterval = checkpointInterval;
751        }
752    }