package com.solutionappliance.msgqueue.file.impl;

import com.solutionappliance.core.data.int8.ByteArray;
import com.solutionappliance.core.data.int8.codec.NumberCodecs;
import com.solutionappliance.core.lang.KeyValuePair;
import com.solutionappliance.core.lang.MultiPartName;
import com.solutionappliance.core.lang.sync.SyncException;
import com.solutionappliance.core.lang.sync.monitor.LatestEventSource;
import com.solutionappliance.core.system.ActorContext;
import com.solutionappliance.core.text.writer.TextWriter;
import com.solutionappliance.core.util.CommonUtil;
import com.solutionappliance.msgqueue.MsgId;
import com.solutionappliance.msgqueue.MsgQueueException;
import com.solutionappliance.msgqueue.MsgQueueReader;
import com.solutionappliance.msgqueue.common.MsgQueueFilter;
import com.solutionappliance.msgqueue.common.MsgQueueStatus;
import com.solutionappliance.msgqueue.entity.MsgEntity;
import java.time.Duration;
import java.util.LinkedList;
import org.checkerframework.dataflow.qual.SideEffectFree;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/solutionappliance/msgqueue/file/impl/FileMsgQueueReader.class */
/**
 * Sequential reader over the data files of a {@link FileMsgQueue}.
 *
 * <p>Messages are read from the current data file into an in-memory buffer ({@link #messages}) and
 * handed out through the {@link MsgQueueReader} protocol: {@link #hasNext()} stages the next entry
 * and {@link #next()} / {@link #nextEntry()} consume it.
 *
 * <p>Nothing in this class synchronizes access to its mutable state, so an instance should be
 * confined to one thread unless callers synchronize externally.
 */
public class FileMsgQueueReader implements AutoCloseable, MsgQueueReader {
    /**
     * Reader position that corresponds to message offset 0. Every conversion between a message
     * offset and a reader position in this class adds or subtracts this value.
     * NOTE(review): presumably the size of a fixed per-file header - confirm against MsgQueueDataFile.
     */
    private static final int DATA_START_POSITION = 512;

    /** Number of bytes read for each start-of-message / end-of-message marker. */
    private static final int MARKER_LENGTH = 4;

    private final FileMsgQueue parent;
    private final LatestEventSource<MsgId>.LatestEventConsumer eventConsumer;
    private final MultiPartName id;
    /** Currently open data file and its reader, or {@code null} when nothing is open. */
    private KeyValuePair<MsgQueueDataFile, MsgQueueDataReader> io = null;
    /** Messages already read from disk that have not yet been staged by {@link #hasNext()}. */
    private final LinkedList<KeyValuePair<MsgId, MsgEntity>> messages = new LinkedList<>();
    private final ActorContext ctx;
    private final int fetchSize;
    private final MsgQueueFilter filter;
    private final Duration maxWaitTime;
    /** Id at which the next disk read will start. */
    private MsgId nextMsgId;
    /** Entry staged by {@link #hasNext()} and not yet consumed, or {@code null}. */
    private KeyValuePair<MsgId, MsgEntity> nextEntry;

