package org.mule.util.queue;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.activeio.Packet;
import org.activeio.journal.Journal;
import org.activeio.journal.JournalEventListener;
import org.activeio.journal.RecordLocation;
import org.activeio.journal.active.JournalImpl;
import org.activeio.journal.active.LogFileManager;
import org.activeio.packet.ByteArrayPacket;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.doomdark.uuid.UUID;
import org.doomdark.uuid.UUIDGenerator;
import org.mule.MuleManager;
import org.mule.config.MuleConfiguration;
import org.mule.util.queue.QueuePersistenceStrategy;

/* loaded from: input_file:org/mule/util/queue/JournalPersistenceStrategy.class */
/**
 * Queue persistence strategy that appends queue items to an ActiveIO {@link Journal}.
 *
 * <p>Two kinds of record are written:
 * <ul>
 *   <li><b>store</b> records, produced through an {@link ObjectOutputStream}: the byte
 *       {@link #STORE_BYTE}, the queue name (UTF), the raw UUID bytes and the serialized
 *       object;</li>
 *   <li><b>delete</b> records, written as raw bytes: {@link #DELETE_BYTE} followed by the raw
 *       UUID bytes.</li>
 * </ul>
 *
 * <p>{@code marks} maps each live id to the location of its store record. {@code pendingMarks}
 * holds the locations of store records that are still live, {@code unusedMarks} the locations
 * that are no longer needed. A daemon thread (see {@link #run()}) periodically advances the
 * journal mark past unused locations and rewrites live records when the journal reports an
 * overflow. The two sorted sets are only touched while holding {@code markLock}.
 */
public class JournalPersistenceStrategy implements QueuePersistenceStrategy, Runnable, JournalEventListener {
    private static final Log logger = LogFactory.getLog(JournalPersistenceStrategy.class);

    /** Type tag written at the start of the payload of a store record. */
    private static final byte STORE_BYTE = 0;
    /** First raw byte of a delete record. */
    private static final byte DELETE_BYTE = 1;
    /** Number of bytes in the binary form of a UUID. */
    private static final int UUID_LENGTH = new UUID().asByteArray().length;

    // NOTE(review): this field and class$(String) look like compiler-generated class-literal
    // support; they are retained only so the class keeps the same set of members.
    static Class class$org$mule$util$queue$JournalPersistenceStrategy = JournalPersistenceStrategy.class;

    private Journal journal;
    /** id (UUID) -> RecordLocation of the store record for that id. */
    private Map marks;
    private Thread marker;
    /** Locations of store records that have not been removed; guarded by markLock. */
    private SortedSet pendingMarks;
    /** Locations that are no longer needed; guarded by markLock. */
    private SortedSet unusedMarks;
    private File store;
    private final UUIDGenerator gen = UUIDGenerator.getInstance();
    private final Object markLock = new Object();
    /**
     * Last overflow location reported by the journal, or null when none is pending. Volatile
     * because it is written by {@link #overflowNotification(RecordLocation)} without holding
     * markLock and read by the marker thread.
     */
    private volatile RecordLocation overflowLocation = null;

    /** Simple immutable (queue name, id) pair returned by {@link #restore()}. */
    protected static class HolderImpl implements QueuePersistenceStrategy.Holder {
        private final String queue;
        private final Object id;

        public HolderImpl(String str, Object obj) {
            this.queue = str;
            this.id = obj;
        }

        @Override // org.mule.util.queue.QueuePersistenceStrategy.Holder
        public Object getId() {
            return this.id;
        }

        @Override // org.mule.util.queue.QueuePersistenceStrategy.Holder
        public String getQueue() {
            return this.queue;
        }
    }

    /**
     * Produces the id under which an object is stored.
     *
     * @param obj the object about to be stored; not used by this implementation
     * @return a freshly generated time-based UUID
     */
    protected UUID getId(Object obj) {
        return this.gen.generateTimeBasedUUID();
    }

    /**
     * Appends a store record for {@code obj} to the journal and registers its location.
     *
     * @param str name of the queue the object belongs to
     * @param obj the object to serialize
     * @return the generated id (a {@link UUID}) to pass to {@link #load} and {@link #remove}
     * @throws IOException if serialization or the journal write fails
     */
    @Override // org.mule.util.queue.QueuePersistenceStrategy
    public Object store(String str, Object obj) throws IOException {
        UUID id = getId(obj);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
        ObjectOutputStream out = new ObjectOutputStream(buffer);
        out.writeByte(STORE_BYTE);
        out.writeUTF(str);
        out.write(id.asByteArray());
        out.writeObject(obj);
        out.close();
        RecordLocation location = this.journal.write(new ByteArrayPacket(buffer.toByteArray()), false);
        synchronized (this.markLock) {
            this.marks.put(id, location);
            this.pendingMarks.add(location);
        }
        return id;
    }

