/**
 *
 * Copyright 2004 Hiram Chirino
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activemq.store.journal;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;

import javax.jms.JMSException;
import javax.transaction.xa.XAException;

import org.activeio.adapter.PacketByteArrayOutputStream;
import org.activeio.adapter.PacketInputStream;
import org.activeio.journal.InvalidRecordLocationException;
import org.activeio.journal.Journal;
import org.activeio.journal.JournalEventListener;
import org.activeio.journal.RecordLocation;
import org.activeio.journal.active.JournalImpl;
import org.activeio.journal.howl.HowlJournal;
import org.activemq.io.WireFormat;
import org.activemq.io.impl.StatelessDefaultWireFormat;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.MessageAck;
import org.activemq.message.Packet;
import org.activemq.service.MessageIdentity;
import org.activemq.store.MessageStore;
import org.activemq.store.PersistenceAdapter;
import org.activemq.store.TopicMessageStore;
import org.activemq.store.TransactionStore;
import org.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.activemq.store.journal.JournalTransactionStore.Tx;
import org.activemq.store.journal.JournalTransactionStore.TxOperation;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.objectweb.howl.log.Configuration;

import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;

/**
 * An implementation of {@link PersistenceAdapter} designed for
 * use with a {@link Journal} and then checkpointing asynchronously
 * on a timeout with some other long term persistent storage.
 *
 * @version $Revision: 1.1 $
 */
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener {

    private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
    public static final String DEFAULT_JOURNAL_TYPE = "default";
    public static final String HOWL_JOURNAL_TYPE = "howl";

    private Journal journal;
    private String journalType = DEFAULT_JOURNAL_TYPE;
    private PersistenceAdapter longTermPersistence;
    private File directory = new File("logs");
    private final StatelessDefaultWireFormat wireFormat = new StatelessDefaultWireFormat();
    // Journal-backed stores keyed by destination name.
    private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
    private final ConcurrentHashMap topicMessageStores = new ConcurrentHashMap();

    // First byte of every journal record; tells recover()/readPacket() how to decode the rest.
    private static final int PACKET_RECORD_TYPE = 0;
    private static final int COMMAND_RECORD_TYPE = 1;
    private static final int TX_COMMAND_RECORD_TYPE = 2;
    private static final int ACK_RECORD_TYPE = 3;

    // Pending checkpoint requests: a Latch for a synchronous request, Boolean.TRUE otherwise.
    private Channel checkpointRequests = new LinkedQueue();
    private QueuedExecutor checkpointExecutor;
    ClockDaemon clockDaemon;
    private Object clockTicket;
    private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
    private int logFileSize=1024*1024*20;
    private int logFileCount=2;
    // Period handed to ClockDaemon.executePeriodically (presumably milliseconds, i.e. 5 minutes).
    private long checkpointInterval = 1000 * 60 * 5;

    /**
     * Creates an adapter whose checkpoints run on a daemon thread named
     * "Checkpoint Worker" at maximum priority. The long term persistence
     * adapter must be set before the adapter is used.
     */
    public JournalPersistenceAdapter() {
        checkpointExecutor = new QueuedExecutor(new LinkedQueue());
        checkpointExecutor.setThreadFactory(new ThreadFactory() {
            public Thread newThread(Runnable runnable) {
                Thread answer = new Thread(runnable, "Checkpoint Worker");
                answer.setDaemon(true);
                answer.setPriority(Thread.MAX_PRIORITY);
                return answer;
            }
        });
    }

    /**
     * @param directory the directory handed to the journal implementation
     * @param longTermPersistence the adapter that journaled data is checkpointed into
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) throws IOException {
        this();
        this.directory = directory;
        this.longTermPersistence = longTermPersistence;
    }

    /**
     * Delegates to the long term persistence adapter.
     */
    public Map getInitialDestinations() {
        return longTermPersistence.getInitialDestinations();
    }

    /**
     * Returns the queue or topic store for the destination, creating it if needed.
     */
    private MessageStore createMessageStore(String destination, boolean isQueue) throws JMSException {
        if(isQueue) {
            return createQueueMessageStore(destination);
        } else {
            return createTopicMessageStore(destination);
        }
    }

    /**
     * Returns the cached journal store for the queue, or wraps a newly created
     * long term store in a {@link JournalMessageStore} and caches it.
     */
    public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
        JournalMessageStore store = (JournalMessageStore) messageStores.get(destinationName);
        if( store == null ) {
            MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
            store = new JournalMessageStore(this, checkpointStore, destinationName);
            messageStores.put(destinationName, store);
        }
        return store;
    }

    /**
     * Returns the cached journal store for the topic, or wraps a newly created
     * long term store in a {@link JournalTopicMessageStore} and caches it.
     */
    public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
        JournalTopicMessageStore store = (JournalTopicMessageStore) topicMessageStores.get(destinationName);
        if( store == null ) {
            TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
            topicMessageStores.put(destinationName, store);
        }
        return store;
    }

    /**
     * Returns the single {@link JournalTransactionStore} owned by this adapter.
     */
    public TransactionStore createTransactionStore() throws JMSException {
        return transactionStore;
    }

    public void beginTransaction() throws JMSException {
        longTermPersistence.beginTransaction();
    }

    public void commitTransaction() throws JMSException {
        longTermPersistence.commitTransaction();
    }

    public void rollbackTransaction() {
        longTermPersistence.rollbackTransaction();
    }

    /**
     * Starts the long term store, opens and replays the journal if it is not
     * already open, and schedules the periodic checkpoint.
     *
     * @throws JMSException if the journal cannot be opened or recovered
     */
    public synchronized void start() throws JMSException {

        if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
            // Disabled periodic clean up as it deadlocks with the checkpoint operations.
            ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
        }

        longTermPersistence.start();
        createTransactionStore();
        if (journal == null) {
            try {
                log.info("Opening journal.");
                journal = createJournal();
                log.info("Opened journal: " + journal);
                journal.setJournalEventListener(this);
            }
            catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
            }
            try {
                recover();
            }
            catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
            }
        }

        // Do a checkpoint periodically.
        clockTicket = getClockDaemon().executePeriodically(checkpointInterval, new Runnable() {
            public void run() {
                checkpoint(false);
            }
        }, false);

    }

    /**
     * Cancels the periodic checkpoint, takes a final synchronous checkpoint if
     * the journal is open, then closes the journal and stops the long term store.
     *
     * @throws JMSException if closing the journal failed; the long term store
     *         is still stopped before this is thrown
     */
    public synchronized void stop() throws JMSException {

        if (clockTicket != null) {
            // Stop the periodical checkpoint.
            ClockDaemon.cancel(clockTicket);
            clockTicket=null;
            clockDaemon.shutDown();
        }

        // Take one final checkpoint and stop checkpoint processing.
        // checkpoint() throws IllegalStateException when the journal is not open
        // (never started, or start() failed); skip it then so the rest of the
        // shutdown, in particular longTermPersistence.stop(), still runs.
        if (journal != null) {
            checkpoint(true);
        }
        checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();

        JMSException firstException = null;
        if (journal != null) {
            try {
                journal.close();
                journal = null;
            }
            catch (Exception e) {
                firstException = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e);
            }
        }
        longTermPersistence.stop();

        if (firstException != null) {
            throw firstException;
        }
    }

    // Properties
    //-------------------------------------------------------------------------
    public PersistenceAdapter getLongTermPersistence() {
        return longTermPersistence;
    }

    public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
        this.longTermPersistence = longTermPersistence;
    }

    /**
     * @return Returns the directory.
     */
    public File getDirectory() {
        return directory;
    }

    /**
     * @param directory The directory to set.
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }

    /**
     * @return Returns the wireFormat.
     */
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    public String getJournalType() {
        return journalType;
    }

    public void setJournalType(String journalType) {
        this.journalType = journalType;
    }

    /**
     * Creates the journal selected by {@link #getJournalType()}.
     *
     * @return a {@link JournalImpl} for {@link #DEFAULT_JOURNAL_TYPE} or a
     *         {@link HowlJournal} for {@link #HOWL_JOURNAL_TYPE}
     * @throws IOException if the journal cannot be opened
     * @throws IllegalStateException if the journal type is neither of the above
     */
    protected Journal createJournal() throws IOException {
        if( DEFAULT_JOURNAL_TYPE.equals(journalType) ) {
            return new JournalImpl(directory,logFileCount,logFileSize);
        }

        if( HOWL_JOURNAL_TYPE.equals(journalType) ) {
            try {
                Configuration config = new Configuration();
                config.setLogFileDir(directory.getCanonicalPath());
                return new HowlJournal(config);
            } catch (IOException e) {
                throw e;
            } catch (Exception e) {
                // Wrap anything else so callers only have to deal with IOException.
                throw (IOException)new IOException("Could not open HOWL journal: "+e.getMessage()).initCause(e);
            }
        }

        throw new IllegalStateException("Unsupported valued for journalType attribute: "+journalType);
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * The Journal give us a call back so that we can move old data out of the journal.
     * Taking a checkpoint does this for us.
     *
     * @see JournalEventListener#overflowNotification(RecordLocation)
     */
    public void overflowNotification(RecordLocation safeLocation) {
        checkpoint(false);
    }

    /**
     * When we checkpoint we move all the journaled data to long term storage.
     * The work itself always runs on the checkpoint executor.
     *
     * @param sync if true, block until a checkpoint run that consumed this
     *        request has finished; if false, return once the request is queued
     * @throws IllegalStateException if the journal is not open
     */
    public void checkpoint(boolean sync) {
        try {

            if( journal == null )
                throw new IllegalStateException("Journal is closed.");

            // Do the checkpoint asynchronously?
            Latch latch=null;
            if( sync ) {
                latch = new Latch();
                checkpointRequests.put(latch);
            } else {
                checkpointRequests.put(Boolean.TRUE);
            }

            checkpointExecutor.execute(new Runnable() {
                public void run() {

                    // Latches of the synchronous requests served by this run.
                    ArrayList listners = new ArrayList();

                    try {
                        // Avoid running a checkpoint too many times in a row.
                        // Consume any queued up checkpoint requests.
                        try {
                            boolean requested = false;
                            Object t;
                            while ((t=checkpointRequests.poll(0)) != null) {
                                if( t.getClass()==Latch.class )
                                    listners.add(t);
                                requested = true;
                            }
                            if (!requested) {
                                // An earlier run already served every pending request.
                                return;
                            }
                        }
                        catch (InterruptedException e1) {
                            // Give up on this run; latches collected so far are
                            // still released by the finally block below.
                            return;
                        }

                        log.debug("Checkpoint started.");
                        // Greatest mark reported by any store during this run.
                        RecordLocation newMark = null;

                        Iterator iterator = messageStores.values().iterator();
                        while (iterator.hasNext()) {
                            try {
                                JournalMessageStore ms = (JournalMessageStore) iterator.next();
                                RecordLocation mark = ms.checkpoint();
                                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
                                    newMark = mark;
                                }
                            }
                            catch (Exception e) {
                                log.error("Failed to checkpoint a message store: " + e, e);
                            }
                        }

                        iterator = topicMessageStores.values().iterator();
                        while (iterator.hasNext()) {
                            try {
                                JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
                                RecordLocation mark = ms.checkpoint();
                                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
                                    newMark = mark;
                                }
                            }
                            catch (Exception e) {
                                log.error("Failed to checkpoint a message store: " + e, e);
                            }
                        }

                        try {
                            if (newMark != null) {
                                if( log.isDebugEnabled() )
                                    log.debug("Marking journal: "+newMark);
                                journal.setMark(newMark, true);
                            }
                        }
                        catch (Exception e) {
                            log.error("Failed to mark the Journal: " + e, e);
                        }

                        // Clean up the DB if it's a JDBC store. start() turns off the
                        // adapter's own periodic clean up, so it is triggered from here.
                        if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
                            ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
                        }

                        log.debug("Checkpoint done.");
                    } finally {
                        // Wake every synchronous caller served by this run, even on failure.
                        for (Iterator iter = listners.iterator(); iter.hasNext();) {
                            Latch waiter = (Latch) iter.next();
                            waiter.release();
                        }
                    }
                }
            });

            if( sync ) {
                latch.acquire();
            }
        }
        catch (InterruptedException e) {
            log.warn("Request to start checkpoint failed: " + e, e);
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Writes a packet record (type byte, destination, wire-format packet) to the journal.
     *
     * @param destination the destination name stored with the record
     * @param packet the packet to journal
     * @param sync passed through to {@link Journal#write}
     * @return the location of the written record
     * @throws JMSException if the record could not be written
     */
    public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
        try {

            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream(pos);
            os.writeByte(PACKET_RECORD_TYPE);
            os.writeUTF(destination);
            os.close();
            org.activeio.Packet p = wireFormat.writePacket(packet, pos);
            return journal.write(p, sync);
        }
        catch (IOException e) {
            throw createWriteException(packet, e);
        }
    }

    /**
     * Writes a command record (type byte, command string) to the journal.
     *
     * @param command the command text to journal
     * @param sync passed through to {@link Journal#write}
     * @return the location of the written record
     * @throws JMSException if the record could not be written
     */
    public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
        try {

            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream(pos);
            os.writeByte(COMMAND_RECORD_TYPE);
            os.writeUTF(command);
            os.close();
            return journal.write(pos.getPacket(), sync);

        }
        catch (IOException e) {
            throw createWriteException(command, e);
        }
    }

    /**
     * Reads the packet stored in a packet record.
     *
     * @param location the journal location of the record
     * @return the packet decoded from the record
     * @throws JMSException if the location is invalid, the record is not a
     *         packet record, or it cannot be read
     */
    public Packet readPacket(RecordLocation location) throws JMSException {
        try {
            org.activeio.Packet data = journal.read(location);
            DataInputStream is = new DataInputStream(new PacketInputStream(data));
            try {
                byte type = is.readByte();
                if (type != PACKET_RECORD_TYPE) {
                    throw new IOException("Record is not a packet type.");
                }
                // The destination is not needed here but must be consumed
                // before the packet itself is decoded.
                is.readUTF();
                return wireFormat.readPacket(data);
            }
            finally {
                is.close();
            }
        }
        catch (InvalidRecordLocationException e) {
            throw createReadException(location, e);
        }
        catch (IOException e) {
            throw createReadException(location, e);
        }
    }


    /**
     * Move all the messages that were in the journal into long term storage.
     * Every journal record is replayed into the stores; afterwards a
     * "RECOVERED" command is written and the journal is marked at its location.
     *
     * @throws JMSException
     * @throws IOException
     * @throws InvalidRecordLocationException
     * @throws IllegalStateException
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {

        RecordLocation pos = null;
        int transactionCounter = 0;

        log.info("Journal Recovery Started.");

        // While we have records in the journal.
        while ((pos = journal.getNextRecordLocation(pos)) != null) {
            org.activeio.Packet data = journal.read(pos);
            DataInputStream is = new DataInputStream(new PacketInputStream(data));

            // Read the destination and packet from the record.
            String destination = null;
            Packet packet = null;
            try {
                byte type = is.readByte();
                switch (type) {
                    case PACKET_RECORD_TYPE:

                        // Is the current packet part of the destination?
                        destination = is.readUTF();
                        packet = wireFormat.readPacket(data);

                        // Try to replay the packet. Transacted operations are handed to the
                        // transaction store and only replayed when their commit record is seen.
                        if (packet instanceof ActiveMQMessage) {
                            ActiveMQMessage msg = (ActiveMQMessage) packet;

                            JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, msg.getJMSActiveMQDestination().isQueue());
                            if( msg.getTransactionId()!=null ) {
                                transactionStore.addMessage(store, msg, pos);
                            } else {
                                store.replayAddMessage(msg);
                                transactionCounter++;
                            }
                        }
                        else if (packet instanceof MessageAck) {
                            MessageAck ack = (MessageAck) packet;
                            JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, ack.getDestination().isQueue());
                            if( ack.getTransactionId()!=null ) {
                                transactionStore.removeMessage(store, ack, pos);
                            } else {
                                store.replayRemoveMessage(ack);
                                transactionCounter++;
                            }
                        }
                        else {
                            log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
                        }

                        break;
                    case TX_COMMAND_RECORD_TYPE:

                        TxCommand command = new TxCommand();
                        command.setType(is.readByte());
                        command.setWasPrepared(is.readBoolean());
                        // Mirrors writeTxCommand(): local transaction ids are strings, all others are XIDs.
                        switch(command.getType()) {
                            case TxCommand.LOCAL_COMMIT:
                            case TxCommand.LOCAL_ROLLBACK:
                                command.setTransactionId(is.readUTF());
                                break;
                            default:
                                command.setTransactionId(ActiveMQXid.read(is));
                                break;
                        }

                        // Try to replay the packet.
                        switch(command.getType()) {
                            case TxCommand.XA_PREPARE:
                                transactionStore.replayPrepare(command.getTransactionId());
                                break;
                            case TxCommand.XA_COMMIT:
                            case TxCommand.LOCAL_COMMIT:
                                Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                                // Replay the committed operations.
                                if( tx!=null) {
                                    for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
                                        TxOperation op = (TxOperation) iter.next();
                                        if( op.operationType == TxOperation.ADD_OPERATION_TYPE ) {
                                            op.store.replayAddMessage((ActiveMQMessage) op.data);
                                        }
                                        if( op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
                                            op.store.replayRemoveMessage((MessageAck) op.data);
                                        }
                                        if( op.operationType == TxOperation.ACK_OPERATION_TYPE) {
                                            JournalAck ack = (JournalAck) op.data;
                                            ((JournalTopicMessageStore)op.store).replayAcknowledge(ack.getSubscription(), new MessageIdentity(ack.getMessageId()));
                                        }
                                    }
                                    transactionCounter++;
                                }
                                break;
                            case TxCommand.LOCAL_ROLLBACK:
                            case TxCommand.XA_ROLLBACK:
                                transactionStore.replayRollback(command.getTransactionId());
                                break;
                        }

                        break;

                    case ACK_RECORD_TYPE:

                        destination = is.readUTF();
                        String subscription = is.readUTF();
                        String messageId = is.readUTF();
                        // The ack record written by writePacket(String, String, MessageIdentity, boolean)
                        // carries no transaction id, so this is always null here.
                        Object transactionId=null;

                        JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destination, false);
                        if( transactionId!=null ) {
                            JournalAck ack = new JournalAck(destination, subscription, messageId, transactionId);
                            transactionStore.acknowledge(store, ack, pos);
                        } else {
                            store.replayAcknowledge(subscription, new MessageIdentity(messageId));
                            transactionCounter++;
                        }

                        break;

                    case COMMAND_RECORD_TYPE:

                        // Command records need no replay.
                        break;
                    default:
                        log.error("Unknown type of record in transaction log which will be discarded: " + type);
                        break;
                }
            }
            finally {
                is.close();
            }
        }

        RecordLocation location = writeCommand("RECOVERED", true);
        journal.setMark(location, true);

        log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
    }

    private JMSException createReadException(RecordLocation location, Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
    }

    protected JMSException createWriteException(Packet packet, Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
    }

    private XAException createWriteException(TxCommand command, Exception e) {
        return (XAException)new XAException("Failed to write to journal for: " + command + ". Reason: " + e).initCause(e);
    }


    protected JMSException createWriteException(String command, Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
    }

    protected JMSException createRecoveryFailedException(Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
    }

    /**
     * Returns the clock daemon used for the periodic checkpoint, creating one
     * that runs on a daemon thread if none has been set.
     */
    public ClockDaemon getClockDaemon() {
        if (clockDaemon == null) {
            clockDaemon = new ClockDaemon();
            clockDaemon.setThreadFactory(new ThreadFactory() {
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "Checkpoint Timmer");
                    thread.setDaemon(true);
                    return thread;
                }
            });
        }
        return clockDaemon;
    }

    public void setClockDaemon(ClockDaemon clockDaemon) {
        this.clockDaemon = clockDaemon;
    }

    /**
     * Writes a transaction command record (type byte, command type,
     * wasPrepared flag, transaction id) to the journal.
     *
     * @param command the transaction command to journal; its transaction id is
     *        written as a String for local commit/rollback and as an
     *        {@link ActiveMQXid} otherwise
     * @param sync passed through to {@link Journal#write}
     * @return the location of the written record
     * @throws XAException if the record could not be written
     */
    public RecordLocation writeTxCommand(TxCommand command, boolean sync) throws XAException {
        try {

            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream(pos);
            os.writeByte(TX_COMMAND_RECORD_TYPE);
            os.writeByte(command.getType());
            os.writeBoolean(command.getWasPrepared());
            switch(command.getType()) {
                case TxCommand.LOCAL_COMMIT:
                case TxCommand.LOCAL_ROLLBACK:
                    os.writeUTF( (String) command.getTransactionId() );
                    break;
                default:
                    ActiveMQXid xid = (ActiveMQXid) command.getTransactionId();
                    xid.write(os);
                    break;
            }
            os.close();
            return journal.write(pos.getPacket(), sync);
        }
        catch (IOException e) {
            throw createWriteException(command, e);
        }
    }

    /**
     * Writes an acknowledgement record (type byte, destination, subscription,
     * message id) to the journal.
     *
     * @param destinationName the destination name stored with the record
     * @param subscription the subscription stored with the record
     * @param messageIdentity supplies the message id stored with the record
     * @param sync passed through to {@link Journal#write}
     * @return the location of the written record
     * @throws JMSException if the record could not be written
     */
    public RecordLocation writePacket(String destinationName, String subscription, MessageIdentity messageIdentity, boolean sync) throws JMSException{
        try {

            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream(pos);
            os.writeByte(ACK_RECORD_TYPE);
            os.writeUTF(destinationName);
            os.writeUTF(subscription);
            os.writeUTF(messageIdentity.getMessageID());
            os.close();
            return journal.write(pos.getPacket(), sync);

        }
        catch (IOException e) {
            throw createWriteException("Ack for message: "+messageIdentity, e);
        }
    }

    public JournalTransactionStore getTransactionStore() {
        return transactionStore;
    }

    public int getLogFileCount() {
        return logFileCount;
    }

    public void setLogFileCount(int logFileCount) {
        this.logFileCount = logFileCount;
    }

    public int getLogFileSize() {
        return logFileSize;
    }

    public void setLogFileSize(int logFileSize) {
        this.logFileSize = logFileSize;
    }

    public long getCheckpointInterval() {
        return checkpointInterval;
    }

    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
}