    /**
     * Creates a reader and registers an event consumer with the parent queue.
     *
     * @param ctx actor context passed to {@code MsgEntity.readMsgEntity} when decoding messages
     * @param parent queue that owns the data files
     * @param fullyQualifiedId fully qualified reader id; its short name is used for the event
     *     consumer and for opening/closing data-file readers
     * @param filter filter passed to {@code MsgEntity.readMsgEntity}; messages for which that call
     *     returns {@code null} are skipped
     * @param startMsgId id at which reading starts
     * @param maxWaitTime how long {@link #hasNext()} waits for a message; when {@code null} the
     *     reader only checks whether one is already available
     * @param fetchSize number of messages {@link #hasNext()} tries to buffer per refill
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public FileMsgQueueReader(ActorContext ctx, FileMsgQueue parent, String fullyQualifiedId, MsgQueueFilter filter, MsgId startMsgId, Duration maxWaitTime, int fetchSize) {
        this.ctx = ctx;
        this.parent = parent;
        this.id = MultiPartName.fromFullyQualifiedString(fullyQualifiedId);
        this.eventConsumer = this.parent.newEventConsumer(this.id.shortName());
        this.filter = filter;
        this.fetchSize = fetchSize;
        this.nextMsgId = startMsgId;
        this.maxWaitTime = maxWaitTime;
    }

    /**
     * Returns the message staged by the preceding {@link #hasNext()} call and un-stages it.
     *
     * @return the staged message
     * @throws MsgQueueException declared by the interface
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public MsgEntity next() throws MsgQueueException {
        return nextEntry().getValue();
    }

    /**
     * Returns the id/message pair staged by the preceding {@link #hasNext()} call and un-stages it.
     * The staged entry is passed through {@code CommonUtil.asNonNull}, so a successful
     * {@link #hasNext()} must come first.
     *
     * @return the staged entry
     * @throws MsgQueueException declared by the interface
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public KeyValuePair<MsgId, MsgEntity> nextEntry() throws MsgQueueException {
        KeyValuePair<MsgId, MsgEntity> staged = CommonUtil.asNonNull("nextEntry", this.nextEntry);
        this.nextEntry = null;
        return staged;
    }

    /**
     * Stages the next message, refilling the buffer from disk when it is empty.
     *
     * <p>The call is idempotent: while a staged entry has not been consumed, repeated calls return
     * {@code true} without touching the buffer, so no message is dropped.
     *
     * @return {@code true} if an entry is staged for {@link #next()} / {@link #nextEntry()}
     * @throws MsgQueueException if seeking to or reading the next message fails
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public boolean hasNext() throws MsgQueueException {
        if (this.nextEntry != null) {
            // Already staged and not yet consumed; staging another would discard this one.
            return true;
        }
        if (this.messages.isEmpty()) {
            fill(this.maxWaitTime, true);
        }
        // pollFirst() yields null on an empty buffer, which also clears the staged entry.
        this.nextEntry = this.messages.pollFirst();
        return this.nextEntry != null;
    }

    /**
     * Reads messages from disk into {@link #messages}.
     *
     * <p>Reading continues while the buffer is empty, or - when {@code fillToFetchSize} is set -
     * while it holds fewer than {@link #fetchSize} entries, and stops as soon as
     * {@link #rawWaitAndSeek} reports that no further message is available. Messages rejected by
     * the filter still advance {@link #nextMsgId} but are not buffered.
     *
     * @param duration wait time passed to {@link #rawWaitAndSeek} for each message
     * @param fillToFetchSize whether to keep reading past the first buffered message
     * @throws MsgQueueException if seeking or reading fails
     */
    private void fill(Duration duration, boolean fillToFetchSize) throws MsgQueueException {
        while ((this.messages.isEmpty() || (fillToFetchSize && this.messages.size() < this.fetchSize))
                && rawWaitAndSeek(this.parent.materialize(this.nextMsgId), duration)) {
            KeyValuePair<MsgId, ByteArray> raw = rawReadMessage();
            MsgId msgId = raw.getKey();
            this.nextMsgId = msgId.estNextMsgId();
            MsgEntity entity = MsgEntity.readMsgEntity(this.ctx, raw.getValue(), this.filter);
            if (entity != null) {
                this.messages.add(KeyValuePair.of(msgId, entity));
            }
        }
    }

    /**
     * Returns the id of the first buffered message, or the id at which the next disk read will
     * start when the buffer is empty.
     */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public MsgId estNextMsgId() {
        KeyValuePair<MsgId, MsgEntity> head = this.messages.peek();
        return head != null ? head.getKey() : this.nextMsgId;
    }

    /** Reports whether the underlying event consumer is still open. */
    @Override // com.solutionappliance.msgqueue.MsgQueueReader
    public boolean isOpen() {
        return this.eventConsumer.isOpen();
    }

    /**
     * Closes the data-file reader, if one is open, and then the event consumer. The event consumer
     * is closed even when closing the data-file reader fails.
     *
     * @throws MsgQueueException if closing a resource fails
     */
    @Override // java.lang.AutoCloseable, com.solutionappliance.msgqueue.MsgQueueReader
    public void close() throws MsgQueueException {
        KeyValuePair<MsgQueueDataFile, MsgQueueDataReader> open = this.io;
        this.io = null;
        try {
            if (open != null) {
                open.getKey().closeReader(this.id.shortName());
            }
        } finally {
            this.eventConsumer.close();
        }
    }

