package com.solutionappliance.msgqueue.file.impl;

import com.solutionappliance.core.data.ByteArray;
import com.solutionappliance.core.data.bytereader.ByteBufferByteReader;
import com.solutionappliance.core.lang.MultiPartName;
import com.solutionappliance.core.lang.sync.SyncException;
import com.solutionappliance.core.lang.sync.monitor.LatestEventSource;
import com.solutionappliance.core.system.ActorContext;
import com.solutionappliance.core.type.Type;
import com.solutionappliance.core.type.Types;
import com.solutionappliance.core.util.CommonUtil;
import com.solutionappliance.core.util.Pair;
import com.solutionappliance.core.util.StringHelper;
import com.solutionappliance.msgqueue.MsgId;
import com.solutionappliance.msgqueue.MsgQueueException;
import com.solutionappliance.msgqueue.MsgQueueReader;
import com.solutionappliance.msgqueue.common.MsgQueueFilter;
import com.solutionappliance.msgqueue.common.MsgQueueStatus;
import com.solutionappliance.msgqueue.serializer.EncodedMsg;
import com.solutionappliance.msgqueue.serializer.MsgQueueSerializer;
import java.time.Duration;
import java.util.LinkedList;

/* loaded from: input_file:com/solutionappliance/msgqueue/file/impl/FileMsgQueueReader.class */
/**
 * {@link MsgQueueReader} implementation that pulls messages for one named reader out of the data
 * files of a {@link FileMsgQueue}.
 *
 * <p>Usage protocol: call {@link #hasNext()} first; when it returns {@code true} a message has been
 * staged and {@link #next()} / {@link #nextEntry()} hands it out. Messages are read from the
 * current data file in batches of up to {@code fetchSize} and buffered in memory.
 *
 * <p>NOTE(review): no synchronization is visible in this class; confirm that callers confine a
 * reader instance to a single thread.
 *
 * @param <T> type handed out to the caller; raw decoded messages are cast or converted to it
 */
class FileMsgQueueReader<T> implements AutoCloseable, MsgQueueReader<T> {
    /**
     * Reader position that corresponds to message offset 0 inside a data file. Message offsets are
     * translated to reader positions by adding this value and back by subtracting it.
     * NOTE(review): presumably the size of a fixed file header - confirm against MsgQueueDataFile.
     */
    private static final int FIRST_MSG_POSITION = 512;

    private final FileMsgQueue parent;
    private final LatestEventSource<MsgId>.LatestEventConsumer eventConsumer;
    private final MultiPartName id;
    /** Currently open data file and its byte reader; {@code null} until the first seek and after close. */
    private Pair<MsgQueueDataFile, ByteBufferByteReader> io = null;
    /** Decoded messages that were read ahead but not yet staged by {@link #hasNext()}. */
    private final LinkedList<Pair<MsgId, Object>> messages = new LinkedList<>();
    private final ActorContext ctx;
    private final int fetchSize;
    private final MsgQueueFilter filter;
    /** Wait budget handed to each read attempt; {@code null} means "check only, never wait". */
    private final Duration maxWaitTime;
    private final Type<T> msgType;
    /** Id at which the next raw read from the data files will start. */
    private MsgId nextMsgId;
    /** Entry staged by {@link #hasNext()} and consumed by {@link #nextEntry()}. */
    private Pair<MsgId, Object> nextEntry;

