package com.solutionappliance.msgqueue.heap;

import com.solutionappliance.core.data.ByteArray;
import com.solutionappliance.core.data.ByteWriter;
import com.solutionappliance.core.data.MutableByteArray;
import com.solutionappliance.core.lang.sync.SyncException;
import com.solutionappliance.core.lang.sync.monitor.LatestEventSource;
import com.solutionappliance.core.lang.sync.monitor.SaEventConsumer;
import com.solutionappliance.core.system.ActorContext;
import com.solutionappliance.core.type.Type;
import com.solutionappliance.core.type.Types;
import com.solutionappliance.core.util.CommonUtil;
import com.solutionappliance.core.util.DebugString;
import com.solutionappliance.core.util.Pair;
import com.solutionappliance.core.util.StringHelper;
import com.solutionappliance.msgqueue.MsgId;
import com.solutionappliance.msgqueue.MsgQueue;
import com.solutionappliance.msgqueue.MsgQueueException;
import com.solutionappliance.msgqueue.MsgQueueReader;
import com.solutionappliance.msgqueue.MsgQueueWriter;
import com.solutionappliance.msgqueue.common.MsgQueueControl;
import com.solutionappliance.msgqueue.common.MsgQueueControlRecord;
import com.solutionappliance.msgqueue.common.MsgQueueFilter;
import com.solutionappliance.msgqueue.common.MsgQueueStatus;
import com.solutionappliance.msgqueue.serializer.MsgQueueSerializer;
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/solutionappliance/msgqueue/heap/HeapMsgQueue.class */
public class HeapMsgQueue implements MsgQueue {
    private final HeapMsgQueueSpi spi;
    private final ActorContext ctx;
    private final int maxData;
    private final LatestEventSource<MsgId> eventProducer;
    private MsgQueueStatus status = MsgQueueStatus.open;
    private final TreeMap<MsgId, ByteArray> data = new TreeMap<>();
    private boolean closed = false;
    private final AtomicInteger writerCount = new AtomicInteger(0);
    private MsgId nextMsgId = MsgId.head;

