package kafka.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import kafka.network.RequestChannel;
import kafka.security.CredentialProvider;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.junit.JUnitSuite;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$$eq$colon$eq$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.Set;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: SocketServerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\t}a\u0001B\u0001\u0003\u0001\u001d\u0011\u0001cU8dW\u0016$8+\u001a:wKJ$Vm\u001d;\u000b\u0005\r!\u0011a\u00028fi^|'o\u001b\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!A\u0003&V]&$8+^5uK\")1\u0003\u0001C\u0001)\u00051A(\u001b8jiz\"\u0012!\u0006\t\u0003-\u0001i\u0011A\u0001\u0005\b1\u0001\u0011\r\u0011\"\u0001\u001a\u0003\u0015\u0001(o\u001c9t+\u0005Q\u0002CA\u000e!\u001b\u0005a\"BA\u000f\u001f\u0003\u0011)H/\u001b7\u000b\u0003}\tAA[1wC&\u0011\u0011\u0005\b\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\bBB\u0012\u0001A\u0003%!$\u0001\u0004qe>\u00048\u000f\t\u0005\bK\u0001\u0011\r\u0011\"\u0001'\u0003\u0019\u0019wN\u001c4jOV\tq\u0005\u0005\u0002)W5\t\u0011F\u0003\u0002+\t\u000511/\u001a:wKJL!\u0001L\u0015\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\u0007]\u0001\u0001\u000b\u0011B\u0014\u0002\u000f\r|gNZ5hA!9\u0001\u0007\u0001b\u0001\n\u0003\t\u0014aB7fiJL7m]\u000b\u0002eA\u00111GO\u0007\u0002i)\u0011\u0001'\u000e\u0006\u0003m]\naaY8n[>t'BA\u00039\u0015\tId\"\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0003wQ\u0012q!T3ue&\u001c7\u000f\u0003\u0004>\u0001\u0001\u0006IAM\u0001\t[\u0016$(/[2tA!9q\b\u0001b\u0001\n\u0003\u0001\u0015AE2sK\u0012,g\u000e^5bYB\u0013xN^5eKJ,\u0012!\u0011\t\u0003\u0005\u0016k\u0011a\u0011\u0006\u0003\t\u0012\t\u0001b]3dkJLG/_\u0005\u0003\r\u000e\u0013!c\u0011:fI\u0016tG/[1m!J|g/\u001b3fe\"1\u0001\n\u0001Q\u0001\n\u0005\u000b1c\u0019:fI\u0016tG/[1m!J|g/\u001b3fe\u0002BqA\u000b\u0001C\u0002\u0013\u0005!*F\u0001L!\t1B*\u0003\u0002N\u0005\ta1k\\2lKR\u001cVM\u001d<fe\"1q\n\u0001Q\u0001\n-\u000bqa]3sm\u0016\u0014\b\u0005C\u0004R\u0001\t\u0007I\u0011\u0001*\u0002\u000fM|7m[3ugV\t1\u000bE\u0002U7vk\u0011!\u0016\u0006\u0003-^\u000bq!\\;uC\ndWM\u0003\u0002Y3\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\u000b\u0003i\u000bQa]2bY\u0006L!\u0001X+\u0003\u0017\u0005\u0013(/Y=
Ck\u001a4WM\u001d\t\u0003=\u0006l\u0011a\u0018\u0006\u0003Az\t1A\\3u\u0013\t\u0011wL\u0001\u0004T_\u000e\\W\r\u001e\u0005\u0007I\u0002\u0001\u000b\u0011B*\u0002\u0011M|7m[3ug\u0002BQA\u001a\u0001\u0005\u0002\u001d\f1b]3oIJ+\u0017/^3tiR)\u0001\u000e\u001c8w}B\u0011\u0011N[\u0007\u00023&\u00111.\u0017\u0002\u0005+:LG\u000fC\u0003nK\u0002\u0007Q,\u0001\u0004t_\u000e\\W\r\u001e\u0005\u0006_\u0016\u0004\r\u0001]\u0001\be\u0016\fX/Z:u!\rI\u0017o]\u0005\u0003ef\u0013Q!\u0011:sCf\u0004\"!\u001b;\n\u0005UL&\u0001\u0002\"zi\u0016Dqa^3\u0011\u0002\u0003\u0007\u00010\u0001\u0002jIB\u0019\u0011._>\n\u0005iL&AB(qi&|g\u000e\u0005\u0002jy&\u0011Q0\u0017\u0002\u0006'\"|'\u000f\u001e\u0005\t\u007f\u0016\u0004\n\u00111\u0001\u0002\u0002\u0005)a\r\\;tQB\u0019\u0011.a\u0001\n\u0007\u0005\u0015\u0011LA\u0004C_>dW-\u00198\t\u000f\u0005%\u0001\u0001\"\u0001\u0002\f\u0005y!/Z2fSZ,'+Z:q_:\u001cX\rF\u0002q\u0003\u001bAa!\\A\u0004\u0001\u0004i\u0006bBA\t\u0001\u0011%\u00111C\u0001\u000fe\u0016\u001cW-\u001b<f%\u0016\fX/Z:u)\u0019\t)\"a\t\u0002.A!\u0011qCA\u000f\u001d\r1\u0012\u0011D\u0005\u0004\u00037\u0011\u0011A\u0004*fcV,7\u000f^\"iC:tW\r\\\u0005\u0005\u0003?\t\tCA\u0004SKF,Xm\u001d;\u000b\u0007\u0005m!\u0001\u0003\u0005\u0002&\u0005=\u0001\u0019AA\u0014\u0003\u001d\u0019\u0007.\u00198oK2\u00042AFA\u0015\u0013\r\tYC\u0001\u0002\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0011)\ty#a\u0004\u0011\u0002\u0003\u0007\u0011\u0011G\u0001\bi&lWm\\;u!\rI\u00171G\u0005\u0004\u0003kI&\u0001\u0002'p]\u001eDq!!\u000f\u0001\t\u0003\tY$\u0001\bqe>\u001cWm]:SKF,Xm\u001d;\u0015\u0007!\fi\u0004\u0003\u0005\u0002&\u0005]\u0002\u0019AA\u0014\u0011\u001d\tI\u0004\u0001C\u0001\u0003\u0003\"R\u0001[A\"\u0003\u000bB\u0001\"!\n\u0002@\u0001\u0007\u0011q\u0005\u0005\b_\u0006}\u0002\u0019AA\u000b\u0011\u001d\tI\u0005\u0001C\u0001\u0003\u0017\nqaY8o]\u0016\u001cG\u000fF\u0003^\u0003\u001b\n\t\u0006C\u0005\u0002P\u0005\u001d\u0003\u0013!a\u0001\u0017\u0006\t1\u000f\u0003\u0006\u0002T\u0005\u001d\u0003\u0013!a\u0001\u0003+\n\u0
001\u0002\u001d:pi>\u001cw\u000e\u001c\t\u0005\u0003/\nY&\u0004\u0002\u0002Z)\u0019\u00111K\u001b\n\t\u0005u\u0013\u0011\f\u0002\u0011'\u0016\u001cWO]5usB\u0013x\u000e^8d_2Dq!!\u0019\u0001\t\u0003\t\u0019'\u0001\u0005uK\u0006\u0014Hi\\<o)\u0005A\u0007\u0006BA0\u0003O\u0002B!!\u001b\u0002n5\u0011\u00111\u000e\u0006\u0003\u00179IA!a\u001c\u0002l\t)\u0011I\u001a;fe\"9\u00111\u000f\u0001\u0005\n\u0005U\u0014\u0001\u00069s_\u0012,8-\u001a:SKF,Xm\u001d;CsR,7\u000fF\u0002q\u0003oB\u0011\"!\u001f\u0002rA\u0005\t\u0019A>\u0002\u0007\u0005\u001c7\u000eC\u0004\u0002~\u0001!I!a \u0002=M,g\u000e\u001a*fcV,7\u000f^:V]RLGn\u0015;bO\u0016$'+Z2fSZ,G\u0003CA\u000b\u0003\u0003\u000b\u0019)!\"\t\r)\nY\b1\u0001L\u0011\u0019i\u00171\u0010a\u0001;\"9\u0011qQA>\u0001\u0004\u0001\u0018\u0001\u0004:fcV,7\u000f\u001e\"zi\u0016\u001c\bbBAF\u0001\u0011\u0005\u00111M\u0001\u000eg&l\u0007\u000f\\3SKF,Xm\u001d;)\t\u0005%\u0015q\u0012\t\u0005\u0003S\n\t*\u0003\u0003\u0002\u0014\u0006-$\u0001\u0002+fgRDq!a&\u0001\t\u0003\t\u0019'A\fu_>\u0014\u0015n\u001a*fcV,7\u000f^%t%\u0016TWm\u0019;fI\"\"\u0011QSAH\u0011\u001d\ti\n\u0001C\u0001\u0003G\n\u0011\u0003^3ti\u001e\u0013\u0018mY3gk2\u001cEn\\:fQ\u0011\tY*a$\t\u000f\u0005\r\u0006\u0001\"\u0001\u0002d\u0005QB/Z:u'>\u001c7.\u001a;t\u00072|7/Z(o'\",H\u000fZ8x]\"\"\u0011\u0011UAH\u0011\u001d\tI\u000b\u0001C\u0001\u0003G\nq\u0003^3ti6\u000b\u0007pQ8o]\u0016\u001cG/[8ogB+'/\u00139)\t\u0005\u001d\u0016q\u0012\u0005\b\u0003_\u0003A\u0011AA2\u0003\u0001\"Xm\u001d;NCb\u001cuN\u001c8fGRLwN\\:QKJL\u0005o\u0014<feJLG-Z:)\t\u00055\u0016q\u0012\u0005\b\u0003k\u0003A\u0011AA2\u0003M!Xm\u001d;Tg2\u001cvnY6fiN+'O^3sQ\u0011\t\u0019,a$\t\u000f\u0005m\u0006\u0001\"\u0001\u0002d\u0005!B/Z:u'\u0016\u001c8/[8o!JLgnY5qC2DC!!/\u0002\u0010\"9\u0011\u0011\u0019\u0001\u0005\u0002\u0005\r\u0014\u0001\f;fgR\u001cE.[3oi\u0012K7oY8o]\u0016\u001cG/[8o+B$\u0017\r^3t%\u0016\fX/Z:u\u001b\u0016$(/[2tQ\u0011\ty,a$\t\u000f\u0005\u001d\u0007\u0001\"\u0001\u0002d\u00059D/Z:u\u00072LWM\u001c;ESN\
u001cwN\u001c8fGRLwN\\,ji\"\u001cF/Y4fIJ+7-Z5wKN4U\u000f\u001c7z!J|7-Z:tK\u0012DC!!2\u0002\u0010\"9\u0011Q\u001a\u0001\u0005\u0002\u0005\r\u0014!\u000e;fgR\u0014%o\\6feN+g\u000eZ!gi\u0016\u00148\t[1o]\u0016d7\t\\8tK\u0012,\u0006\u000fZ1uKN\u0014V-];fgRlU\r\u001e:jGNDC!a3\u0002\u0010\"9\u00111\u001b\u0001\u0005\u0002\u0005\r\u0014!\t;fgRlU\r\u001e:jG\u000e{G\u000e\\3di&|g.\u00114uKJ\u001c\u0006.\u001e;e_^t\u0007\u0006BAi\u0003\u001fCq!!7\u0001\t\u0003\t\u0019'\u0001\ruKN$\bK]8dKN\u001cxN]'fiJL7m\u001d+bONDC!a6\u0002\u0010\"I\u0011q\u001c\u0001\u0012\u0002\u0013\u0005\u0011\u0011]\u0001\u0016g\u0016tGMU3rk\u0016\u001cH\u000f\n3fM\u0006,H\u000e\u001e\u00134+\t\t\u0019OK\u0002y\u0003K\\#!a:\u0011\t\u0005%\u00181_\u0007\u0003\u0003WTA!!<\u0002p\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003cL\u0016AC1o]>$\u0018\r^5p]&!\u0011Q_Av\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0005\n\u0003s\u0004\u0011\u0013!C\u0001\u0003w\fQc]3oIJ+\u0017/^3ti\u0012\"WMZ1vYR$C'\u0006\u0002\u0002~*\"\u0011\u0011AAs\u0011%\u0011\t\u0001AI\u0001\n\u0013\u0011\u0019!\u0001\rsK\u000e,\u0017N^3SKF,Xm\u001d;%I\u00164\u0017-\u001e7uII*\"A!\u0002+\t\u0005E\u0012Q\u001d\u0005\n\u0005\u0013\u0001\u0011\u0013!C\u0001\u0005\u0017\t\u0011cY8o]\u0016\u001cG\u000f\n3fM\u0006,H\u000e\u001e\u00132+\t\u0011iAK\u0002L\u0003KD\u0011B!\u0005\u0001#\u0003%\tAa\u0005\u0002#\r|gN\\3di\u0012\"WMZ1vYR$#'\u0006\u0002\u0003\u0016)\"\u0011QKAs\u0011%\u0011I\u0002AI\u0001\n\u0013\u0011Y\"\u0001\u0010qe>$WoY3s%\u0016\fX/Z:u\u0005f$Xm\u001d\u0013eK\u001a\fW\u000f\u001c;%cU\u0011!Q\u0004\u0016\u0004w\u0006\u0015\b")
/* loaded from: input_file:kafka/network/SocketServerTest.class */
/**
 * JUnit tests that drive a {@link SocketServer} over real loopback sockets.
 *
 * <p>This source was produced from {@code SocketServerTest.scala}. Loop bodies and
 * predicates are therefore compiler-generated closure classes
 * ({@code SocketServerTest$$anonfun$...}) that are not part of this file, and the
 * {@code xxx$default$N()} methods supply the default values of optional parameters.
 * Checked exceptions are not declared on the methods because the originating
 * language has none.
 *
 * <p>A fresh server listening on {@code PLAINTEXT} and {@code TRACE} listeners is
 * started by the constructor and stopped in {@link #tearDown()}.
 */