    /**
     * Creates a reader and registers an event consumer for it with the parent queue.
     *
     * @param ctx         actor context used for deserialization and type conversion
     * @param parent      queue whose data files are read
     * @param name        reader name, parsed with {@link MultiPartName#valueOf}
     * @param msgType     type the messages are delivered as
     * @param filter      filter passed to the deserializer
     * @param startMsgId  id of the first message to read
     * @param maxWaitTime how long a read attempt may wait for a message, or {@code null} to not wait
     * @param fetchSize   upper bound on the number of messages buffered by one fill
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public FileMsgQueueReader(ActorContext ctx, FileMsgQueue parent, String name, Type<T> msgType, MsgQueueFilter filter, MsgId startMsgId, Duration maxWaitTime, int fetchSize) {
        this.ctx = ctx;
        this.parent = parent;
        this.id = MultiPartName.valueOf(name);
        this.eventConsumer = this.parent.newEventConsumer(this.id.shortName());
        this.filter = filter;
        this.fetchSize = fetchSize;
        this.nextMsgId = startMsgId;
        this.msgType = msgType;
        this.maxWaitTime = maxWaitTime;
    }

    /**
     * Returns the payload of the entry staged by the preceding {@link #hasNext()} call.
     *
     * @return the message, cast or converted to {@code T}
     * @throws MsgQueueException propagated from {@link #nextEntry()}
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public T next() throws MsgQueueException {
        return nextEntry().right();
    }

    /**
     * Hands out the entry staged by the preceding {@link #hasNext()} call and clears the staging
     * slot, so another {@link #hasNext()} is required before the next call.
     *
     * @return the message id paired with the message as {@code T}
     * @throws MsgQueueException declared by the interface
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public Pair<MsgId, T> nextEntry() throws MsgQueueException {
        @SuppressWarnings("unchecked")
        Pair<MsgId, Object> staged = (Pair<MsgId, Object>) CommonUtil.asNonNull("nextEntry", this.nextEntry);
        this.nextEntry = null;
        // Plain objects and still-encoded messages are only cast; everything else goes through conversion.
        if (this.msgType == Types.javaObject || this.msgType == EncodedMsg.type) {
            return Pair.of(staged.left(), this.msgType.cast(staged.right()));
        }
        return Pair.of(staged.left(), this.msgType.convert(this.ctx, staged.right()));
    }

    /**
     * Stages the next message, reading a new batch from disk when the buffer is empty.
     *
     * <p>Repeated calls without an intervening {@link #next()} / {@link #nextEntry()} keep the same
     * entry staged instead of discarding it.
     *
     * @return {@code true} when an entry is staged and ready to be handed out
     * @throws MsgQueueException if seeking or reading the data file fails
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public boolean hasNext() throws MsgQueueException {
        if (this.nextEntry != null) {
            // Already staged by an earlier call; pulling another one would silently drop this message.
            return true;
        }
        if (this.messages.isEmpty()) {
            fill(this.maxWaitTime, true);
        }
        // pollFirst() yields null on an empty buffer, which also resets the staging slot.
        this.nextEntry = this.messages.pollFirst();
        return this.nextEntry != null;
    }

    /**
     * Reads messages into the buffer until it is non-empty (or, with {@code fillToFetchSize}, holds
     * {@code fetchSize} entries), or until no further message becomes available.
     *
     * @param maxWait         wait budget for each availability check, or {@code null} to not wait
     * @param fillToFetchSize when {@code true} keep reading up to {@code fetchSize} entries;
     *                        otherwise stop as soon as one entry is buffered
     */
    private void fill(Duration maxWait, boolean fillToFetchSize) throws MsgQueueException {
        while ((this.messages.isEmpty() || (fillToFetchSize && this.messages.size() < this.fetchSize))
                && rawWaitAndSeek(this.parent.materialize(this.nextMsgId), maxWait)) {
            Pair<MsgId, ByteArray> raw = rawReadMessage();
            // Advance even when the message is dropped below, so it is not read again.
            this.nextMsgId = raw.left().estNextMsgId();
            Object decoded;
            if (this.msgType == EncodedMsg.type) {
                decoded = new EncodedMsg(raw.right());
            } else {
                decoded = MsgQueueSerializer.readObject(this.ctx, raw.right(), this.filter);
            }
            // A null result is skipped. NOTE(review): presumably the filter rejected it - confirm.
            if (decoded != null) {
                this.messages.add(Pair.of(raw.left(), decoded));
            }
        }
    }

    /**
     * @return the id of the first buffered message, or the id the next disk read will start at when
     *         the buffer is empty
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public MsgId estNextMsgId() {
        Pair<MsgId, Object> head = this.messages.peek();
        return head != null ? head.left() : this.nextMsgId;
    }

    /** @return whether the underlying event consumer is still open */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public boolean isOpen() {
        return this.eventConsumer.isOpen();
    }

    /**
     * Releases the data-file reader (if one is open) and closes the event consumer. The event
     * consumer is closed even when releasing the data-file reader fails.
     *
     * @throws MsgQueueException if releasing a resource fails
     */
    @Override // java.lang.AutoCloseable, com.solutionappliance.msgqueue.MsgQueueReader
    public void close() throws MsgQueueException {
        Pair<MsgQueueDataFile, ByteBufferByteReader> open = this.io;
        this.io = null;
        try {
            if (open != null) {
                open.left().closeReader(this.id.shortName());
            }
        } finally {
            this.eventConsumer.close();
        }
    }