    public HeapMsgQueue(ActorContext actorContext, String str, int i) throws MsgQueueException {
        this.spi = new HeapMsgQueueSpi(actorContext, str);
        this.ctx = actorContext;
        this.maxData = i;
        this.eventProducer = new LatestEventSource<>(this.spi.id(), this.spi.toMsgId(0L));
        MsgQueueWriter openWriter = openWriter(actorContext, "ControlRecord", Types.javaObject);
        Throwable th = null;
        try {
            try {
                openWriter.enqueue(new MsgQueueControlRecord(actorContext, MsgQueueControl.created));
                if (openWriter != null) {
                    if (0 == 0) {
                        openWriter.close();
                        return;
                    }
                    try {
                        openWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (openWriter != null) {
                if (th != null) {
                    try {
                        openWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    openWriter.close();
                }
            }
            throw th4;
        }
    }

    public String toString() {
        return new StringHelper(getClass()).append(this.spi.id().shortName()).append("size", Integer.valueOf(this.data.size())).append("nextMsg", this.nextMsgId.toDebugString()).append("writers", Integer.valueOf(this.writerCount.get())).append("closed", this.closed).toString();
    }

    @Override // com.solutionappliance.msgqueue.MsgQueue
    public MsgQueueStatus status() throws MsgQueueException {
        return this.status;
    }

    @Override // com.solutionappliance.msgqueue.MsgQueue
    public MsgId firstMsgId() throws MsgQueueException {
        synchronized (this.data) {
            if (this.data.isEmpty()) {
                return MsgId.head;
            }
            return this.data.firstKey();
        }
    }

    @Override // com.solutionappliance.msgqueue.MsgQueue
    public MsgId lastMsgId() throws MsgQueueException {
        synchronized (this.data) {
            if (this.data.isEmpty()) {
                return MsgId.tail;
            }
            return this.data.lastKey();
        }
    }

    @Override // com.solutionappliance.msgqueue.MsgQueue
    public MsgId nextMsgId() throws MsgQueueException {
        return this.nextMsgId;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleWriterClose() {
        synchronized (this) {
            if (this.writerCount.decrementAndGet() == 0 && this.closed) {
                this.status = MsgQueueStatus.closed;
            }
        }
    }

    protected void assertOpen() throws MsgQueueException {
        if (this.closed) {
            throw this.spi.exceptionBuilder("closed", "This queue has been closed", null).toException();
        }
    }

    @Override // com.solutionappliance.msgqueue.MsgQueue, java.lang.AutoCloseable
    public void close() throws MsgQueueException {
        synchronized (this) {
            this.closed = true;
            if (this.writerCount.get() == 0) {
                this.status = MsgQueueStatus.closed;
            }
        }
    }

    public DebugString toDebugString() {
        Map.Entry<MsgId, ByteArray> firstEntry;
        Map.Entry<MsgId, ByteArray> lastEntry;
        StringHelper append = new StringHelper(getClass()).append("name", this.spi.id().shortName()).append("status", this.status);
        synchronized (this.data) {
            firstEntry = this.data.firstEntry();
            lastEntry = this.data.lastEntry();
        }
        if (firstEntry != null) {
            append.append("first", firstEntry.getKey().toDebugString());
        }
        if (lastEntry != null && lastEntry != firstEntry) {
            append.append("last", lastEntry.getKey().toDebugString());
        }
        return append.toDebugString();
    }

    public boolean isClosed() {
        return this.status == MsgQueueStatus.closed;
    }

    @Override // com.solutionappliance.msgqueue.MsgQueue
    public <T> MsgQueueWriter<T> openWriter(final ActorContext actorContext, final String str, Type<T> type) throws MsgQueueException {
        assertOpen();
        this.writerCount.incrementAndGet();
        return new MsgQueueWriter<T>() { // from class: com.solutionappliance.msgqueue.heap.HeapMsgQueue.1
            private boolean writerClosed = false;

            @Override // com.solutionappliance.msgqueue.MsgQueueWriter
            public MsgId enqueue(T t) throws MsgQueueException {
                MsgId msgId;
                if (this.writerClosed) {
                    throw new MsgQueueException.MsgQueueExceptionBuilder(HeapMsgQueue.this.spi.id().append(new String[]{str}), "Writer is closed", null).toException();
                }
                HeapMsgQueue.this.assertOpen();
                ByteWriter mutableByteArray = new MutableByteArray(1024);
                ((MsgQueueSerializer) MsgQueueSerializer.type.convert(actorContext, t)).writeMessage(mutableByteArray);
                synchronized (HeapMsgQueue.this.data) {
                    msgId = HeapMsgQueue.this.nextMsgId;
                    HeapMsgQueue.this.data.put(msgId, mutableByteArray);
                    HeapMsgQueue.this.nextMsgId = HeapMsgQueue.this.spi.toMsgId(msgId.uniqueId() + 1);
                    while (HeapMsgQueue.this.data.size() > HeapMsgQueue.this.maxData) {
                        HeapMsgQueue.this.data.remove(HeapMsgQueue.this.data.firstKey());
                    }
                    HeapMsgQueue.this.data.notifyAll();
                }
                HeapMsgQueue.this.eventProducer.publishEvent(msgId);
                return msgId;
            }

            @Override // com.solutionappliance.msgqueue.MsgQueueWriter
            public MsgQueueWriter.WritePromise asyncEnqueue(T t) throws MsgQueueException {
                final MsgId enqueue = enqueue(t);
                return new MsgQueueWriter.WritePromise() { // from class: com.solutionappliance.msgqueue.heap.HeapMsgQueue.1.1
                    @Override // com.solutionappliance.msgqueue.MsgQueueWriter.WritePromise
                    public MsgId await() throws MsgQueueException {
                        return enqueue;
                    }
                };
            }

            @Override // com.solutionappliance.msgqueue.MsgQueueWriter, java.lang.AutoCloseable
            public void close() {
                if (this.writerClosed) {
                    return;
                }
                this.writerClosed = true;
                HeapMsgQueue.this.handleWriterClose();
            }
        };
    }

    @Override // com.solutionappliance.msgqueue.MsgQueue
    public <T> MsgQueueReader<T> openReader(final ActorContext actorContext, final String str, final Type<T> type, final MsgQueueFilter msgQueueFilter, final MsgId msgId, final Duration duration, int i) throws MsgQueueException {
        assertOpen();
        return new MsgQueueReader<T>() { // from class: com.solutionappliance.msgqueue.heap.HeapMsgQueue.2
            final SaEventConsumer<MsgId> eventMonitor;
            MsgId nextMsgId;
            Pair<MsgId, Object> nextEntry;
            boolean readerClosed = false;

            {
                this.eventMonitor = HeapMsgQueue.this.eventProducer.newEventConsumer(str);
                this.nextMsgId = msgId;
            }

            @Override // com.solutionappliance.msgqueue.MsgQueueReader
            public boolean isOpen() {
                return !this.readerClosed;
            }

            @Override // com.solutionappliance.msgqueue.MsgQueueReader
            public boolean hasNext() throws MsgQueueException {
                Map.Entry ceilingEntry;
                MsgId msgId2;
                Object readObject;
                if (this.readerClosed) {
                    throw ((MsgQueueException.MsgQueueExceptionBuilder) new MsgQueueException.MsgQueueExceptionBuilder(HeapMsgQueue.this.spi.id().append(new String[]{str}).append(new String[]{"closed"}), "Reader $[name] is closed", null).add("name", str)).toException();
                }
                HeapMsgQueue.this.assertOpen();
                do {
                    try {
                        if (duration == null) {
                            if (!this.eventMonitor.hasEvent(this.nextMsgId)) {
                                return false;
                            }
                        } else if (!this.eventMonitor.waitForEvent(this.nextMsgId)) {
                            return false;
                        }
                        synchronized (HeapMsgQueue.this.data) {
                            ceilingEntry = HeapMsgQueue.this.data.ceilingEntry(this.nextMsgId);
                            if (ceilingEntry == null) {
                                throw ((MsgQueueException.MsgQueueExceptionBuilder) new MsgQueueException.MsgQueueExceptionBuilder(HeapMsgQueue.this.spi.id().append(new String[]{str}).append(new String[]{"nextFailure"}), "Failed fetching $[msgId]", null).add("msgId", this.nextMsgId.toString())).toException();
                            }
                        }
                        msgId2 = (MsgId) ceilingEntry.getKey();
                        this.nextMsgId = HeapMsgQueue.this.spi.toMsgId(msgId2.uniqueId() + 1);
                        readObject = MsgQueueSerializer.readObject(actorContext, (ByteArray) ceilingEntry.getValue(), msgQueueFilter);
                    } catch (SyncException e) {
                        throw new MsgQueueException.MsgQueueExceptionBuilder(HeapMsgQueue.this.spi.id().append(new String[]{str}).append(new String[]{"waitFailure"}), "Failed waiting for event due to $[cause (debugString)]", e).toException();
                    }
                } while (readObject == null);
                this.nextEntry = Pair.of(msgId2, readObject);
                return true;
            }

            @Override // com.solutionappliance.msgqueue.MsgQueueReader
            public Pair<MsgId, T> nextEntry() throws MsgQueueException {
                Pair pair = (Pair) CommonUtil.asNonNull("nextEntry", this.nextEntry);
                return type == Types.javaObject ? Pair.of((MsgId) pair.left(), type.cast(pair.right())) : Pair.of((MsgId) pair.left(), type.convert(actorContext, pair.right()));
            }

            @Override // com.solutionappliance.msgqueue.MsgQueueReader
            public MsgId estNextMsgId() {
                return this.nextMsgId;
            }

            @Override // com.solutionappliance.msgqueue.MsgQueueReader, java.lang.AutoCloseable
            public void close() throws MsgQueueException {
                this.readerClosed = true;
            }
        };
    }
}