    /**
     * Reads back the object stored under the given id.
     *
     * @param str queue name; not used by this implementation
     * @param obj the id returned by {@link #store}
     * @return the deserialized object
     * @throws IOException if no record is registered for the id, or if reading or
     *         deserializing the record fails (other exceptions are wrapped as the cause)
     */
    @Override // org.mule.util.queue.QueuePersistenceStrategy
    public Object load(String str, Object obj) throws IOException {
        try {
            RecordLocation location = (RecordLocation) this.marks.get(obj);
            if (location == null) {
                throw new IOException("No journal record found for id " + obj);
            }
            byte[] data = this.journal.read(location).sliceAsBytes();
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
            in.readByte();
            in.readUTF();
            // readFully rather than skipBytes: skipBytes may skip fewer bytes than requested,
            // which would leave the stream misaligned for readObject().
            in.readFully(new byte[UUID_LENGTH]);
            return in.readObject();
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw asIOException(e);
        }
    }

    /**
     * Appends a delete record for the given id and marks both the delete record and the
     * original store record (if still registered) as unused.
     *
     * @param str queue name; not used by this implementation
     * @param obj the id returned by {@link #store}; must be a {@link UUID}
     * @throws IOException if the journal write fails
     */
    @Override // org.mule.util.queue.QueuePersistenceStrategy
    public void remove(String str, Object obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
        buffer.write(DELETE_BYTE);
        buffer.write(((UUID) obj).asByteArray());
        RecordLocation deleteRecord = this.journal.write(new ByteArrayPacket(buffer.toByteArray()), false);
        synchronized (this.markLock) {
            releaseRecord(obj, deleteRecord);
        }
    }

    /**
     * Replays the journal from its first record and rebuilds the in-memory bookkeeping.
     *
     * @return a list of {@link HolderImpl} entries, one per stored id that has no later
     *         delete record
     * @throws IOException if the journal cannot be read (other exceptions are wrapped as the
     *         cause)
     */
    @Override // org.mule.util.queue.QueuePersistenceStrategy
    public List restore() throws IOException {
        Map recovered = new HashMap();
        logger.info("Journal Recovery Started.");
        try {
            synchronized (this.markLock) {
                RecordLocation position = null;
                // Stop at the end of the journal; asking again with null would start over
                // from the first record.
                while ((position = this.journal.getNextRecordLocation(position)) != null) {
                    byte[] data = this.journal.read(position).sliceAsBytes();
                    // Delete records are raw bytes starting with DELETE_BYTE. Anything else is
                    // treated as a store record (those start with the object-stream header).
                    if (data[0] == DELETE_BYTE) {
                        UUID removedId = new UUID(data, 1);
                        recovered.remove(removedId);
                        releaseRecord(removedId, position);
                    } else {
                        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
                        in.readByte();
                        String queueName = in.readUTF();
                        UUID storedId = readUuid(in);
                        recovered.put(storedId, new HolderImpl(queueName, storedId));
                        this.marks.put(storedId, position);
                        this.pendingMarks.add(position);
                    }
                }
            }
            logger.info("Journal Recovered: " + recovered.size() + " message(s) in transactions recovered.");
            return new ArrayList(recovered.values());
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw asIOException(e);
        }
    }

    /**
     * Opens the journal and starts the marker thread. The journal directory is the configured
     * {@link #getStore() store}, or the default queue store under the Mule working directory
     * when none is set.
     *
     * @throws IOException if the journal cannot be created
     */
    @Override // org.mule.util.queue.QueuePersistenceStrategy
    public void open() throws IOException {
        File directory = this.store;
        if (directory == null) {
            directory = new File(MuleManager.getConfiguration().getWorkingDirectory()
                    + File.separator + MuleConfiguration.DEFAULT_QUEUE_STORE);
        }
        directory.mkdirs();
        this.journal = new JournalImpl(directory, 8, LogFileManager.DEFAULT_LOGFILE_SIZE);
        this.journal.setJournalEventListener(this);
        this.marks = new ConcurrentHashMap();
        this.pendingMarks = new TreeSet();
        this.unusedMarks = new TreeSet();
        this.marker = new Thread(this, "JournalPersistenceStrategy");
        this.marker.setDaemon(true);
        this.marker.start();
    }

    /**
     * Interrupts the marker thread and closes the journal. Does nothing for parts that were
     * never opened, so calling this before {@link #open()} is safe.
     *
     * @throws IOException if closing the journal fails
     */
    @Override // org.mule.util.queue.QueuePersistenceStrategy
    public void close() throws IOException {
        if (this.marker != null) {
            this.marker.interrupt();
        }
        if (this.journal != null) {
            this.journal.close();
        }
    }

