package com.epam.deltix.qsrv.hf.topic.consumer;

import com.epam.deltix.qsrv.hf.pub.TypeLoader;
import com.epam.deltix.qsrv.hf.pub.codec.CodecFactory;
import com.epam.deltix.qsrv.hf.pub.md.RecordClassDescriptor;
import com.epam.deltix.streaming.MessageSource;
import com.epam.deltix.timebase.messages.InstrumentMessage;
import com.epam.deltix.util.concurrent.CursorIsClosedException;
import com.epam.deltix.util.concurrent.IntermittentlyAvailableCursor;
import com.epam.deltix.util.concurrent.NextResult;
import com.epam.deltix.util.io.idlestrat.IdleStrategy;
import io.aeron.Aeron;
import io.aeron.AvailableImageHandler;
import io.aeron.ControlledFragmentAssembler;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import java.nio.ByteOrder;
import java.util.List;
import javax.annotation.ParametersAreNonnullByDefault;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

@ParametersAreNonnullByDefault
/* loaded from: input_file:com/epam/deltix/qsrv/hf/topic/consumer/DirectMessageSource.class */
class DirectMessageSource implements MessageSource<InstrumentMessage>, IntermittentlyAvailableCursor {
    private static final int MESSAGES_PER_POLL = 100;
    private static final int HEADER_ALIGNMENT = 4;
    private static final int BODY_ALIGNMENT = 8;
    private static final byte SIZE_HEADER_SIZE = 4;
    private static final int INITIAL_BUFFER_SIZE;
    private InstrumentMessage curMsg;
    private final UnsafeBuffer messageBuffer;
    private final Subscription subscription;
    private final IdleStrategy idleStrategy;
    private final DirectBuffer arrayBuffer;
    private final ControlledFragmentAssembler fragmentHandler;
    private final DirectMessageDecoder decoder;
    static final /* synthetic */ boolean $assertionsDisabled;
    private int bufferLimit = 0;
    private int bufferPos = 0;
    private volatile boolean closed = false;
    private volatile boolean dataLoss = false;

    /* loaded from: input_file:com/epam/deltix/qsrv/hf/topic/consumer/DirectMessageSource$FragmentHandler.class */
    private class FragmentHandler implements ControlledFragmentHandler {
        static final /* synthetic */ boolean $assertionsDisabled;

        private FragmentHandler() {
        }

        public ControlledFragmentHandler.Action onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
            int align = BitUtil.align(BitUtil.align(DirectMessageSource.this.bufferLimit + 4, DirectMessageSource.BODY_ALIGNMENT) + i2, 4) - DirectMessageSource.this.bufferLimit;
            if (DirectMessageSource.this.remainingSpace() >= align) {
                DirectMessageSource.this.copyMessageToBuffer(directBuffer, i, i2);
                return ControlledFragmentHandler.Action.CONTINUE;
            }
            if (DirectMessageSource.this.hasBufferedMessages()) {
                return ControlledFragmentHandler.Action.ABORT;
            }
            if (!$assertionsDisabled && (DirectMessageSource.this.bufferPos != 0 || DirectMessageSource.this.bufferLimit != 0)) {
                throw new AssertionError("Buffer expected to be empty");
            }
            DirectMessageSource.this.messageBuffer.wrap(new byte[com.epam.deltix.util.BitUtil.nextPowerOfTwo(align)]);
            DirectMessageSource.this.copyMessageToBuffer(directBuffer, i, i2);
            return ControlledFragmentHandler.Action.BREAK;
        }

        static {
            $assertionsDisabled = !DirectMessageSource.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DirectMessageSource(Aeron aeron, boolean z, String str, int i, CodecFactory codecFactory, TypeLoader typeLoader, List<RecordClassDescriptor> list, IdleStrategy idleStrategy, MappingProvider mappingProvider) {
        if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
            throw new IllegalArgumentException("Only LITTLE_ENDIAN byte order supported");
        }
        this.subscription = aeron.addSubscription(str, i, (AvailableImageHandler) null, this::onUnavailableImage);
        this.messageBuffer = new UnsafeBuffer(new byte[INITIAL_BUFFER_SIZE]);
        this.arrayBuffer = new UnsafeBuffer(this.messageBuffer.byteArray(), 0, 0);
        this.idleStrategy = idleStrategy;
        this.fragmentHandler = new ControlledFragmentAssembler(new FragmentHandler());
        this.decoder = new DirectMessageDecoder(this.arrayBuffer, z, codecFactory, typeLoader, list, mappingProvider.getMappingSnapshot(), mappingProvider);
    }

