package org.apache.kafka.common.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/common/network/SelectorTest.class */
public class SelectorTest {
    private static final List<NetworkSend> EMPTY = new ArrayList();
    private static final int BUFFER_SIZE = 4096;
    private EchoServer server;
    private Selectable selector;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/common/network/SelectorTest$EchoServer.class */
    public static class EchoServer extends Thread {
        public final int port = TestUtils.choosePort();
        private final ServerSocket serverSocket = new ServerSocket(this.port);
        private final List<Thread> threads = Collections.synchronizedList(new ArrayList());
        private final List<Socket> sockets = Collections.synchronizedList(new ArrayList());

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    final Socket accept = this.serverSocket.accept();
                    this.sockets.add(accept);
                    Thread thread = new Thread() { // from class: org.apache.kafka.common.network.SelectorTest.EchoServer.1
                        @Override // java.lang.Thread, java.lang.Runnable
                        public void run() {
                            try {
                                DataInputStream dataInputStream = new DataInputStream(accept.getInputStream());
                                DataOutputStream dataOutputStream = new DataOutputStream(accept.getOutputStream());
                                while (accept.isConnected() && !accept.isClosed()) {
                                    int readInt = dataInputStream.readInt();
                                    byte[] bArr = new byte[readInt];
                                    dataInputStream.readFully(bArr);
                                    dataOutputStream.writeInt(readInt);
                                    dataOutputStream.write(bArr);
                                    dataOutputStream.flush();
                                }
                                try {
                                    accept.close();
                                } catch (IOException e) {
                                }
                            } catch (IOException e2) {
                                try {
                                    accept.close();
                                } catch (IOException e3) {
                                }
                            } catch (Throwable th) {
                                try {
                                    accept.close();
                                } catch (IOException e4) {
                                }
                                throw th;
                            }
                        }
                    };
                    thread.start();
                    this.threads.add(thread);
                } catch (IOException e) {
                    return;
                }
            }
        }

        public void closeConnections() throws IOException {
            Iterator<Socket> it = this.sockets.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
        }

        public void close() throws IOException, InterruptedException {
            this.serverSocket.close();
            closeConnections();
            Iterator<Thread> it = this.threads.iterator();
            while (it.hasNext()) {
                it.next().join();
            }
            join();
        }
    }

    @Before
    public void setup() throws Exception {
        this.server = new EchoServer();
        this.server.start();
        this.selector = new Selector(new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap());
    }

    @After
    public void teardown() throws Exception {
        this.selector.close();
        this.server.close();
    }

    @Test
    public void testServerDisconnect() throws Exception {
        blockingConnect(0);
        Assert.assertEquals("hello", blockingRequest(0, "hello"));
        this.server.closeConnections();
        while (!this.selector.disconnected().contains(0)) {
            this.selector.poll(1000L, EMPTY);
        }
        blockingConnect(0);
        Assert.assertEquals("hello", blockingRequest(0, "hello"));
    }

    @Test
    public void testClientDisconnect() throws Exception {
        blockingConnect(0);
        this.selector.disconnect(0);
        this.selector.poll(10L, Arrays.asList(createSend(0, "hello1")));
        Assert.assertEquals("Request should not have succeeded", 0L, this.selector.completedSends().size());
        Assert.assertEquals("There should be a disconnect", 1L, this.selector.disconnected().size());
        Assert.assertTrue("The disconnect should be from our node", this.selector.disconnected().contains(0));
        blockingConnect(0);
        Assert.assertEquals("hello2", blockingRequest(0, "hello2"));
    }

    @Test(expected = IllegalStateException.class)
    public void testCantSendWithInProgress() throws Exception {
        blockingConnect(0);
        this.selector.poll(1000L, Arrays.asList(createSend(0, "test1"), createSend(0, "test2")));
    }

    @Test(expected = IllegalStateException.class)
    public void testCantSendWithoutConnecting() throws Exception {
        this.selector.poll(1000L, Arrays.asList(createSend(0, "test")));
    }

    @Test(expected = IOException.class)
    public void testNoRouteToHost() throws Exception {
        this.selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", this.server.port), BUFFER_SIZE, BUFFER_SIZE);
    }

    @Test
    public void testConnectionRefused() throws Exception {
        this.selector.connect(0, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
        while (this.selector.disconnected().contains(0)) {
            this.selector.poll(1000L, EMPTY);
        }
    }

    @Test
    public void testNormalOperation() throws Exception {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", this.server.port);
        for (int i = 0; i < 5; i++) {
            this.selector.connect(i, inetSocketAddress, BUFFER_SIZE, BUFFER_SIZE);
        }
        int[] iArr = new int[5];
        int[] iArr2 = new int[5];
        int i2 = 0;
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < 5; i3++) {
            arrayList.add(createSend(i3, i3 + "-0"));
        }
        while (i2 < 5 * 500) {
            this.selector.poll(0L, arrayList);
            Assert.assertEquals("No disconnects should have occurred.", 0L, this.selector.disconnected().size());
            for (NetworkReceive networkReceive : this.selector.completedReceives()) {
                String[] split = asString(networkReceive).split("-");
                Assert.assertEquals("Should be in the form 'conn-counter'", 2L, split.length);
                Assert.assertEquals("Check the source", networkReceive.source(), Integer.parseInt(split[0]));
                Assert.assertEquals("Check that the receive has kindly been rewound", 0L, networkReceive.payload().position());
                Assert.assertEquals("Check the request counter", iArr2[networkReceive.source()], Integer.parseInt(split[1]));
                int source = networkReceive.source();
                iArr2[source] = iArr2[source] + 1;
                i2++;
            }
            arrayList.clear();
            Iterator it = this.selector.completedSends().iterator();
            while (it.hasNext()) {
                int destination = ((NetworkSend) it.next()).destination();
                iArr[destination] = iArr[destination] + 1;
                if (iArr[destination] < 500) {
                    arrayList.add(createSend(destination, destination + "-" + iArr[destination]));
                }
            }
        }
    }

    @Test
    public void testSendLargeRequest() throws Exception {
        blockingConnect(0);
        String randomString = TestUtils.randomString(40960);
        Assert.assertEquals(randomString, blockingRequest(0, randomString));
    }

    @Test
    public void testEmptyRequest() throws Exception {
        blockingConnect(0);
        Assert.assertEquals("", blockingRequest(0, ""));
    }

    @Test(expected = IllegalStateException.class)
    public void testExistingConnectionId() throws IOException {
        blockingConnect(0);
        blockingConnect(0);
    }

    private String blockingRequest(int i, String str) throws IOException {
        this.selector.poll(1000L, Arrays.asList(createSend(i, str)));
        while (true) {
            this.selector.poll(1000L, EMPTY);
            for (NetworkReceive networkReceive : this.selector.completedReceives()) {
                if (networkReceive.source() == i) {
                    return asString(networkReceive);
                }
            }
        }
    }

    private void blockingConnect(int i) throws IOException {
        this.selector.connect(i, new InetSocketAddress("localhost", this.server.port), BUFFER_SIZE, BUFFER_SIZE);
        while (!this.selector.connected().contains(Integer.valueOf(i))) {
            this.selector.poll(10000L, EMPTY);
        }
    }

    private NetworkSend createSend(int i, String str) {
        return new NetworkSend(i, new ByteBuffer[]{ByteBuffer.wrap(str.getBytes())});
    }

    private String asString(NetworkReceive networkReceive) {
        return new String(Utils.toArray(networkReceive.payload()));
    }
}