    /** Closes the reader when it is garbage collected without having been closed explicitly. */
    @Override
    protected void finalize() throws Exception {
        close();
    }

    @Override
    public String toString() {
        return new StringHelper(getClass()).append("name", this.id.shortName()).append("nextMsgId", estNextMsgId().toDebugString()).toString();
    }

    /**
     * Checks (or, with a wait budget, waits) for {@code msgId} to become available and positions
     * the reader on it.
     *
     * @param msgId   id to seek to
     * @param maxWait how long to wait for the id, or {@code null} to only check without waiting
     * @return {@code true} when the reader is positioned on {@code msgId}; {@code false} when the id
     *         is not available and the parent queue passed its open check
     * @throws MsgQueueException if waiting or seeking fails
     */
    private boolean rawWaitAndSeek(MsgId msgId, Duration maxWait) throws MsgQueueException {
        try {
            boolean available = (maxWait != null)
                    ? this.eventConsumer.waitForEvent(msgId, maxWait)
                    : this.eventConsumer.hasEvent(msgId);
            if (available) {
                doSeek(msgId);
                return true;
            }
            this.parent.assertOpen();
            return false;
        } catch (SyncException e) {
            throw ((MsgQueueException.MsgQueueExceptionBuilder) exceptionBuilder("seekFailure", "Failed to seek to $[msgId] due to $[exception (debugString)]", e).add("msgId", msgId)).toException();
        }
    }

    /**
     * Positions the byte reader on {@code msgId}, opening or switching data files as required.
     *
     * @param msgId id to seek to; must already be available according to the event consumer
     * @throws MsgQueueException if the id is not available yet or a data file cannot be opened
     */
    private void doSeek(MsgId msgId) throws MsgQueueException {
        try {
            if (!this.eventConsumer.hasEvent(msgId)) {
                // The template references msgId, so it has to be supplied to the builder.
                throw ((MsgQueueException.MsgQueueExceptionBuilder) exceptionBuilder("seekFailure", "Cannot seek to $[msgId (debugString)] as it isn't available yet", null).add("msgId", msgId)).toException();
            }

            // Step 1: make sure a data file is open whose id range can contain msgId.
            Pair<MsgQueueDataFile, ByteBufferByteReader> open = this.io;
            if (open == null) {
                this.io = this.parent.openDataReader(this.id.shortName(), Long.valueOf(msgId.groupNo()));
            } else {
                MsgQueueDataFile openFile = open.left();
                MsgId first = openFile.firstMsgId();
                MsgId end = openFile.nextMsgId();
                // NOTE(review): flush() is only invoked on files not opened for write and the bounds
                // are re-read afterwards; presumably it refreshes this view of the file - confirm.
                if (msgId.groupNo() == openFile.fileNo() && msgId.offset() >= end.offset() && !openFile.forWrite()) {
                    openFile.flush();
                    first = openFile.firstMsgId();
                    end = openFile.nextMsgId();
                }
                if (msgId.compareTo(first) < 0 || msgId.compareTo(end) > 0) {
                    openFile.closeReader(this.id.shortName());
                    // Cleared first so a failing open does not leave a released reader behind.
                    this.io = null;
                    this.io = this.parent.openDataReader(this.id.shortName(), Long.valueOf(msgId.groupNo()));
                }
            }

            // Step 2: position inside the file, or roll over to the following file.
            @SuppressWarnings("unchecked")
            Pair<MsgQueueDataFile, ByteBufferByteReader> current = (Pair<MsgQueueDataFile, ByteBufferByteReader>) CommonUtil.asNonNull(this.io);
            MsgQueueDataFile file = current.left();
            MsgId first = file.firstMsgId();
            MsgId end = file.nextMsgId();
            MsgQueueStatus status = file.fileStatus();
            int cmp = msgId.compareTo(end);
            if (cmp > 0 || (cmp == 0 && status == MsgQueueStatus.complete)) {
                // msgId lies past this file's content: continue at the first message of another file.
                long groupNo = msgId.groupNo();
                if (first.groupNo() == groupNo) {
                    groupNo++;
                }
                file.closeReader(this.id.shortName());
                this.io = null;
                Pair<MsgQueueDataFile, ByteBufferByteReader> reopened = this.parent.openDataReader(this.id.shortName(), Long.valueOf(groupNo));
                this.io = reopened;
                reopened.right().position(FIRST_MSG_POSITION);
            } else {
                current.right().position(((int) msgId.offset()) + FIRST_MSG_POSITION);
            }
        } catch (SyncException e) {
            throw ((MsgQueueException.MsgQueueExceptionBuilder) exceptionBuilder("seekFailure", "Failed to seek to $[msgId (debugString)] due to $[exception (debugString)]", e).add("msgId", msgId)).toException();
        }
    }

