package uk.co.real_logic.artio.engine.framer;

import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.concurrent.TimeUnit;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.MutableDirectBuffer;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.Pressure;
import uk.co.real_logic.artio.builder.AbstractSequenceResetEncoder;
import uk.co.real_logic.artio.builder.Encoder;
import uk.co.real_logic.artio.builder.SessionHeaderEncoder;
import uk.co.real_logic.artio.decoder.SessionHeaderDecoder;
import uk.co.real_logic.artio.engine.logger.FixMessageTracker;
import uk.co.real_logic.artio.engine.logger.ReplayOperation;
import uk.co.real_logic.artio.engine.logger.ReplayQuery;
import uk.co.real_logic.artio.engine.logger.SequenceNumberIndexReader;
import uk.co.real_logic.artio.fields.EpochFractionFormat;
import uk.co.real_logic.artio.fields.UtcTimestampEncoder;
import uk.co.real_logic.artio.messages.FixMessageDecoder;
import uk.co.real_logic.artio.messages.FixMessageEncoder;
import uk.co.real_logic.artio.messages.MessageHeaderDecoder;
import uk.co.real_logic.artio.messages.MessageStatus;
import uk.co.real_logic.artio.messages.ReplayMessagesStatus;
import uk.co.real_logic.artio.messages.SessionReplyStatus;
import uk.co.real_logic.artio.protocol.GatewayPublication;
import uk.co.real_logic.artio.util.AsciiBuffer;
import uk.co.real_logic.artio.util.CharFormatter;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;

/* loaded from: input_file:uk/co/real_logic/artio/engine/framer/CatchupReplayer.class */
public class CatchupReplayer implements ControlledFragmentHandler, Continuation {
    private static final int ENCODE_BUFFER_SIZE = 8192;
    public static final int FRAME_LENGTH = 65 + FixMessageEncoder.bodyHeaderLength();
    private static final int OUT_OF_RANGE = -1;
    private final SequenceNumberIndexReader receivedSequenceNumberIndex;
    private final ReplayQuery inboundMessages;
    private final GatewayPublication inboundPublication;
    private final ErrorHandler errorHandler;
    private final long correlationId;
    private final long connectionId;
    private final int libraryId;
    private final int replayToSequenceNumber;
    private final int replayToSequenceIndex;
    private final FixGatewaySession session;
    private final long catchupEndTimeInMs;
    private final long requiredPosition;
    private final SessionHeaderDecoder headerDecoder;
    private final ReplayFor replayFor;
    private final Formatters formatters;
    private final EpochFractionFormat epochFractionFormat;
    private int replayFromSequenceNumber;
    private int replayFromSequenceIndex;
    private String missingMessagesReason;
    private AbstractSequenceResetEncoder sequenceResetEncoder;
    private UtcTimestampEncoder timestampEncoder;
    private MutableAsciiBuffer encodeBuffer;
    private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    private final FixMessageDecoder messageDecoder = new FixMessageDecoder();
    private final FixMessageEncoder messageEncoder = new FixMessageEncoder();
    private final AsciiBuffer asciiBuffer = new MutableAsciiBuffer();
    private State state = State.AWAITING_INDEX;
    private int heartbeatRangeSequenceNumberStart = -1;
    private ReplayOperation replayOperation = null;

