package uk.co.real_logic.artio.system_tests;

import io.aeron.archive.ArchivingMediaDriver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.List;
import org.agrona.CloseHelper;
import org.agrona.concurrent.EpochNanoClock;
import org.agrona.concurrent.OffsetEpochNanoClock;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import uk.co.real_logic.artio.TestFixtures;
import uk.co.real_logic.artio.Timing;
import uk.co.real_logic.artio.builder.Encoder;
import uk.co.real_logic.artio.builder.LogonEncoder;
import uk.co.real_logic.artio.builder.TestRequestEncoder;
import uk.co.real_logic.artio.engine.ConnectedSessionInfo;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.FixEngine;
import uk.co.real_logic.artio.engine.LockStepFramerEngineScheduler;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.fields.UtcTimestampEncoder;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.library.LibraryConfiguration;
import uk.co.real_logic.artio.messages.SessionReplyStatus;
import uk.co.real_logic.artio.messages.SessionState;
import uk.co.real_logic.artio.session.Session;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;

/* loaded from: input_file:uk/co/real_logic/artio/system_tests/SlowConsumerTest.class */
public class SlowConsumerTest {
    private static final int BUFFER_CAPACITY = 16384;
    private static final int TEST_TIMEOUT = 20000;
    private ArchivingMediaDriver mediaDriver;
    private FixEngine engine;
    private FixLibrary library;
    private TestSystem testSystem;
    private SocketChannel socket;
    private Session session;
    private final EpochNanoClock nanoClock = new OffsetEpochNanoClock();
    private final int port = TestFixtures.unusedPort();
    private final FakeOtfAcceptor acceptingOtfAcceptor = new FakeOtfAcceptor();
    private final FakeHandler handler = new FakeHandler(this.acceptingOtfAcceptor);
    private final TestRequestEncoder testRequest = newTestRequest();
    private final LogonEncoder logon = new LogonEncoder();
    private final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
    private final MutableAsciiBuffer buffer = new MutableAsciiBuffer(this.byteBuffer);
    private final LockStepFramerEngineScheduler scheduler = new LockStepFramerEngineScheduler();

    @Test(timeout = 20000)
    public void shouldQuarantineThenDisconnectASlowConsumer() throws IOException {
        setup(FixConnection.BUFFER_SIZE, null);
        initiateConnection();
        FakeHandler fakeHandler = this.handler;
        TestSystem testSystem = this.testSystem;
        testSystem.getClass();
        this.session = SystemTestUtil.acquireSession(this.handler, this.library, fakeHandler.awaitSessionId(testSystem::poll), this.testSystem);
        ConnectedSessionInfo sessionInfo = getSessionInfo();
        while (!socketIsConnected()) {
            this.testSystem.poll();
        }
        assertNotSlow();
        boolean z = false;
        while (socketIsConnected()) {
            if (this.session.isActive()) {
                if (this.handler.isSlow(this.session)) {
                    z = true;
                }
                this.session.trySend(this.testRequest);
            }
            this.testSystem.poll();
        }
        bytesInBufferAtLeast(sessionInfo, 8192L);
        Assert.assertTrue(z);
    }

    @Test(timeout = 20000)
    public void shouldRestoreConnectionFromSlowGroupWhenItCatchesUp() throws IOException {
        MessageTimingCaptor messageTimingCaptor = new MessageTimingCaptor();
        ConnectedSessionInfo sessionBecomesSlow = sessionBecomesSlow(messageTimingCaptor);
        this.socket.configureBlocking(false);
        this.testSystem.poll();
        while (true) {
            if (sessionBecomesSlow.bytesInBuffer() <= 0 && !this.handler.isSlow(this.session)) {
                assertNotSlow();
                messageTimingCaptor.verifyConsecutiveSequenceNumbers(this.session.lastSentMsgSeqNum());
                Assert.assertEquals(SessionState.ACTIVE, this.session.state());
                Assert.assertTrue(socketIsConnected());
                return;
            }
            do {
                this.byteBuffer.position(1).limit(BUFFER_CAPACITY);
            } while (this.socket.read(this.byteBuffer) > 0);
            this.session.trySend(this.testRequest);
            this.testSystem.poll();
        }
    }

    @Test(timeout = 20000)
    public void shouldNotifyLibraryOfSlowConnectionWhenAcquired() throws IOException {
        sessionBecomesSlow(null);
        Assert.assertEquals(SessionReplyStatus.OK, SystemTestUtil.releaseToEngine(this.library, this.session, this.testSystem));
        this.session = SystemTestUtil.acquireSession(this.handler, this.library, this.session.id(), this.testSystem);
        Assert.assertTrue("Session not slow", this.handler.lastSessionWasSlow());
    }