    /**
     * Reads one framed message at the reader's current position.
     *
     * <p>Frame layout as consumed here: a 4-byte start marker, a fixed-size long, a fixed-size int
     * payload length, the payload bytes, and a 4-byte end marker.
     *
     * @return the message id built by the data file, paired with the raw payload
     * @throws MsgQueueException if no file is open, the position is outside the written range, or
     *                           the frame markers do not match
     */
    private Pair<MsgId, ByteArray> rawReadMessage() throws MsgQueueException {
        Pair<MsgQueueDataFile, ByteBufferByteReader> open = this.io;
        if (open == null) {
            throw exceptionBuilder("readFailure", "Not read to read a message, please seek first", null).toException();
        }
        assert open.left() != null;
        assert open.right() != null;
        MsgQueueDataFile file = open.left();
        ByteBufferByteReader reader = open.right();

        long limit = file.nextMsgOffset();
        int offset = reader.position() - FIRST_MSG_POSITION;
        // NOTE(review): as in doSeek, flush() on a file not opened for write presumably refreshes
        // the known end offset before the position is declared out of range - confirm.
        if (offset >= 0 && offset >= limit && !file.forWrite()) {
            file.flush();
            limit = file.nextMsgOffset();
        }
        if (offset < 0 || offset >= limit) {
            throw ((MsgQueueException.MsgQueueExceptionBuilder) ((MsgQueueException.MsgQueueExceptionBuilder) exceptionBuilder("readFailure", "Message offset is invalid: at $[offset]/$[limit]", null).add("offset", Integer.valueOf(offset))).add("limit", Long.valueOf(limit))).toException();
        }

        try {
            if (!reader.readRawByteArray(4).equals(MsgQueueDataFile.startOfMessage)) {
                throw ((MsgQueueException.MsgQueueExceptionBuilder) ((MsgQueueException.MsgQueueExceptionBuilder) exceptionBuilder("readFailure", "Failed to read message at $[pos]/$[limit]", null).add("pos", Integer.valueOf(offset))).add("limit", Long.valueOf(limit))).toException();
            }
            long frameHeader = reader.readFixedSizeLong();
            ByteArray payload = reader.readRawByteArrayFully(reader.readFixedSizeInt());
            if (!reader.readRawByteArray(4).equals(MsgQueueDataFile.endOfMessage)) {
                throw ((MsgQueueException.MsgQueueExceptionBuilder) ((MsgQueueException.MsgQueueExceptionBuilder) exceptionBuilder("readMessage", "Failed to read message at $[pos]/$[limit]", null).add("pos", Integer.valueOf(offset))).add("limit", Long.valueOf(limit))).toException();
            }
            // Third argument is the number of bytes the whole frame occupied.
            return Pair.of(file.toMsgId(frameHeader, offset, (reader.position() - offset) - FIRST_MSG_POSITION), payload);
        } catch (MsgQueueException e) {
            // Already carries the failure details; do not wrap it again below.
            throw e;
        } catch (Exception e2) {
            throw ((MsgQueueException.MsgQueueExceptionBuilder) ((MsgQueueException.MsgQueueExceptionBuilder) exceptionBuilder("readMessage", "Failed to read message at $[offset]/$[limit] with $[exception (debugString)]", e2).add("offset", Integer.valueOf(offset))).add("limit", Long.valueOf(limit))).toException();
        }
    }

    /**
     * Starts an exception builder whose key is this reader's id extended by {@code category}.
     *
     * @param category short failure category appended to the reader id
     * @param template message template; {@code $[name]} placeholders refer to values added to the builder
     * @param cause    underlying failure, or {@code null}
     * @return the builder, ready for values to be added
     */
    protected MsgQueueException.MsgQueueExceptionBuilder exceptionBuilder(String category, String template, Throwable cause) {
        return new MsgQueueException.MsgQueueExceptionBuilder(this.id.append(new String[]{category}), template, cause);
    }
}