    /* loaded from: input_file:uk/co/real_logic/artio/engine/framer/CatchupReplayer$Formatters.class */
    public static class Formatters {
        private final CharFormatter attemptFormatter = new CharFormatter("Attempt replay for sessionId=%s");
        private final CharFormatter okFormatter = new CharFormatter("OK for sessionId=%s");
        private final CharFormatter missingFormatter = new CharFormatter("Missing Messages for sessionId=%s");
        private final CharFormatter awaitIndexFormatter = new CharFormatter("Awaiting index position: indexed=%s vs required=%s");
        private final CharFormatter replayQueryingFormatter = new CharFormatter("Querying for sessionId=%s, currently at (%s, %s)");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:uk/co/real_logic/artio/engine/framer/CatchupReplayer$ReplayFor.class */
    public enum ReplayFor {
        REPLAY_MESSAGES { // from class: uk.co.real_logic.artio.engine.framer.CatchupReplayer.ReplayFor.1
            @Override // uk.co.real_logic.artio.engine.framer.CatchupReplayer.ReplayFor
            long sendOk(GatewayPublication gatewayPublication, int i, long j, FixGatewaySession fixGatewaySession) {
                return gatewayPublication.saveReplayMessagesReply(i, j, ReplayMessagesStatus.OK);
            }

            @Override // uk.co.real_logic.artio.engine.framer.CatchupReplayer.ReplayFor
            long sendMissing(GatewayPublication gatewayPublication, int i, long j, FixGatewaySession fixGatewaySession) {
                return gatewayPublication.saveReplayMessagesReply(i, j, ReplayMessagesStatus.MISSING_MESSAGES);
            }
        },
        REQUEST_SESSION { // from class: uk.co.real_logic.artio.engine.framer.CatchupReplayer.ReplayFor.2
            @Override // uk.co.real_logic.artio.engine.framer.CatchupReplayer.ReplayFor
            long sendOk(GatewayPublication gatewayPublication, int i, long j, FixGatewaySession fixGatewaySession) {
                long saveRequestSessionReply = gatewayPublication.saveRequestSessionReply(i, SessionReplyStatus.OK, j);
                if (saveRequestSessionReply > 0) {
                    fixGatewaySession.play();
                }
                return saveRequestSessionReply;
            }

            @Override // uk.co.real_logic.artio.engine.framer.CatchupReplayer.ReplayFor
            long sendMissing(GatewayPublication gatewayPublication, int i, long j, FixGatewaySession fixGatewaySession) {
                long saveRequestSessionReply = gatewayPublication.saveRequestSessionReply(i, SessionReplyStatus.MISSING_MESSAGES, j);
                if (saveRequestSessionReply > 0) {
                    fixGatewaySession.play();
                }
                return saveRequestSessionReply;
            }
        };

        abstract long sendOk(GatewayPublication gatewayPublication, int i, long j, FixGatewaySession fixGatewaySession);

        abstract long sendMissing(GatewayPublication gatewayPublication, int i, long j, FixGatewaySession fixGatewaySession);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:uk/co/real_logic/artio/engine/framer/CatchupReplayer$State.class */
    public enum State {
        AWAITING_INDEX,
        REPLAY_QUERY,
        REPLAYING,
        SEND_MISSING,
        SEND_OK
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CatchupReplayer(SequenceNumberIndexReader sequenceNumberIndexReader, ReplayQuery replayQuery, GatewayPublication gatewayPublication, ErrorHandler errorHandler, long j, long j2, int i, int i2, int i3, int i4, int i5, FixGatewaySession fixGatewaySession, long j3, ReplayFor replayFor, Formatters formatters, EpochFractionFormat epochFractionFormat) {
        this.receivedSequenceNumberIndex = sequenceNumberIndexReader;
        this.inboundMessages = replayQuery;
        this.inboundPublication = gatewayPublication;
        this.errorHandler = errorHandler;
        this.correlationId = j;
        this.connectionId = j2;
        this.libraryId = i;
        this.replayToSequenceNumber = i2;
        this.replayToSequenceIndex = i3;
        this.replayFromSequenceNumber = i4;
        this.replayFromSequenceIndex = i5;
        this.session = fixGatewaySession;
        this.catchupEndTimeInMs = j3;
        this.requiredPosition = gatewayPublication.position();
        this.headerDecoder = fixGatewaySession.fixDictionary().makeHeaderDecoder();
        this.replayFor = replayFor;
        this.formatters = formatters;
        this.epochFractionFormat = epochFractionFormat;
    }

    private void updateMessageHeader(MutableDirectBuffer mutableDirectBuffer, int i) {
        this.messageEncoder.wrap(mutableDirectBuffer, i + 8).connection(this.connectionId).libraryId(this.libraryId).status(MessageStatus.CATCHUP_REPLAY);
    }