public class SocketServerTest extends JUnitSuite {
    private final Properties props = createDefaultBrokerConfig();
    private final KafkaConfig config;
    private final Metrics metrics;
    private final CredentialProvider credentialProvider;
    private final SocketServer server;
    private final ArrayBuffer<Socket> sockets;

    /**
     * Builds the broker properties shared by the fixture and several tests:
     * node id 0, the mock ZooKeeper connect string, argument five set to 0 and
     * every other argument left at its declared default.
     */
    private static Properties createDefaultBrokerConfig() {
        return TestUtils$.MODULE$.createBrokerConfig(0, TestUtils$.MODULE$.MockZkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), 0, TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16());
    }

    /** @return the mutable broker properties the fixture server was configured from */
    public Properties props() {
        return this.props;
    }

    /** @return the broker configuration parsed from {@link #props()} */
    public KafkaConfig config() {
        return this.config;
    }

    /** @return the metrics registry handed to the fixture server */
    public Metrics metrics() {
        return this.metrics;
    }

    /** @return the credential provider shared by every server created in this class */
    public CredentialProvider credentialProvider() {
        return this.credentialProvider;
    }

    /** @return the fixture server started by the constructor */
    public SocketServer server() {
        return this.server;
    }

    /** @return the client sockets opened through {@link #connect}; closed in {@link #tearDown()} */
    public ArrayBuffer<Socket> sockets() {
        return this.sockets;
    }

    /**
     * Writes one size-prefixed request to {@code socket}.
     *
     * @param socket destination socket
     * @param bArr   payload bytes
     * @param option when defined, a short that is written between the length prefix and
     *               the payload; the length prefix then counts two extra bytes
     * @param z      whether to flush the stream after writing
     * @throws MatchError if {@code option} is neither {@code Some} nor {@code None}
     */
    public void sendRequest(Socket socket, byte[] bArr, Option<Object> option, boolean z) {
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        if (option instanceof Some) {
            short id = BoxesRunTime.unboxToShort(((Some) option).x());
            out.writeInt(bArr.length + 2);
            out.writeShort(id);
        } else if (None$.MODULE$.equals(option)) {
            out.writeInt(bArr.length);
        } else {
            throw new MatchError(option);
        }
        out.write(bArr);
        if (z) {
            out.flush();
        }
    }

    /** Default for the third argument of {@link #sendRequest}: no leading short. */
    public Option<Object> sendRequest$default$3() {
        return None$.MODULE$;
    }

    /** Default for the fourth argument of {@link #sendRequest}: flush after writing. */
    public boolean sendRequest$default$4() {
        return true;
    }

    /**
     * Reads one size-prefixed response: a 4-byte length followed by that many bytes.
     *
     * @param socket socket to read from
     * @return the response payload, without the length prefix
     */
    public byte[] receiveResponse(Socket socket) {
        DataInputStream in = new DataInputStream(socket.getInputStream());
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return payload;
    }

    /**
     * Polls {@code requestChannel} for the next request and fails the test with
     * "receiveRequest timed out" when the channel returns {@code null}.
     *
     * @param requestChannel channel to poll
     * @param j              timeout passed to the channel
     * @return the received request, never {@code null}
     */
    private RequestChannel.Request receiveRequest(RequestChannel requestChannel, long j) {
        RequestChannel.Request request = requestChannel.receiveRequest(j);
        Assert.assertNotNull("receiveRequest timed out", request);
        return request;
    }

    /** Default timeout for {@link #receiveRequest(RequestChannel, long)}. */
    private long receiveRequest$default$2() {
        return 2000L;
    }

    /** Receives the next request from {@code requestChannel} and echoes it back. */
    public void processRequest(RequestChannel requestChannel) {
        processRequest(requestChannel, receiveRequest(requestChannel, receiveRequest$default$2()));
    }

    /**
     * Re-serializes {@code request} with its own header and sends the bytes back on the
     * originating connection, so the client receives an echo of what it sent.
     */
    public void processRequest(RequestChannel requestChannel, RequestChannel.Request request) {
        ByteBuffer echo = request.body(ClassTag$.MODULE$.apply(AbstractRequest.class), NotNothing$.MODULE$.notNothingEvidence(Predef$$eq$colon$eq$.MODULE$.tpEquals())).serialize(request.header());
        echo.rewind();
        NetworkSend send = new NetworkSend(request.connectionId(), echo);
        requestChannel.sendResponse(RequestChannel$Response$.MODULE$.apply(request, send));
    }

    /**
     * Opens a client socket to the port {@code socketServer} bound for the listener of
     * {@code securityProtocol} and registers it for cleanup in {@link #tearDown()}.
     */
    public Socket connect(SocketServer socketServer, SecurityProtocol securityProtocol) {
        int port = socketServer.boundPort(ListenerName.forSecurityProtocol(securityProtocol));
        Socket socket = new Socket("localhost", port);
        sockets().$plus$eq(socket);
        return socket;
    }

    /** Default for the first argument of {@link #connect}: the fixture server. */
    public SocketServer connect$default$1() {
        return server();
    }

    /** Default for the second argument of {@link #connect}: {@code PLAINTEXT}. */
    public SecurityProtocol connect$default$2() {
        return SecurityProtocol.PLAINTEXT;
    }

    /**
     * Closes the metrics registry, shuts the fixture server down and releases every
     * registered client socket. The steps are chained with {@code finally} so that a
     * failure in an earlier step does not leave the server or the sockets open.
     */
    @After
    public void tearDown() {
        try {
            metrics().close();
        } finally {
            try {
                server().shutdown();
            } finally {
                sockets().foreach(new SocketServerTest$$anonfun$tearDown$1(this));
                sockets().clear();
            }
        }
    }

    /**
     * Serializes an empty produce request (no partitions, timeout 10000) together with
     * a header using key 0, an empty client id and correlation id -1.
     *
     * @param s the value passed as the builder's second argument
     * @return the serialized header and body
     */
    private byte[] producerRequestBytes(short s) {
        ProduceRequest emptyRequest = new ProduceRequest.Builder((byte) 2, s, 10000, new HashMap()).build();
        RequestHeader header = new RequestHeader((short) 0, emptyRequest.version(), "", -1);
        ByteBuffer buffer = emptyRequest.serialize(header);
        buffer.rewind();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }

    /** Default for the argument of {@link #producerRequestBytes(short)}. */
    private short producerRequestBytes$default$1() {
        return (short) 0;
    }

    /**
     * Repeatedly evaluates the generated closures via {@code TestUtils.computeUntilTrue}
     * and asserts that the returned flag is true.
     *
     * @return the request produced by the final evaluation
     */
    private RequestChannel.Request sendRequestsUntilStagedReceive(SocketServer socketServer, Socket socket, byte[] bArr) {
        Tuple2 outcome = TestUtils$.MODULE$.computeUntilTrue(new SocketServerTest$$anonfun$4(this, socketServer, socket, bArr), TestUtils$.MODULE$.computeUntilTrue$default$2(), TestUtils$.MODULE$.computeUntilTrue$default$3(), new SocketServerTest$$anonfun$5(this, socketServer));
        if (outcome == null) {
            throw new MatchError(outcome);
        }
        RequestChannel.Request request = (RequestChannel.Request) outcome._1();
        boolean conditionMet = outcome._2$mcZ$sp();
        Assert.assertTrue("Receives not staged for " + 15000L + " ms", conditionMet);
        return request;
    }

    /** Sends one request on each listener and expects the echoed bytes back unchanged. */
    @Test
    public void simpleRequest() {
        Socket plainSocket = connect(connect$default$1(), SecurityProtocol.PLAINTEXT);
        Socket traceSocket = connect(connect$default$1(), SecurityProtocol.TRACE);
        byte[] requestBytes = producerRequestBytes(producerRequestBytes$default$1());

        sendRequest(plainSocket, requestBytes, sendRequest$default$3(), sendRequest$default$4());
        processRequest(server().requestChannel());
        Assert.assertEquals(Predef$.MODULE$.byteArrayOps(requestBytes).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(plainSocket)).toSeq());

        sendRequest(traceSocket, requestBytes, sendRequest$default$3(), sendRequest$default$4());
        processRequest(server().requestChannel());
        Assert.assertEquals(Predef$.MODULE$.byteArrayOps(requestBytes).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(traceSocket)).toSeq());
    }

    /**
     * Sends a payload one byte larger than {@code socketRequestMaxBytes}. An
     * {@link IOException} while writing or reading is tolerated.
     */
    @Test
    public void tooBigRequestIsRejected() {
        byte[] tooManyBytes = new byte[Predef$.MODULE$.Integer2int(server().config().socketRequestMaxBytes()) + 1];
        new Random().nextBytes(tooManyBytes);
        Socket socket = connect(connect$default$1(), connect$default$2());
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        out.writeInt(tooManyBytes.length);
        try {
            out.write(tooManyBytes);
            out.flush();
            receiveResponse(socket);
        } catch (IOException ignored) {
            // Tolerated on purpose: presumably the server drops the connection once it
            // sees the oversized length prefix, which surfaces here as an I/O error.
        }
    }

    /**
     * Runs a generated closure ten times with an open socket and a serialized request,
     * closes the socket, then runs a second generated closure ten times.
     */
    @Test
    public void testGracefulClose() {
        Socket socket = connect(connect$default$1(), SecurityProtocol.PLAINTEXT);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp(new SocketServerTest$$anonfun$testGracefulClose$1(this, socket, producerRequestBytes(producerRequestBytes$default$1())));
        socket.close();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp(new SocketServerTest$$anonfun$testGracefulClose$2(this));
    }

    /**
     * After the server is shut down, writing a large request on either listener's
     * socket must raise an {@link IOException}.
     */
    @Test
    public void testSocketsCloseOnShutdown() {
        Socket plainSocket = connect(connect$default$1(), SecurityProtocol.PLAINTEXT);
        plainSocket.setTcpNoDelay(true);
        Socket traceSocket = connect(connect$default$1(), SecurityProtocol.TRACE);
        traceSocket.setTcpNoDelay(true);
        byte[] smallRequest = new byte[40];
        sendRequest(plainSocket, smallRequest, new Some(BoxesRunTime.boxToShort((short) 0)), sendRequest$default$4());
        sendRequest(traceSocket, smallRequest, new Some(BoxesRunTime.boxToShort((short) 0)), sendRequest$default$4());
        processRequest(server().requestChannel());
        Thread.sleep(200L);
        server().acceptors().values().map(new SocketServerTest$$anonfun$testSocketsCloseOnShutdown$1(this), Iterable$.MODULE$.canBuildFrom());
        server().shutdown();

        byte[] largeRequest = new byte[1000000];
        try {
            sendRequest(plainSocket, largeRequest, new Some(BoxesRunTime.boxToShort((short) 0)), sendRequest$default$4());
            throw fail("expected exception when writing to closed plain socket", new Position("SocketServerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 234));
        } catch (IOException ignored) {
            // Expected: the write must fail once the server side is closed.
        }
        try {
            sendRequest(traceSocket, largeRequest, new Some(BoxesRunTime.boxToShort((short) 0)), sendRequest$default$4());
            throw fail("expected exception when writing to closed trace socket", new Position("SocketServerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 241));
        } catch (IOException ignored) {
            // Expected: the write must fail once the server side is closed.
        }
    }

    /**
     * Builds {@code maxConnectionsPerIp} sockets through a generated closure, then
     * checks that one more connection reads end-of-stream. After the first socket is
     * closed and the generated wait condition holds, a new connection must be able to
     * deliver a request.
     */
    @Test
    public void testMaxConnectionsPerIp() {
        IndexedSeq conns = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), Predef$.MODULE$.Integer2int(server().config().maxConnectionsPerIp())).map(new SocketServerTest$$anonfun$6(this), IndexedSeq$.MODULE$.canBuildFrom());
        Socket extraConn = connect(connect$default$1(), connect$default$2());
        extraConn.setSoTimeout(3000);
        Assert.assertEquals(-1L, extraConn.getInputStream().read());
        extraConn.close();

        InetAddress address = ((Socket) conns.head()).getInetAddress();
        ((Socket) conns.head()).close();
        TestUtils$.MODULE$.waitUntilTrue(new SocketServerTest$$anonfun$testMaxConnectionsPerIp$1(this, conns, address), new SocketServerTest$$anonfun$testMaxConnectionsPerIp$2(this), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());

        Socket replacementConn = connect(connect$default$1(), connect$default$2());
        sendRequest(replacementConn, producerRequestBytes(producerRequestBytes$default$1()), sendRequest$default$3(), sendRequest$default$4());
        receiveRequest(server().requestChannel(), receiveRequest$default$2());
    }

    /**
     * Starts a server whose per-IP override for {@code localhost} is one above the
     * fixture's {@code maxConnectionsPerIp}. The last of that many connections must
     * deliver a request, and one further connection must read end-of-stream.
     */
    @Test
    public void testMaxConnectionsPerIpOverrides() {
        int overrideNum = Predef$.MODULE$.Integer2int(server().config().maxConnectionsPerIp()) + 1;
        Properties overrideProps = createDefaultBrokerConfig();
        overrideProps.put(KafkaConfig$.MODULE$.MaxConnectionsPerIpOverridesProp(), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"localhost:", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(overrideNum)})));
        Metrics serverMetrics = new Metrics();
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(overrideProps), serverMetrics, Time.SYSTEM, credentialProvider());
        try {
            overrideServer.startup();
            IndexedSeq conns = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), overrideNum).map(new SocketServerTest$$anonfun$7(this, overrideServer), IndexedSeq$.MODULE$.canBuildFrom());
            sendRequest((Socket) conns.last(), producerRequestBytes(producerRequestBytes$default$1()), sendRequest$default$3(), sendRequest$default$4());
            receiveRequest(overrideServer.requestChannel(), receiveRequest$default$2());

            // The socket must be held in a local: it is configured first and read afterwards.
            Socket extraConn = connect(overrideServer, connect$default$2());
            extraConn.setSoTimeout(3000);
            Assert.assertEquals(-1L, extraConn.getInputStream().read());
        } finally {
            overrideServer.shutdown();
            serverMetrics.close();
        }
    }

    /**
     * Starts a server with a single SSL listener and checks that a request sent over a
     * trust-all TLSv1.2 client socket is echoed back unchanged.
     */
    @Test
    public void testSslSocketServer() {
        File trustStoreFile = File.createTempFile("truststore", ".jks");
        // Do not leave the temporary trust store behind after the test JVM exits.
        trustStoreFile.deleteOnExit();
        Properties overrideProps = TestUtils$.MODULE$.createBrokerConfig(0, TestUtils$.MODULE$.MockZkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), new Some<>(SecurityProtocol.SSL), new Some<>(trustStoreFile), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16());
        overrideProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
        Metrics serverMetrics = new Metrics();
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(overrideProps), serverMetrics, Time.SYSTEM, credentialProvider());
        try {
            overrideServer.startup();
            SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
            sslContext.init(null, new TrustManager[]{TestUtils$.MODULE$.trustAllCerts()}, new SecureRandom());
            SSLSocket sslSocket = (SSLSocket) sslContext.getSocketFactory().createSocket("localhost", overrideServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SSL)));
            try {
                sslSocket.setNeedClientAuth(false);
                short apiKey = ApiKeys.PRODUCE.id;
                ProduceRequest emptyRequest = new ProduceRequest.Builder((byte) 2, (short) 0, 10000, new HashMap()).build();
                ByteBuffer buffer = emptyRequest.serialize(new RequestHeader(apiKey, emptyRequest.version(), "", -1));
                buffer.rewind();
                byte[] requestBytes = new byte[buffer.remaining()];
                buffer.get(requestBytes);
                sendRequest(sslSocket, requestBytes, sendRequest$default$3(), sendRequest$default$4());
                processRequest(overrideServer.requestChannel());
                Assert.assertEquals(Predef$.MODULE$.byteArrayOps(requestBytes).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(sslSocket)).toSeq());
            } finally {
                // This socket is not registered in sockets(), so close it here on every path.
                sslSocket.close();
            }
        } finally {
            overrideServer.shutdown();
            serverMetrics.close();
        }
    }

    /** A request received over the default listener must carry the anonymous principal. */
    @Test
    public void testSessionPrincipal() {
        Socket socket = connect(connect$default$1(), connect$default$2());
        sendRequest(socket, new byte[40], new Some(BoxesRunTime.boxToShort((short) 0)), sendRequest$default$4());
        RequestChannel.Request request = receiveRequest(server().requestChannel(), receiveRequest$default$2());
        Assert.assertEquals(KafkaPrincipal.ANONYMOUS, request.session().principal());
    }

    /**
     * Uses a generated {@link SocketServer} subclass that is handed a reference to the
     * client socket, sends a 550000-byte response and waits for the generated
     * condition, which is given the request metrics and the previous total-time
     * histogram count plus one.
     */
    @Test
    public void testClientDisconnectionUpdatesRequestMetrics() {
        Properties overrideProps = createDefaultBrokerConfig();
        Metrics serverMetrics = new Metrics();
        ObjectRef connRef = ObjectRef.create((Object) null);
        SocketServerTest$$anon$2 overrideServer = new SocketServerTest$$anon$2(this, overrideProps, serverMetrics, connRef);
        try {
            overrideServer.startup();
            connRef.elem = connect(overrideServer, connect$default$2());
            sendRequest((Socket) connRef.elem, producerRequestBytes(producerRequestBytes$default$1()), sendRequest$default$3(), sendRequest$default$4());
            RequestChannel channel = overrideServer.requestChannel();
            RequestChannel.Request request = receiveRequest(channel, receiveRequest$default$2());
            RequestMetrics requestMetrics = (RequestMetrics) RequestMetrics$.MODULE$.metricsMap().apply(ApiKeys.forId(request.requestId()).name);
            long expectedTotalTimeCount = kafka$network$SocketServerTest$$totalTimeHistCount$1(requestMetrics) + 1;
            requestChannel_sendLargeResponse(channel, request);
            TestUtils$.MODULE$.waitUntilTrue(new SocketServerTest$$anonfun$testClientDisconnectionUpdatesRequestMetrics$1(this, requestMetrics, expectedTotalTimeCount), new SocketServerTest$$anonfun$testClientDisconnectionUpdatesRequestMetrics$2(this, requestMetrics, expectedTotalTimeCount), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        } finally {
            overrideServer.shutdown();
            serverMetrics.close();
        }
    }

    /** Queues a zero-filled 550000-byte response for {@code request} on {@code channel}. */
    private void requestChannel_sendLargeResponse(RequestChannel channel, RequestChannel.Request request) {
        NetworkSend send = new NetworkSend(request.connectionId(), ByteBuffer.allocate(550000));
        channel.sendResponse(RequestChannel$Response$.MODULE$.apply(request, send));
    }

    /**
     * Obtains a request via {@link #sendRequestsUntilStagedReceive}, resets the client
     * connection (linger 0, then close), echoes the request and waits for the generated
     * condition on the request's connection id.
     */
    @Test
    public void testClientDisconnectionWithStagedReceivesFullyProcessed() {
        Socket socket = connect(server(), connect$default$2());
        RequestChannel.Request request = sendRequestsUntilStagedReceive(server(), socket, producerRequestBytes((short) 1));
        String connectionId = request.connectionId();
        socket.setSoLinger(true, 0);
        socket.close();
        processRequest(server().requestChannel(), request);
        TestUtils$.MODULE$.waitUntilTrue(new SocketServerTest$$anonfun$testClientDisconnectionWithStagedReceivesFullyProcessed$1(this, connectionId), new SocketServerTest$$anonfun$testClientDisconnectionWithStagedReceivesFullyProcessed$2(this), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
    }

    /**
     * With the connection idle timeout set to 100 ms, waits for the first generated
     * condition on the server and request, then echoes the request and waits for the
     * second condition, which is given the request metrics and the previous total-time
     * histogram count plus one.
     */
    @Test
    public void testBrokerSendAfterChannelClosedUpdatesRequestMetrics() {
        Properties overrideProps = createDefaultBrokerConfig();
        overrideProps.setProperty(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), "100");
        Metrics serverMetrics = new Metrics();
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(overrideProps), serverMetrics, Time.SYSTEM, credentialProvider());
        try {
            overrideServer.startup();
            sendRequest(connect(overrideServer, connect$default$2()), producerRequestBytes(producerRequestBytes$default$1()), sendRequest$default$3(), sendRequest$default$4());
            RequestChannel channel = overrideServer.requestChannel();
            RequestChannel.Request request = receiveRequest(channel, receiveRequest$default$2());
            TestUtils$.MODULE$.waitUntilTrue(new SocketServerTest$$anonfun$testBrokerSendAfterChannelClosedUpdatesRequestMetrics$1(this, overrideServer, request), new SocketServerTest$$anonfun$testBrokerSendAfterChannelClosedUpdatesRequestMetrics$3(this, request), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
            RequestMetrics requestMetrics = (RequestMetrics) RequestMetrics$.MODULE$.metricsMap().apply(ApiKeys.forId(request.requestId()).name);
            long expectedTotalTimeCount = kafka$network$SocketServerTest$$totalTimeHistCount$2(requestMetrics) + 1;
            processRequest(channel, request);
            TestUtils$.MODULE$.waitUntilTrue(new SocketServerTest$$anonfun$testBrokerSendAfterChannelClosedUpdatesRequestMetrics$2(this, requestMetrics, expectedTotalTimeCount), new SocketServerTest$$anonfun$testBrokerSendAfterChannelClosedUpdatesRequestMetrics$4(this, requestMetrics, expectedTotalTimeCount), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        } finally {
            overrideServer.shutdown();
            serverMetrics.close();
        }
    }

    /**
     * After shutting the fixture server down, the entries of the Yammer default
     * registry selected by the generated key filter, partial function and final
     * predicate must form an empty map.
     */
    @Test
    public void testMetricCollectionAfterShutdown() {
        server().shutdown();
        Map remaining = (Map) ((TraversableLike) ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(com.yammer.metrics.Metrics.defaultRegistry().allMetrics()).asScala()).filterKeys(new SocketServerTest$$anonfun$8(this)).collect(new SocketServerTest$$anonfun$1(this), Map$.MODULE$.canBuildFrom())).filter(new SocketServerTest$$anonfun$9(this));
        Assert.assertEquals(Predef$.MODULE$.Map().empty(), remaining);
    }

    /**
     * Checks that both the Kafka metrics selected by a generated filter and the Yammer
     * metrics selected by a generated key filter are non-empty, and applies a generated
     * check to each. The Kafka-side check receives the set
     * {@code {"PLAINTEXT", "TRACE"}}.
     */
    @Test
    public void testProcessorMetricsTags() {
        Set kafkaMetricNames = (Set) ((TraversableLike) JavaConverters$.MODULE$.asScalaSetConverter(metrics().metrics().keySet()).asScala()).filter(new SocketServerTest$$anonfun$10(this));
        Assert.assertFalse(kafkaMetricNames.isEmpty());
        kafkaMetricNames.foreach(new SocketServerTest$$anonfun$testProcessorMetricsTags$1(this, Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"PLAINTEXT", "TRACE"}))));

        Iterable yammerMetrics = (Iterable) ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(com.yammer.metrics.Metrics.defaultRegistry().allMetrics()).asScala()).filterKeys(new SocketServerTest$$anonfun$11(this)).collect(new SocketServerTest$$anonfun$2(this), Iterable$.MODULE$.canBuildFrom());
        Assert.assertFalse(yammerMetrics.isEmpty());
        yammerMetrics.foreach(new SocketServerTest$$anonfun$testProcessorMetricsTags$2(this));
    }

    /**
     * Writes the same request twice on {@code socket}, flushing only after the second
     * write, then returns the next request received by {@code socketServer}.
     */
    public final RequestChannel.Request kafka$network$SocketServerTest$$sendTwoRequestsReceiveOne$1(SocketServer socketServer, Socket socket, byte[] bArr) {
        sendRequest(socket, bArr, sendRequest$default$3(), false);
        sendRequest(socket, bArr, sendRequest$default$3(), true);
        return receiveRequest(socketServer.requestChannel(), receiveRequest$default$2());
    }

    /** @return the current sample count of the total-time histogram of {@code requestMetrics} */
    public final long kafka$network$SocketServerTest$$totalTimeHistCount$1(RequestMetrics requestMetrics) {
        return requestMetrics.totalTimeHist().count();
    }

    /** @return the current sample count of the total-time histogram of {@code requestMetrics} */
    public final long kafka$network$SocketServerTest$$totalTimeHistCount$2(RequestMetrics requestMetrics) {
        return requestMetrics.totalTimeHist().count();
    }

    /**
     * Configures and starts the fixture server: two listeners on ephemeral ports, one
     * network thread, a 50-byte request size limit and at most five connections per IP.
     * A generated closure is applied to every metric name in the Yammer default
     * registry before the server is created.
     */
    public SocketServerTest() {
        props().put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0");
        props().put("num.network.threads", "1");
        props().put("socket.send.buffer.bytes", "300000");
        props().put("socket.receive.buffer.bytes", "300000");
        props().put("queued.max.requests", "50");
        props().put("socket.request.max.bytes", "50");
        props().put("max.connections.per.ip", "5");
        props().put("connections.max.idle.ms", "60000");
        this.config = KafkaConfig$.MODULE$.fromProps(props());
        this.metrics = new Metrics();
        this.credentialProvider = new CredentialProvider(config().saslEnabledMechanisms());
        ((IterableLike) JavaConverters$.MODULE$.asScalaSetConverter(com.yammer.metrics.Metrics.defaultRegistry().allMetrics().keySet()).asScala()).foreach(new SocketServerTest$$anonfun$3(this));
        this.server = new SocketServer(config(), metrics(), Time.SYSTEM, credentialProvider());
        server().startup();
        this.sockets = new ArrayBuffer<>();
    }
}