    /** Renders the reader's short name and its estimated next message id. */
    @Override
    @SideEffectFree
    public String toString() {
        // String.valueOf keeps toString() from throwing if no next id is known.
        return TextWriter.forClass(getClass())
                .printKeyValueLine("name", this.id.shortName())
                .printKeyValueLine("nextMsgId", String.valueOf(estNextMsgId()))
                .done()
                .toString();
    }

    /**
     * Checks for (or, when {@code duration} is non-null, waits for) the message {@code msgId} and
     * positions the reader on it.
     *
     * @param msgId id to seek to
     * @param duration maximum wait; {@code null} means check availability without waiting
     * @return {@code true} if the reader is positioned on {@code msgId}; {@code false} if the
     *     message is not available and the parent queue's {@code assertOpen()} did not throw
     * @throws MsgQueueException if the seek fails or a {@link SyncException} is raised
     */
    private boolean rawWaitAndSeek(MsgId msgId, Duration duration) throws MsgQueueException {
        try {
            boolean available = duration != null
                    ? this.eventConsumer.waitForEvent(msgId, duration)
                    : this.eventConsumer.hasEvent(msgId);
            if (available) {
                doSeek(msgId);
                return true;
            }
            this.parent.assertOpen();
            return false;
        } catch (SyncException e) {
            throw newException("seekFailure", "Failed to seek to $[msgId] due to $[exception (debugString)]", e).m4add("msgId", (Object) msgId);
        }
    }

    /**
     * Positions the data reader on {@code msgId}, switching data files when the id lies outside the
     * file that is currently open.
     *
     * @param msgId id to seek to; the event consumer must already report it as available
     * @throws MsgQueueException if the id is not available yet or a {@link SyncException} is raised
     */
    private void doSeek(MsgId msgId) throws MsgQueueException {
        try {
            if (!this.eventConsumer.hasEvent(msgId)) {
                throw newException("seekFailure", "Cannot seek to $[msgId (debugString)] as it isn't available yet", null).m4add("msgId", (Object) msgId);
            }
            String readerName = this.id.shortName();

            // Step 1: make sure a data file covering msgId's group is open.
            KeyValuePair<MsgQueueDataFile, MsgQueueDataReader> previous = this.io;
            if (previous == null) {
                this.io = this.parent.openDataReader(readerName, Long.valueOf(msgId.groupNo()));
            } else {
                MsgQueueDataFile openFile = previous.getKey();
                MsgId first = openFile.firstMsgId();
                MsgId end = openFile.nextMsgId();
                if (msgId.groupNo() == openFile.fileNo() && msgId.offset() >= end.offset() && !openFile.forWrite()) {
                    // The target lies at or past the known end of this file: flush and re-read the
                    // bounds before deciding whether the file has to be switched.
                    openFile.flush();
                    first = openFile.firstMsgId();
                    end = openFile.nextMsgId();
                }
                if (msgId.compareTo(first) < 0 || msgId.compareTo(end) > 0) {
                    openFile.closeReader(readerName);
                    // Cleared first so that a failing open leaves no reference to the closed reader.
                    this.io = null;
                    this.io = this.parent.openDataReader(readerName, Long.valueOf(msgId.groupNo()));
                }
            }

            // Step 2: position inside the file, or roll over to another file.
            KeyValuePair<MsgQueueDataFile, MsgQueueDataReader> current = CommonUtil.asNonNull(this.io);
            MsgQueueDataFile file = current.getKey();
            MsgId first = file.firstMsgId();
            MsgId end = file.nextMsgId();
            MsgQueueStatus status = file.fileStatus();
            int cmp = msgId.compareTo(end);
            if (cmp > 0 || (cmp == 0 && status == MsgQueueStatus.complete)) {
                // msgId is beyond this file, or exactly at the end of a completed one: start at the
                // beginning of msgId's group, or of the following group when this file already is
                // msgId's group.
                long groupNo = msgId.groupNo();
                if (first.groupNo() == groupNo) {
                    groupNo++;
                }
                file.closeReader(readerName);
                this.io = null;
                KeyValuePair<MsgQueueDataFile, MsgQueueDataReader> rolled = this.parent.openDataReader(readerName, Long.valueOf(groupNo));
                this.io = rolled;
                rolled.getValue().position(DATA_START_POSITION);
            } else {
                current.getValue().position(((int) msgId.offset()) + DATA_START_POSITION);
            }
        } catch (SyncException e) {
            throw newException("seekFailure", "Failed to seek to $[msgId (debugString)] due to $[exception (debugString)]", e).m4add("msgId", (Object) msgId);
        }
    }