    @Override // io.aeron.logbuffer.ControlledFragmentHandler
    public ControlledFragmentHandler.Action onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
        this.messageHeaderDecoder.wrap(directBuffer, i);
        this.messageDecoder.wrap(directBuffer, i + 8, this.messageHeaderDecoder.blockLength(), this.messageHeaderDecoder.version());
        long messageType = MessageTypeExtractor.getMessageType(this.messageDecoder);
        this.messageDecoder.skipMetaData();
        int bodyLength = this.messageDecoder.bodyLength();
        this.asciiBuffer.wrap(directBuffer, this.messageDecoder.limit() + FixMessageDecoder.bodyHeaderLength(), bodyLength);
        this.headerDecoder.decode(this.asciiBuffer, 0, bodyLength);
        if (messageType != 48) {
            return (this.heartbeatRangeSequenceNumberStart == -1 || sendGapFill()) ? processNormalMessage(directBuffer, i, i2) : ControlledFragmentHandler.Action.ABORT;
        }
        if (this.heartbeatRangeSequenceNumberStart == -1) {
            this.heartbeatRangeSequenceNumberStart = this.headerDecoder.msgSeqNum();
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    private boolean sendGapFill() {
        if (this.sequenceResetEncoder == null) {
            this.sequenceResetEncoder = this.session.fixDictionary().makeSequenceResetEncoder();
            this.timestampEncoder = new UtcTimestampEncoder(this.epochFractionFormat);
            this.encodeBuffer = new MutableAsciiBuffer(new byte[8192]);
            this.sequenceResetEncoder.gapFillFlag(true);
            SessionHeaderEncoder possDupFlag = this.sequenceResetEncoder.header().possDupFlag(true);
            possDupFlag.senderCompID(this.headerDecoder.senderCompID());
            if (this.headerDecoder.hasSenderSubID()) {
                possDupFlag.senderSubID(this.headerDecoder.senderSubID());
            }
            if (this.headerDecoder.hasSenderLocationID()) {
                possDupFlag.senderLocationID(this.headerDecoder.senderLocationID());
            }
            possDupFlag.targetCompID(this.headerDecoder.targetCompID());
            if (this.headerDecoder.hasTargetSubID()) {
                possDupFlag.targetSubID(this.headerDecoder.targetSubID());
            }
            if (this.headerDecoder.hasTargetLocationID()) {
                possDupFlag.targetLocationID(this.headerDecoder.targetLocationID());
            }
        }
        int msgSeqNum = this.headerDecoder.msgSeqNum();
        this.sequenceResetEncoder.header().msgSeqNum(this.heartbeatRangeSequenceNumberStart);
        this.sequenceResetEncoder.newSeqNo(msgSeqNum);
        this.sequenceResetEncoder.header().sendingTime(this.timestampEncoder.buffer(), this.timestampEncoder.encodeFrom(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
        long encode = this.sequenceResetEncoder.encode(this.encodeBuffer, 0);
        boolean z = this.inboundPublication.saveMessage(this.encodeBuffer, Encoder.offset(encode), Encoder.length(encode), this.libraryId, 52L, this.messageDecoder.session(), this.replayFromSequenceIndex, (long) this.libraryId, MessageStatus.CATCHUP_REPLAY, msgSeqNum) > 0;
        if (z) {
            this.heartbeatRangeSequenceNumberStart = -1;
        }
        return z;
    }

    private ControlledFragmentHandler.Action processNormalMessage(DirectBuffer directBuffer, int i, int i2) {
        updateMessageHeader((MutableDirectBuffer) directBuffer, i);
        ControlledFragmentHandler.Action apply = Pressure.apply(this.inboundPublication.offer(directBuffer, i, i2));
        if (apply == ControlledFragmentHandler.Action.CONTINUE) {
            this.replayFromSequenceNumber = this.headerDecoder.msgSeqNum() + 1;
            this.replayFromSequenceIndex = this.messageDecoder.sequenceIndex();
        }
        return apply;
    }

    @Override // uk.co.real_logic.artio.engine.framer.Continuation
    public long attempt() {
        if (DebugLogger.isEnabled(LogTag.CATCHUP)) {
            DebugLogger.log(LogTag.CATCHUP, this.formatters.attemptFormatter.clear().with(this.session.sessionId()));
        }
        switch (this.state) {
            case AWAITING_INDEX:
                long indexedPosition = this.receivedSequenceNumberIndex.indexedPosition(this.inboundPublication.sessionId());
                if (indexedPosition >= this.requiredPosition) {
                    this.state = State.REPLAY_QUERY;
                    return -2L;
                }
                DebugLogger.log(LogTag.CATCHUP, this.formatters.awaitIndexFormatter, indexedPosition, this.requiredPosition);
                return -2L;
            case REPLAY_QUERY:
                if (notLoggingInboundMessages()) {
                    return switchToMissingMessages("Not logging inbound messages");
                }
                DebugLogger.log(LogTag.CATCHUP, this.formatters.replayQueryingFormatter, this.session.sessionId(), this.replayToSequenceNumber, this.replayToSequenceIndex);
                this.replayOperation = this.inboundMessages.query(this.session.sessionId(), this.replayFromSequenceNumber, this.replayFromSequenceIndex, this.replayToSequenceNumber, this.replayToSequenceIndex, LogTag.CATCHUP, new FixMessageTracker(LogTag.CATCHUP, this, this.session.sessionId()));
                this.state = State.REPLAYING;
                return -2L;
            case REPLAYING:
                if (System.currentTimeMillis() > this.catchupEndTimeInMs) {
                    return switchToMissingMessages("Catchup operation timed out");
                }
                if (!this.replayOperation.pollReplay()) {
                    return -2L;
                }
                if (hasMissingMessages()) {
                    return switchToMissingMessages("Is missing messages from replay index query");
                }
                this.state = State.SEND_OK;
                return sendOk(this.inboundPublication, this.correlationId, this.session);
            case SEND_MISSING:
                return sendMissingMessages();
            case SEND_OK:
                return sendOk(this.inboundPublication, this.correlationId, this.session);
            default:
                return 1L;
        }
    }

    private long switchToMissingMessages(String str) {
        this.state = State.SEND_MISSING;
        this.missingMessagesReason = str;
        return sendMissingMessages();
    }

    private boolean hasMissingMessages() {
        return this.replayFromSequenceIndex < this.replayToSequenceIndex || this.replayFromSequenceNumber < this.replayToSequenceNumber;
    }

    private boolean notLoggingInboundMessages() {
        return this.inboundMessages == null;
    }

    private long sendOk(GatewayPublication gatewayPublication, long j, FixGatewaySession fixGatewaySession) {
        if (DebugLogger.isEnabled(LogTag.CATCHUP)) {
            DebugLogger.log(LogTag.CATCHUP, this.formatters.okFormatter.clear().with(fixGatewaySession.sessionId()));
        }
        return this.replayFor.sendOk(gatewayPublication, this.libraryId, j, fixGatewaySession);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static long sendOk(GatewayPublication gatewayPublication, long j, FixGatewaySession fixGatewaySession, int i, Formatters formatters) {
        if (DebugLogger.isEnabled(LogTag.CATCHUP)) {
            DebugLogger.log(LogTag.CATCHUP, formatters.okFormatter.clear().with(fixGatewaySession.sessionId()));
        }
        return ReplayFor.REQUEST_SESSION.sendOk(gatewayPublication, i, j, fixGatewaySession);
    }

    private long sendMissingMessages() {
        if (DebugLogger.isEnabled(LogTag.CATCHUP)) {
            DebugLogger.log(LogTag.CATCHUP, this.formatters.missingFormatter.clear().with(this.session.sessionId()));
        }
        long sendMissing = this.replayFor.sendMissing(this.inboundPublication, this.libraryId, this.correlationId, this.session);
        if (sendMissing > 0) {
            this.errorHandler.onError(new IllegalStateException(String.format("Failed to read correct number of messages for sessionId=%d, finished at [%d, %d] instead of [%d, %d] - %s", Long.valueOf(this.session.sessionId()), Integer.valueOf(this.replayFromSequenceIndex), Integer.valueOf(this.replayFromSequenceNumber), Integer.valueOf(this.replayToSequenceIndex), Integer.valueOf(this.replayToSequenceNumber), this.missingMessagesReason)));
            this.missingMessagesReason = null;
        }
        return sendMissing;
    }

    @Override // uk.co.real_logic.artio.engine.framer.Continuation
    public void close() {
        if (this.replayOperation != null) {
            this.replayOperation.closeNow();
        }
    }
}