    private ConnectedSessionInfo sessionBecomesSlow(MessageTimingCaptor messageTimingCaptor) throws IOException {
        setup(4194304, messageTimingCaptor);
        initiateConnection();
        FakeHandler fakeHandler = this.handler;
        TestSystem testSystem = this.testSystem;
        testSystem.getClass();
        this.session = SystemTestUtil.acquireSession(this.handler, this.library, fakeHandler.awaitSessionId(testSystem::poll), this.testSystem);
        ConnectedSessionInfo sessionInfo = getSessionInfo();
        assertNotSlow();
        while (true) {
            if (sessionInfo.bytesInBuffer() != 0 && this.handler.isSlow(this.session)) {
                assertIsSlow();
                return sessionInfo;
            }
            for (int i = 0; i < 10; i++) {
                this.session.trySend(this.testRequest);
            }
            this.testSystem.poll();
        }
    }

    private void assertIsSlow() {
        Assert.assertTrue(this.handler.isSlow(this.session));
    }

    private void assertNotSlow() {
        Assert.assertFalse(this.handler.isSlow(this.session));
    }

    private void bytesInBufferAtLeast(ConnectedSessionInfo connectedSessionInfo, long j) {
        Timing.assertEventuallyTrue("Buffer doesn't have enough bytes in", () -> {
            MatcherAssert.assertThat(Long.valueOf(connectedSessionInfo.bytesInBuffer()), Matchers.greaterThanOrEqualTo(Long.valueOf(j)));
            this.testSystem.poll();
        });
    }

    private TestRequestEncoder newTestRequest() {
        TestRequestEncoder testRequestEncoder = new TestRequestEncoder();
        testRequestEncoder.testReqID("some relatively long test req id");
        return testRequestEncoder;
    }

    private ConnectedSessionInfo getSessionInfo() {
        List<LibraryInfo> libraries = SystemTestUtil.libraries(this.engine, this.testSystem);
        MatcherAssert.assertThat(libraries, Matchers.hasSize(2));
        List sessions = libraries.get(0).sessions();
        MatcherAssert.assertThat(sessions, Matchers.hasSize(1));
        return (ConnectedSessionInfo) sessions.get(0);
    }

    private void initiateConnection() throws IOException {
        this.socket = SocketChannel.open(new InetSocketAddress("localhost", this.port));
        UtcTimestampEncoder utcTimestampEncoder = new UtcTimestampEncoder();
        utcTimestampEncoder.encode(System.currentTimeMillis());
        this.logon.heartBtInt(10).encryptMethod(0).header().sendingTime(utcTimestampEncoder.buffer()).msgSeqNum(1).senderCompID(SystemTestUtil.INITIATOR_ID).targetCompID(SystemTestUtil.ACCEPTOR_ID);
        long encode = this.logon.encode(this.buffer, 0);
        int offset = Encoder.offset(encode);
        int length = Encoder.length(encode);
        this.byteBuffer.position(offset);
        this.byteBuffer.limit(offset + length);
        Assert.assertEquals(length, this.socket.write(this.byteBuffer));
    }

    private boolean socketIsConnected() {
        try {
            this.byteBuffer.position(0).limit(1);
            return this.socket.write(this.byteBuffer) >= 0;
        } catch (IOException e) {
            return false;
        }
    }

    @After
    public void cleanup() {
        this.testSystem.awaitBlocking(() -> {
            CloseHelper.close(this.engine);
        });
        CloseHelper.close(this.library);
        TestFixtures.cleanupMediaDriver(this.mediaDriver);
        CloseHelper.close(this.socket);
    }

    private void setup(int i, MessageTimingCaptor messageTimingCaptor) {
        this.mediaDriver = TestFixtures.launchMediaDriver(8388608);
        EngineConfiguration scheduler = SystemTestUtil.acceptingConfig(this.port, SystemTestUtil.ACCEPTOR_ID, SystemTestUtil.INITIATOR_ID, this.nanoClock).scheduler(this.scheduler);
        scheduler.deleteLogFileDirOnStart(true);
        scheduler.senderMaxBytesInBuffer(i);
        scheduler.messageTimingHandler(messageTimingCaptor);
        this.engine = FixEngine.launch(scheduler);
        this.testSystem = new TestSystem(this.scheduler, new FixLibrary[0]);
        LibraryConfiguration acceptingLibraryConfig = SystemTestUtil.acceptingLibraryConfig(this.handler, this.nanoClock);
        acceptingLibraryConfig.outboundMaxClaimAttempts(1);
        this.library = this.testSystem.connect(acceptingLibraryConfig);
    }
}