    /**
     * Marker thread body: every 100 ms, relocates live records that precede a reported
     * overflow location and then advances the journal mark past unused locations. Failures in
     * a cycle are logged and the loop continues; the loop ends when the thread is interrupted.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.interrupted()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                logger.debug("Marker thread interrupted");
                return;
            }
            synchronized (this.markLock) {
                try {
                    relocateOverflowedRecords();
                    advanceMark();
                } catch (Exception e) {
                    logger.warn("Error when relocating records", e);
                }
            }
        }
    }

    /**
     * Journal callback: remembers the overflow location so the marker thread can relocate the
     * live records that precede it.
     *
     * @param recordLocation the location reported by the journal
     */
    public void overflowNotification(RecordLocation recordLocation) {
        this.overflowLocation = recordLocation;
        logger.debug("Overflow to " + recordLocation);
    }

    /** @return the configured journal directory, or null if the default is to be used */
    public File getStore() {
        return this.store;
    }

    /** @param file the journal directory to use on the next {@link #open()} */
    public void setStore(File file) {
        this.store = file;
    }

    /** @return always {@code false} */
    @Override // org.mule.util.queue.QueuePersistenceStrategy
    public boolean isTransient() {
        return false;
    }

    /**
     * Resolves a class by name, converting a lookup failure into a
     * {@link NoClassDefFoundError} that carries the original exception as its cause.
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            // initCause returns Throwable; cast back so the method needs no throws clause.
            throw (NoClassDefFoundError) new NoClassDefFoundError().initCause(e);
        }
    }

    /**
     * Records that {@code id} has been removed: the delete record and, if still registered,
     * the store record become unused. The caller must hold markLock.
     */
    private void releaseRecord(Object id, RecordLocation deleteRecord) {
        this.unusedMarks.add(deleteRecord);
        RecordLocation storeRecord = (RecordLocation) this.marks.remove(id);
        if (storeRecord != null) {
            this.pendingMarks.remove(storeRecord);
            this.unusedMarks.add(storeRecord);
        }
        this.markLock.notify();
    }

    /**
     * If an overflow was reported, rewrites every pending record located before the overflow
     * location at the end of the journal and re-registers it under its new location. The
     * caller must hold markLock.
     */
    private void relocateOverflowedRecords() throws Exception {
        RecordLocation overflow = this.overflowLocation;
        if (overflow == null) {
            return;
        }
        this.overflowLocation = null;
        SortedSet stale = this.pendingMarks.headSet(overflow);
        if (stale.isEmpty()) {
            return;
        }
        logger.trace("Relocating " + stale.size() + " records");
        int relocated = 0;
        while (!stale.isEmpty()) {
            RecordLocation oldLocation = (RecordLocation) stale.first();
            Packet packet = this.journal.read(oldLocation);
            RecordLocation newLocation = this.journal.write(packet, false);
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(packet.sliceAsBytes()));
            in.readByte();
            in.readUTF();
            UUID id = readUuid(in);
            relocated++;
            // Move the mark forward every tenth record rather than on each one.
            if (relocated % 10 == 0) {
                RecordLocation next = this.journal.getNextRecordLocation(oldLocation);
                this.journal.setMark(next != null ? next : oldLocation, false);
            }
            this.unusedMarks.add(oldLocation);
            this.pendingMarks.add(newLocation);
            this.marks.put(id, newLocation);
            stale.remove(oldLocation);
        }
    }

    /**
     * Moves the journal mark just past the last unused location that precedes the oldest
     * pending record (or past all unused locations when nothing is pending), then forgets
     * those unused locations. The caller must hold markLock.
     */
    private void advanceMark() throws Exception {
        SortedSet reclaimable = this.pendingMarks.isEmpty()
                ? this.unusedMarks
                : this.unusedMarks.headSet((RecordLocation) this.pendingMarks.first());
        if (reclaimable.isEmpty()) {
            return;
        }
        RecordLocation lastUnused = (RecordLocation) reclaimable.last();
        RecordLocation target = this.journal.getNextRecordLocation(lastUnused);
        if (target == null) {
            target = lastUnused;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Marking to " + target + " / " + (this.pendingMarks.isEmpty() ? "null" : this.pendingMarks.last()));
        }
        RecordLocation currentMark = this.journal.getMark();
        // Never move the mark backwards.
        if (currentMark == null || target.compareTo(currentMark) > 0) {
            this.journal.setMark(target, false);
        }
        reclaimable.clear();
    }

    /**
     * Reads exactly UUID_LENGTH bytes from the stream and builds a UUID from them. Uses
     * readFully because a plain read(byte[]) is allowed to return fewer bytes than requested.
     */
    private static UUID readUuid(ObjectInputStream in) throws IOException {
        byte[] raw = new byte[UUID_LENGTH];
        in.readFully(raw);
        return new UUID(raw);
    }

    /** Wraps a non-I/O failure in an IOException, keeping the original as its cause. */
    private static IOException asIOException(Exception cause) {
        IOException wrapped = new IOException(cause.toString());
        wrapped.initCause(cause);
        return wrapped;
    }
}