    /* renamed from: getMessage, reason: merged with bridge method [inline-methods] */
    public InstrumentMessage m4getMessage() {
        return this.curMsg;
    }

    public boolean next() throws CursorIsClosedException {
        assertNotClosed();
        while (true) {
            if (hasBufferedMessages()) {
                if (setupMessageFromBuffer()) {
                    return true;
                }
            } else if (pageDataIn()) {
                this.idleStrategy.reset();
            } else {
                this.idleStrategy.idle();
                assertNotClosed();
            }
        }
    }

    public NextResult nextIfAvailable() {
        assertNotClosed();
        do {
            if (!hasBufferedMessages() && !pageDataIn()) {
                return NextResult.UNAVAILABLE;
            }
        } while (!setupMessageFromBuffer());
        return NextResult.OK;
    }

    private void assertNotClosed() {
        if (this.subscription.isClosed()) {
            if (!this.dataLoss) {
                throw new CursorIsClosedException();
            }
            throw new ClosedDueToDataLossException();
        }
    }

    private boolean pageDataIn() {
        if (!$assertionsDisabled && remainingData() != 0) {
            throw new AssertionError();
        }
        this.bufferPos = 0;
        this.bufferLimit = 0;
        return this.subscription.controlledPoll(this.fragmentHandler, MESSAGES_PER_POLL) > 0;
    }

    private boolean setupMessageFromBuffer() {
        if (!$assertionsDisabled && !hasBufferedMessages()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !BitUtil.isAligned(this.bufferPos, 4)) {
            throw new AssertionError();
        }
        int i = this.messageBuffer.getInt(this.bufferPos);
        this.bufferPos = BitUtil.align(this.bufferPos + 4, BODY_ALIGNMENT);
        if (!$assertionsDisabled && !BitUtil.isAligned(this.bufferPos, BODY_ALIGNMENT)) {
            throw new AssertionError();
        }
        this.arrayBuffer.wrap(this.messageBuffer.byteArray(), this.bufferPos, i);
        InstrumentMessage processSingleMessageFromBuffer = this.decoder.processSingleMessageFromBuffer(i);
        this.bufferPos = BitUtil.align(this.bufferPos + i, 4);
        if (processSingleMessageFromBuffer == null) {
            return false;
        }
        this.curMsg = processSingleMessageFromBuffer;
        return true;
    }

    public boolean isAtEnd() {
        return false;
    }

    private boolean hasBufferedMessages() {
        return this.bufferLimit > this.bufferPos;
    }

    private int remainingData() {
        return this.bufferLimit - this.bufferPos;
    }

    private int remainingSpace() {
        return this.messageBuffer.capacity() - this.bufferLimit;
    }

    private void cleanup() {
        this.subscription.close();
    }

    public synchronized void close() {
        if (this.closed) {
            return;
        }
        cleanup();
        this.closed = true;
    }

    private void onUnavailableImage(Image image) {
        int sessionId = image.sessionId();
        if (this.subscription.isClosed() || this.decoder == null || this.decoder.checkIfSessionGracefullyClosed(sessionId)) {
            return;
        }
        onDataLossDetected();
    }

    private void onDataLossDetected() {
        this.dataLoss = true;
        this.subscription.close();
    }

    private void copyMessageToBuffer(DirectBuffer directBuffer, int i, int i2) {
        writeSizeHeaderToBuffer(i2);
        loadDataToBuffer(directBuffer, i, i2);
    }

    private void writeSizeHeaderToBuffer(int i) {
        if (!$assertionsDisabled && !BitUtil.isAligned(this.bufferLimit, 4)) {
            throw new AssertionError();
        }
        this.messageBuffer.putInt(this.bufferLimit, i);
        this.bufferLimit = BitUtil.align(this.bufferLimit + 4, BODY_ALIGNMENT);
    }

    private void loadDataToBuffer(DirectBuffer directBuffer, int i, int i2) {
        if (!$assertionsDisabled && !BitUtil.isAligned(this.bufferLimit, BODY_ALIGNMENT)) {
            throw new AssertionError();
        }
        directBuffer.getBytes(i, this.messageBuffer, this.bufferLimit, i2);
        this.bufferLimit = BitUtil.align(this.bufferLimit + i2, 4);
    }

    static {
        $assertionsDisabled = !DirectMessageSource.class.desiredAssertionStatus();
        INITIAL_BUFFER_SIZE = com.epam.deltix.util.BitUtil.nextPowerOfTwo(131072);
    }
}