    /**
     * Reads one framed message at the reader's current position.
     *
     * <p>The frame is read in this order: a {@value #MARKER_LENGTH}-byte start marker, a fixed
     * length 64-bit value, a fixed length 32-bit payload length, the payload, and a
     * {@value #MARKER_LENGTH}-byte end marker.
     *
     * @return the message id (built by the data file from the 64-bit value, the message offset and
     *     the number of bytes consumed) paired with the raw payload
     * @throws MsgQueueException if no file is open, the position is outside the written range, a
     *     marker does not match, or the underlying read fails
     */
    private KeyValuePair<MsgId, ByteArray> rawReadMessage() throws MsgQueueException {
        KeyValuePair<MsgQueueDataFile, MsgQueueDataReader> current = this.io;
        if (current == null) {
            throw newException("readFailure", "Not ready to read a message, please seek first", null);
        }
        MsgQueueDataFile file = current.getKey();
        MsgQueueDataReader reader = current.getValue();
        long limit = file.nextMsgOffset();
        int offset = reader.position() - DATA_START_POSITION;
        if (offset >= 0 && offset >= limit && !file.forWrite()) {
            // Apparently at the end of the file: flush and re-read the limit before giving up.
            file.flush();
            limit = file.nextMsgOffset();
        }
        if (offset < 0 || offset >= limit) {
            throw newException("readFailure", "Message offset is invalid: at $[offset]/$[limit]", null).m4add("offset", (Object) Integer.valueOf(offset)).m4add("limit", (Object) Long.valueOf(limit));
        }
        try {
            if (!reader.readArray(MARKER_LENGTH).equals(MsgQueueDataFile.startOfMessage)) {
                throw newException("readFailure", "Failed to read message at $[pos]/$[limit]", null).m4add("pos", (Object) Integer.valueOf(offset)).m4add("limit", (Object) Long.valueOf(limit));
            }
            long msgNumber = reader.read(NumberCodecs.fixedLenInt64).longValue();
            int payloadLength = reader.read(NumberCodecs.fixedLenInt32).intValue();
            ByteArray payload = reader.readArray(payloadLength);
            if (!reader.readArray(MARKER_LENGTH).equals(MsgQueueDataFile.endOfMessage)) {
                throw newException("readMessage", "Failed to read message at $[pos]/$[limit]", null).m4add("pos", (Object) Integer.valueOf(offset)).m4add("limit", (Object) Long.valueOf(limit));
            }
            // Bytes consumed by the whole frame, markers included.
            int frameLength = (reader.position() - offset) - DATA_START_POSITION;
            return KeyValuePair.of(file.toMsgId(msgNumber, offset, frameLength), payload);
        } catch (MsgQueueException e) {
            // Already in the right shape; do not wrap it again in the generic handler below.
            throw e;
        } catch (Exception e) {
            throw newException("readMessage", "Failed to read message at $[offset]/$[limit] with $[exception (debugString)]", e).m4add("offset", (Object) Integer.valueOf(offset)).m4add("limit", (Object) Long.valueOf(limit));
        }
    }

    /**
     * Builds a {@link MsgQueueException} whose id is this reader's id with {@code str} appended.
     *
     * @param str final part of the exception id
     * @param str2 message template
     * @param th cause, or {@code null}
     * @return the new exception (not thrown)
     */
    protected MsgQueueException newException(String str, String str2, Throwable th) {
        return new MsgQueueException(this.id.append(new String[]{str}), str2, th);
    }
}
