package org.activemq.store.journal;

import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.activeio.adapter.PacketByteArrayOutputStream;
import org.activeio.adapter.PacketInputStream;
import org.activeio.journal.InvalidRecordLocationException;
import org.activeio.journal.Journal;
import org.activeio.journal.JournalEventListener;
import org.activeio.journal.RecordLocation;
import org.activeio.journal.active.JournalImpl;
import org.activeio.journal.howl.HowlJournal;
import org.activemq.io.WireFormat;
import org.activemq.io.impl.StatelessDefaultWireFormat;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.MessageAck;
import org.activemq.message.Packet;
import org.activemq.service.MessageIdentity;
import org.activemq.store.MessageStore;
import org.activemq.store.PersistenceAdapter;
import org.activemq.store.TopicMessageStore;
import org.activemq.store.TransactionStore;
import org.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.activemq.store.journal.JournalTransactionStore;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.objectweb.howl.log.Configuration;

/* loaded from: input_file:org/activemq/store/journal/JournalPersistenceAdapter.class */
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Log log;
    // Supported values for the journalType property (see createJournal()).
    public static final String DEFAULT_JOURNAL_TYPE = "default";
    public static final String HOWL_JOURNAL_TYPE = "howl";
    // The open journal; null until start() creates it and again after stop() closes it.
    private Journal journal;
    // Selects the journal implementation built by createJournal(); defaults to DEFAULT_JOURNAL_TYPE.
    private String journalType;
    // Backing adapter that most PersistenceAdapter calls are delegated to.
    private PersistenceAdapter longTermPersistence;
    // Directory handed to the journal implementation; defaults to "logs".
    private File directory;
    // Wire format used to serialize packets into, and read them back from, journal records.
    private final StatelessDefaultWireFormat wireFormat;
    // Caches of journal-backed stores keyed by destination name (queues and topics kept separately).
    private final ConcurrentHashMap messageStores;
    private final ConcurrentHashMap topicMessageStores;
    // Tag values for the first byte of every journal record. The write*() methods and
    // recover() in this decompiled source use the literal values 0-3 directly.
    private static final int PACKET_RECORD_TYPE = 0;
    private static final int COMMAND_RECORD_TYPE = 1;
    private static final int TX_COMMAND_RECORD_TYPE = 2;
    private static final int ACK_RECORD_TYPE = 3;
    // Queue of pending checkpoint requests: a Latch for synchronous requests, Boolean.TRUE otherwise.
    private Channel checkpointRequests;
    // Single-threaded executor that runs the checkpoint work.
    private QueuedExecutor checkpointExecutor;
    // Timer used for the periodic checkpoint; created lazily by getClockDaemon().
    ClockDaemon clockDaemon;
    // Handle returned by ClockDaemon.executePeriodically(); non-null while the periodic task is scheduled.
    private Object clockTicket;
    // Transaction store that is handed out by createTransactionStore() and replayed into by recover().
    private JournalTransactionStore transactionStore;
    // Journal sizing passed to JournalImpl: bytes per log file and number of log files.
    private int logFileSize;
    private int logFileCount;
    // Compiler-generated caches backing pre-Java-5 class literals.
    // NOTE(review): the Latch cache is not referenced in the visible code; presumably it is
    // used by the checkpoint worker whose body was not decompiled - confirm.
    static Class class$org$activemq$store$journal$JournalPersistenceAdapter;
    static Class class$EDU$oswego$cs$dl$util$concurrent$Latch;

    /**
     * Creates an adapter with default settings: the "default" journal type, a
     * "logs" directory, two log files of 20 MiB each, and a checkpoint executor
     * whose worker is a max-priority daemon thread named "Checkpoint Worker".
     * The long-term persistence adapter must be supplied separately via
     * {@link #setLongTermPersistence(PersistenceAdapter)} before use.
     */
    public JournalPersistenceAdapter() {
        this.journalType = DEFAULT_JOURNAL_TYPE;
        this.directory = new File("logs");
        this.wireFormat = new StatelessDefaultWireFormat();
        this.messageStores = new ConcurrentHashMap();
        this.topicMessageStores = new ConcurrentHashMap();
        this.checkpointRequests = new LinkedQueue();
        this.transactionStore = new JournalTransactionStore(this);
        this.logFileSize = 20 * 1024 * 1024;
        this.logFileCount = 2;
        this.checkpointExecutor = new QueuedExecutor(new LinkedQueue());
        // ThreadFactory is an interface, so the anonymous class takes no constructor
        // arguments; the decompiler's synthetic outer-instance field is not needed.
        this.checkpointExecutor.setThreadFactory(new ThreadFactory() {
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "Checkpoint Worker");
                thread.setDaemon(true);
                thread.setPriority(Thread.MAX_PRIORITY);
                return thread;
            }
        });
    }

    /**
     * Creates an adapter with default settings, then overrides the journal
     * directory and the long-term persistence adapter.
     *
     * @param file directory used for the journal files
     * @param persistenceAdapter adapter that calls are delegated to for long-term storage
     * @throws IOException declared for callers; nothing in this constructor body throws it directly
     */
    public JournalPersistenceAdapter(File file, PersistenceAdapter persistenceAdapter) throws IOException {
        this();
        longTermPersistence = persistenceAdapter;
        directory = file;
    }

    /**
     * Returns the initial destinations as reported by the long-term persistence adapter.
     *
     * @return whatever map the delegate returns
     */
    @Override // org.activemq.store.PersistenceAdapter
    public Map getInitialDestinations() {
        Map initialDestinations = longTermPersistence.getInitialDestinations();
        return initialDestinations;
    }

    /**
     * Returns the journal-backed store for a destination.
     *
     * @param str destination name
     * @param z {@code true} for a queue store, {@code false} for a topic store
     * @return the (possibly cached) store for the destination
     * @throws JMSException if the underlying store cannot be created
     */
    private MessageStore createMessageStore(String str, boolean z) throws JMSException {
        if (z) {
            return createQueueMessageStore(str);
        }
        return createTopicMessageStore(str);
    }

    /**
     * Returns the journal-backed queue store for the given destination, creating
     * and caching it (wrapped around the long-term adapter's queue store) on first use.
     *
     * @param str destination name, also used as the cache key
     * @return the cached or newly created store
     * @throws JMSException if the long-term adapter fails to create its store
     */
    @Override // org.activemq.store.PersistenceAdapter
    public MessageStore createQueueMessageStore(String str) throws JMSException {
        JournalMessageStore cached = (JournalMessageStore) messageStores.get(str);
        if (cached != null) {
            return cached;
        }
        JournalMessageStore created =
                new JournalMessageStore(this, longTermPersistence.createQueueMessageStore(str), str);
        messageStores.put(str, created);
        return created;
    }

    /**
     * Returns the journal-backed topic store for the given destination, creating
     * and caching it (wrapped around the long-term adapter's topic store) on first use.
     *
     * @param str destination name, also used as the cache key
     * @return the cached or newly created topic store
     * @throws JMSException if the long-term adapter fails to create its store
     */
    @Override // org.activemq.store.PersistenceAdapter
    public TopicMessageStore createTopicMessageStore(String str) throws JMSException {
        JournalTopicMessageStore cached = (JournalTopicMessageStore) topicMessageStores.get(str);
        if (cached != null) {
            return cached;
        }
        JournalTopicMessageStore created =
                new JournalTopicMessageStore(this, longTermPersistence.createTopicMessageStore(str), str);
        topicMessageStores.put(str, created);
        return created;
    }

    /**
     * Returns this adapter's single journal transaction store; no new store is created.
     *
     * @return the transaction store built in the constructor
     * @throws JMSException declared by the interface; not thrown by this implementation
     */
    @Override // org.activemq.store.PersistenceAdapter
    public TransactionStore createTransactionStore() throws JMSException {
        TransactionStore store = transactionStore;
        return store;
    }

    /**
     * Begins a transaction on the long-term persistence adapter.
     *
     * @throws JMSException if the delegate fails
     */
    @Override // org.activemq.store.PersistenceAdapter
    public void beginTransaction() throws JMSException {
        PersistenceAdapter delegate = longTermPersistence;
        delegate.beginTransaction();
    }

    /**
     * Commits the current transaction on the long-term persistence adapter.
     *
     * @throws JMSException if the delegate fails
     */
    @Override // org.activemq.store.PersistenceAdapter
    public void commitTransaction() throws JMSException {
        PersistenceAdapter delegate = longTermPersistence;
        delegate.commitTransaction();
    }

    /**
     * Rolls back the current transaction on the long-term persistence adapter.
     */
    @Override // org.activemq.store.PersistenceAdapter
    public void rollbackTransaction() {
        PersistenceAdapter delegate = longTermPersistence;
        delegate.rollbackTransaction();
    }

    /**
     * Starts the long-term adapter, opens the journal if it is not already open,
     * replays the journal, and schedules a periodic asynchronous checkpoint.
     *
     * <p>Opening and recovery are guarded by separate try blocks so that a
     * recovery failure is reported as such instead of being re-wrapped as a
     * failure to open the journal.
     *
     * @throws JMSException if the long-term adapter fails to start, the journal
     *         cannot be opened, or recovery fails
     */
    @Override // org.activemq.service.Service
    public synchronized void start() throws JMSException {
        if (this.longTermPersistence instanceof JDBCPersistenceAdapter) {
            // NOTE(review): presumably a period of 0 turns off the JDBC adapter's own
            // periodic cleanup while the journal drives checkpoints - confirm.
            ((JDBCPersistenceAdapter) this.longTermPersistence).setCleanupPeriod(0);
        }
        this.longTermPersistence.start();
        createTransactionStore();
        if (this.journal == null) {
            try {
                log.info("Opening journal.");
                this.journal = createJournal();
                log.info("Opened journal: " + this.journal);
                this.journal.setJournalEventListener(this);
            } catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
            }
            try {
                recover();
            } catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
            }
        }
        // Request an asynchronous checkpoint every five minutes (300000 ms).
        this.clockTicket = getClockDaemon().executePeriodically(5L * 60L * 1000L, new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                JournalPersistenceAdapter.this.checkpoint(false);
            }
        }, false);
    }

    /**
     * Cancels the periodic checkpoint, performs one final synchronous checkpoint,
     * shuts down the checkpoint executor, closes the journal and stops the
     * long-term adapter.
     *
     * <p>A failure to close the journal is remembered and rethrown only after the
     * long-term adapter has been asked to stop.
     *
     * @throws JMSException if closing the journal or stopping the long-term adapter fails
     * @throws IllegalStateException from {@link #checkpoint(boolean)} if the journal is not open
     */
    @Override // org.activemq.service.Service
    public synchronized void stop() throws JMSException {
        if (this.clockTicket != null) {
            ClockDaemon.cancel(this.clockTicket);
            this.clockTicket = null;
            this.clockDaemon.shutDown();
        }
        checkpoint(true);
        this.checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
        // Typed as JMSException (not Throwable) so it can be rethrown under this
        // method's throws clause.
        JMSException closeFailure = null;
        if (this.journal != null) {
            try {
                this.journal.close();
                this.journal = null;
            } catch (Exception e) {
                closeFailure = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e);
            }
        }
        this.longTermPersistence.stop();
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * @return the adapter used for long-term storage, or {@code null} if none has been set
     */
    public PersistenceAdapter getLongTermPersistence() {
        return longTermPersistence;
    }

    /**
     * @param persistenceAdapter the adapter to delegate long-term storage to
     */
    public void setLongTermPersistence(PersistenceAdapter persistenceAdapter) {
        longTermPersistence = persistenceAdapter;
    }

    /**
     * @return the directory passed to the journal implementation
     */
    public File getDirectory() {
        return directory;
    }

    /**
     * @param file the directory to pass to the journal implementation
     */
    public void setDirectory(File file) {
        directory = file;
    }

    /**
     * @return the wire format this adapter uses to serialize packets into journal records
     */
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    /**
     * @return the configured journal type (see {@link #DEFAULT_JOURNAL_TYPE} and {@link #HOWL_JOURNAL_TYPE})
     */
    public String getJournalType() {
        return journalType;
    }

    /**
     * @param str the journal type; values other than "default" and "howl" make
     *        {@link #createJournal()} throw
     */
    public void setJournalType(String str) {
        journalType = str;
    }

    /**
     * Builds the journal implementation selected by the journalType property.
     *
     * @return a {@code JournalImpl} for the "default" type or a {@code HowlJournal} for "howl"
     * @throws IOException if the journal cannot be opened; non-I/O failures while
     *         opening a HOWL journal are wrapped in an IOException with the cause attached
     * @throws IllegalStateException if the journal type is neither "default" nor "howl"
     */
    protected Journal createJournal() throws IOException {
        if (DEFAULT_JOURNAL_TYPE.equals(journalType)) {
            return new JournalImpl(directory, logFileCount, logFileSize);
        }
        if (HOWL_JOURNAL_TYPE.equals(journalType)) {
            try {
                Configuration howlConfig = new Configuration();
                howlConfig.setLogFileDir(directory.getCanonicalPath());
                return new HowlJournal(howlConfig);
            } catch (IOException ioFailure) {
                // I/O failures propagate unchanged.
                throw ioFailure;
            } catch (Exception cause) {
                IOException wrapped = new IOException("Could not open HOWL journal: " + cause.getMessage());
                wrapped.initCause(cause);
                throw wrapped;
            }
        }
        throw new IllegalStateException("Unsupported valued for journalType attribute: " + journalType);
    }

    /**
     * Journal event callback: responds to an overflow notification by requesting
     * an asynchronous checkpoint.
     *
     * @param recordLocation location supplied with the notification; not used here
     */
    public void overflowNotification(RecordLocation recordLocation) {
        final boolean waitForCompletion = false;
        checkpoint(waitForCompletion);
    }

    /**
     * Queues a checkpoint request and schedules a worker on the checkpoint executor.
     *
     * <p>NOTE(review): the worker's {@code run()} body could not be decompiled; the
     * placeholder below throws UnsupportedOperationException instead of performing a
     * checkpoint. It must be restored from the original source or bytecode before
     * this class can be used.
     *
     * @param z {@code true} to block until the queued Latch is released (presumably by
     *        the worker once the checkpoint completes - confirm against the original
     *        worker code); {@code false} to return right after scheduling
     * @throws IllegalStateException if the journal is not open
     */
    public void checkpoint(boolean z) {
        try {
            if (this.journal == null) {
                throw new IllegalStateException("Journal is closed.");
            }
            // Synchronous requests queue a Latch to wait on; asynchronous ones queue a plain marker.
            Latch latch = null;
            if (z) {
                latch = new Latch();
                this.checkpointRequests.put(latch);
            } else {
                this.checkpointRequests.put(Boolean.TRUE);
            }
            this.checkpointExecutor.execute(new Runnable(this) { // from class: org.activemq.store.journal.JournalPersistenceAdapter.3
                private final JournalPersistenceAdapter this$0;

                {
                    this.this$0 = this;
                }

                /*  JADX ERROR: NullPointerException in pass: RegionMakerVisitor
                    java.lang.NullPointerException: Cannot invoke "java.util.List.isEmpty()" because "s" is null
                    	at jadx.core.utils.BlockUtils.getNextBlock(BlockUtils.java:411)
                    	at jadx.core.dex.visitors.regions.RegionMaker.traverse(RegionMaker.java:172)
                    	at jadx.core.dex.visitors.regions.RegionMaker.makeRegion(RegionMaker.java:91)
                    	at jadx.core.dex.visitors.regions.RegionMaker.processIf(RegionMaker.java:740)
                    	at jadx.core.dex.visitors.regions.RegionMaker.traverse(RegionMaker.java:152)
                    	at jadx.core.dex.visitors.regions.RegionMaker.makeRegion(RegionMaker.java:91)
                    	at jadx.core.dex.visitors.regions.RegionMakerVisitor.visit(RegionMakerVisitor.java:52)
                    */
                /* JADX WARN: Removed duplicated region for block: B:86:0x01ce A[DONT_GENERATE, LOOP:4: B:84:0x01c4->B:86:0x01ce, LOOP_END] */
                @Override // java.lang.Runnable
                /*
                    Code decompiled incorrectly, please refer to instructions dump.
                    To view partially-correct add '--show-bad-code' argument
                */
                public void run() {
                    /*
                        Method dump skipped, instructions count: 485
                        To view this dump add '--comments-level debug' option
                    */
                    throw new UnsupportedOperationException("Method not decompiled: org.activemq.store.journal.JournalPersistenceAdapter.AnonymousClass3.run():void");
                }
            });
            if (z) {
                // Block the caller until the latch queued above is released.
                latch.acquire();
            }
        } catch (InterruptedException e) {
            // Interruption while queuing, scheduling or waiting is logged, not propagated.
            log.warn(new StringBuffer().append("Request to start checkpoint failed: ").append(e).toString(), e);
        }
    }

    /**
     * Appends a packet record to the journal. The record consists of the packet
     * record-type byte, the destination name, and the packet as serialized by the
     * wire format.
     *
     * @param str destination name stored in the record header
     * @param packet packet to serialize into the record
     * @param z passed straight through as the second argument of {@code journal.write}
     * @return the location of the written record
     * @throws JMSException if building or writing the record fails with an I/O error
     */
    public RecordLocation writePacket(String str, Packet packet, boolean z) throws JMSException {
        try {
            PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
            DataOutputStream header = new DataOutputStream(buffer);
            header.writeByte(PACKET_RECORD_TYPE);
            header.writeUTF(str);
            // The header stream is closed before the wire format appends the packet body.
            header.close();
            return journal.write(wireFormat.writePacket(packet, buffer), z);
        } catch (IOException ioFailure) {
            throw createWriteException(packet, ioFailure);
        }
    }

    /**
     * Appends a command record (command record-type byte followed by the command
     * text) to the journal.
     *
     * @param str command text, e.g. "RECOVERED" as written by recovery
     * @param z passed straight through as the second argument of {@code journal.write}
     * @return the location of the written record
     * @throws JMSException if building or writing the record fails with an I/O error
     */
    public RecordLocation writeCommand(String str, boolean z) throws JMSException {
        try {
            PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeByte(COMMAND_RECORD_TYPE);
            out.writeUTF(str);
            out.close();
            return journal.write(buffer.getPacket(), z);
        } catch (IOException ioFailure) {
            throw createWriteException(str, ioFailure);
        }
    }

    /**
     * Reads the packet stored in the journal record at the given location.
     *
     * <p>The record must start with the packet record-type byte followed by a
     * destination name, which is skipped; the rest is decoded by the wire format.
     * The header stream is closed on every path, including failures.
     *
     * @param recordLocation location of the record to read
     * @return the decoded packet
     * @throws JMSException if the location is invalid, the record is not a packet
     *         record, or reading fails with an I/O error
     */
    public Packet readPacket(RecordLocation recordLocation) throws JMSException {
        try {
            org.activeio.Packet record = this.journal.read(recordLocation);
            DataInputStream in = new DataInputStream(new PacketInputStream(record));
            try {
                if (in.readByte() != PACKET_RECORD_TYPE) {
                    throw new IOException("Record is not a packet type.");
                }
                // Skip the destination name that precedes the serialized packet.
                in.readUTF();
                return this.wireFormat.readPacket(record);
            } finally {
                in.close();
            }
        } catch (IOException e) {
            throw createReadException(recordLocation, e);
        } catch (InvalidRecordLocationException e) {
            throw createReadException(recordLocation, e);
        }
    }

    /**
     * Replays every record in the journal, then writes a "RECOVERED" command
     * record and sets the journal mark at it.
     *
     * <p>Each record type is handled on its own. The decompiled switch had lost its
     * {@code break} statements, so packet and command records fell through into
     * transaction-command parsing and ack records fell through into the
     * unknown-record error. Plain command records carry nothing to replay.
     *
     * @throws IllegalStateException declared by the original signature
     * @throws InvalidRecordLocationException if the journal rejects a record location
     * @throws IOException if a record cannot be read or decoded
     * @throws JMSException if replaying into a store fails
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
        RecordLocation location = null;
        int recoveredCount = 0;
        log.info("Journal Recovery Started.");
        while ((location = this.journal.getNextRecordLocation(location)) != null) {
            org.activeio.Packet record = this.journal.read(location);
            DataInputStream in = new DataInputStream(new PacketInputStream(record));
            try {
                byte recordType = in.readByte();
                switch (recordType) {
                    case PACKET_RECORD_TYPE:
                        recoveredCount += replayPacketRecord(in, record, location);
                        break;
                    case TX_COMMAND_RECORD_TYPE:
                        recoveredCount += replayTxCommandRecord(in);
                        break;
                    case ACK_RECORD_TYPE:
                        recoveredCount += replayAckRecord(in);
                        break;
                    case COMMAND_RECORD_TYPE:
                        // Command records (e.g. "RECOVERED") hold only a text marker.
                        break;
                    default:
                        log.error("Unknown type of record in transaction log which will be discarded: " + recordType);
                        break;
                }
            } finally {
                in.close();
            }
        }
        this.journal.setMark(writeCommand("RECOVERED", true), true);
        log.info("Journal Recovered: " + recoveredCount + " message(s) in transactions recovered.");
    }

    /**
     * Replays one packet record: a message or ack is either attached to its
     * transaction or applied directly to the destination's store.
     *
     * @return 1 if the packet was applied directly to a store, otherwise 0
     */
    private int replayPacketRecord(DataInputStream in, org.activeio.Packet record, RecordLocation location)
            throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
        String destination = in.readUTF();
        Packet packet = this.wireFormat.readPacket(record);
        if (packet instanceof ActiveMQMessage) {
            ActiveMQMessage message = (ActiveMQMessage) packet;
            JournalMessageStore store = (JournalMessageStore) createMessageStore(
                    destination, message.getJMSActiveMQDestination().isQueue());
            if (message.getTransactionId() != null) {
                this.transactionStore.addMessage(store, message, location);
                return 0;
            }
            store.replayAddMessage(message);
            return 1;
        }
        if (packet instanceof MessageAck) {
            MessageAck ack = (MessageAck) packet;
            JournalMessageStore store = (JournalMessageStore) createMessageStore(
                    destination, ack.getDestination().isQueue());
            if (ack.getTransactionId() != null) {
                this.transactionStore.removeMessage(store, ack, location);
                return 0;
            }
            store.replayRemoveMessage(ack);
            return 1;
        }
        log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
        return 0;
    }

    /**
     * Replays one transaction-command record (prepare, commit or rollback). On
     * commit, the operations held by the transaction are applied to their stores.
     *
     * @return 1 if a commit was replayed for a known transaction, otherwise 0
     */
    private int replayTxCommandRecord(DataInputStream in)
            throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
        TxCommand command = new TxCommand();
        command.setType(in.readByte());
        command.setWasPrepared(in.readBoolean());
        // Types 4 and 5 store the transaction id as a string; all others as an ActiveMQXid.
        switch (command.getType()) {
            case 4:
            case 5:
                command.setTransactionId(in.readUTF());
                break;
            default:
                command.setTransactionId(ActiveMQXid.read(in));
                break;
        }
        switch (command.getType()) {
            case 1:
                this.transactionStore.replayPrepare(command.getTransactionId());
                return 0;
            case 2:
            case 4: {
                JournalTransactionStore.Tx tx =
                        this.transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                if (tx == null) {
                    return 0;
                }
                for (Iterator it = tx.getOperations().iterator(); it.hasNext();) {
                    JournalTransactionStore.TxOperation operation = (JournalTransactionStore.TxOperation) it.next();
                    // Operation types: 0 replays an add, 1 a remove, 3 a topic acknowledgement.
                    if (operation.operationType == 0) {
                        operation.store.replayAddMessage((ActiveMQMessage) operation.data);
                    } else if (operation.operationType == 1) {
                        operation.store.replayRemoveMessage((MessageAck) operation.data);
                    } else if (operation.operationType == 3) {
                        JournalAck journalAck = (JournalAck) operation.data;
                        ((JournalTopicMessageStore) operation.store).replayAcknowledge(
                                journalAck.getSubscription(), new MessageIdentity(journalAck.getMessageId()));
                    }
                }
                return 1;
            }
            case 3:
            case 5:
                this.transactionStore.replayRollback(command.getTransactionId());
                return 0;
            default:
                return 0;
        }
    }

    /**
     * Replays one ack record by acknowledging the message on the topic store.
     * Ack records hold only destination, subscription and message id (no
     * transaction id), so they are always applied directly.
     *
     * @return always 1
     */
    private int replayAckRecord(DataInputStream in)
            throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
        String destination = in.readUTF();
        String subscription = in.readUTF();
        String messageId = in.readUTF();
        JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destination, false);
        store.replayAcknowledge(subscription, new MessageIdentity(messageId));
        return 1;
    }

    /**
     * Builds the JMSException reported when a journal record cannot be read.
     *
     * @param recordLocation location that was being read
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    private JMSException createReadException(RecordLocation recordLocation, Exception exc) {
        String message = "Failed to read to journal for: " + recordLocation + ". Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Builds the JMSException reported when a packet cannot be written to the journal.
     *
     * @param packet packet that was being written
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    protected JMSException createWriteException(Packet packet, Exception exc) {
        String message = "Failed to write to journal for: " + packet + ". Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Builds the XAException reported when a transaction command cannot be
     * written to the journal.
     *
     * <p>{@code initCause} returns {@code Throwable}, so the cause is attached in a
     * separate statement to keep the declared {@code XAException} return type.
     *
     * @param txCommand command that was being written
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    private XAException createWriteException(TxCommand txCommand, Exception exc) {
        XAException failure = new XAException("Failed to write to journal for: " + txCommand + ". Reason: " + exc);
        failure.initCause(exc);
        return failure;
    }

    /**
     * Builds the JMSException reported when a command cannot be written to the journal.
     *
     * @param str description of the command that was being written
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    protected JMSException createWriteException(String str, Exception exc) {
        String message = "Failed to write to journal for command: " + str + ". Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Builds the JMSException reported when recovery from the journal fails.
     *
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    protected JMSException createRecoveryFailedException(Exception exc) {
        String message = "Failed to recover from journal. Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Returns the clock daemon used for the periodic checkpoint, lazily creating
     * one whose timer runs on a daemon thread if none has been set.
     *
     * @return the existing or newly created clock daemon
     */
    public ClockDaemon getClockDaemon() {
        if (this.clockDaemon == null) {
            this.clockDaemon = new ClockDaemon();
            // ThreadFactory is an interface, so the anonymous class takes no constructor
            // arguments; the decompiler's synthetic outer-instance field is not needed.
            this.clockDaemon.setThreadFactory(new ThreadFactory() {
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "Checkpoint Timmer");
                    thread.setDaemon(true);
                    return thread;
                }
            });
        }
        return this.clockDaemon;
    }

    /**
     * Supplies the clock daemon to use instead of the lazily created default.
     *
     * @param clockDaemon daemon that will schedule the periodic checkpoint
     */
    public void setClockDaemon(ClockDaemon clockDaemon) {
        ClockDaemon replacement = clockDaemon;
        this.clockDaemon = replacement;
    }

    /**
     * Appends a transaction-command record to the journal: the tx-command
     * record-type byte, the command type, the was-prepared flag, and the
     * transaction id.
     *
     * @param txCommand command to record
     * @param z passed straight through as the second argument of {@code journal.write}
     * @return the location of the written record
     * @throws XAException if building or writing the record fails with an I/O error
     */
    public RecordLocation writeTxCommand(TxCommand txCommand, boolean z) throws XAException {
        try {
            PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeByte(TX_COMMAND_RECORD_TYPE);
            out.writeByte(txCommand.getType());
            out.writeBoolean(txCommand.getWasPrepared());
            int commandType = txCommand.getType();
            // Types 4 and 5 carry a String transaction id; every other type carries an ActiveMQXid.
            if (commandType == 4 || commandType == 5) {
                out.writeUTF((String) txCommand.getTransactionId());
            } else {
                ((ActiveMQXid) txCommand.getTransactionId()).write(out);
            }
            out.close();
            return journal.write(buffer.getPacket(), z);
        } catch (IOException ioFailure) {
            throw createWriteException(txCommand, ioFailure);
        }
    }

    /**
     * Appends an ack record to the journal: the ack record-type byte followed by
     * three strings. Recovery reads them back as destination, subscription and
     * message id, in that order.
     *
     * @param str first string of the record (read back as the destination name)
     * @param str2 second string of the record (read back as the subscription)
     * @param messageIdentity identity whose message id is written as the third string
     * @param z passed straight through as the second argument of {@code journal.write}
     * @return the location of the written record
     * @throws JMSException if building or writing the record fails with an I/O error
     */
    public RecordLocation writePacket(String str, String str2, MessageIdentity messageIdentity, boolean z) throws JMSException {
        try {
            PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeByte(ACK_RECORD_TYPE);
            out.writeUTF(str);
            out.writeUTF(str2);
            out.writeUTF(messageIdentity.getMessageID());
            out.close();
            return journal.write(buffer.getPacket(), z);
        } catch (IOException ioFailure) {
            throw createWriteException("Ack for message: " + messageIdentity, ioFailure);
        }
    }

    /**
     * @return the journal transaction store created by the constructor
     */
    public JournalTransactionStore getTransactionStore() {
        return transactionStore;
    }

    /**
     * @return the number of log files passed to the default journal implementation
     */
    public int getLogFileCount() {
        return logFileCount;
    }

    /**
     * @param i the number of log files to pass to the default journal implementation
     */
    public void setLogFileCount(int i) {
        logFileCount = i;
    }

    /**
     * @return the log file size passed to the default journal implementation
     */
    public int getLogFileSize() {
        return logFileSize;
    }

    /**
     * @param i the log file size to pass to the default journal implementation
     */
    public void setLogFileSize(int i) {
        logFileSize = i;
    }

    /**
     * Compiler-generated helper backing pre-Java-5 class literals: loads a class
     * by name and converts a lookup failure into an unchecked error.
     *
     * <p>{@code initCause} returns {@code Throwable}, which cannot be thrown from a
     * method without a throws clause, so the cause is attached in a separate
     * statement and the error itself is thrown.
     *
     * @param str fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found; the
     *         ClassNotFoundException is attached as its cause
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            NoClassDefFoundError error = new NoClassDefFoundError();
            error.initCause(e);
            throw error;
        }
    }

    static {
        Class cls;
        if (class$org$activemq$store$journal$JournalPersistenceAdapter == null) {
            cls = class$("org.activemq.store.journal.JournalPersistenceAdapter");
            class$org$activemq$store$journal$JournalPersistenceAdapter = cls;
        } else {
            cls = class$org$activemq$store$journal$JournalPersistenceAdapter;
        }
        log = LogFactory.getLog(cls);
    }
}
