package org.apache.spark.rpc.netty;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rpc.RpcAddress;
import org.apache.spark.rpc.TestRpcEndpoint;
import org.mockito.Mockito;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Predef$;
import scala.collection.immutable.Nil$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: InboxSuite.scala */
@ScalaSignature(bytes = "\u0006\u0005]1AAA\u0002\u0001\u001d!)1\u0003\u0001C\u0001)\tQ\u0011J\u001c2pqN+\u0018\u000e^3\u000b\u0005\u0011)\u0011!\u00028fiRL(B\u0001\u0004\b\u0003\r\u0011\bo\u0019\u0006\u0003\u0011%\tQa\u001d9be.T!AC\u0006\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005a\u0011aA8sO\u000e\u00011C\u0001\u0001\u0010!\t\u0001\u0012#D\u0001\b\u0013\t\u0011rAA\u0007Ta\u0006\u00148NR;o'VLG/Z\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003U\u0001\"A\u0006\u0001\u000e\u0003\r\u0001")
/* loaded from: input_file:org/apache/spark/rpc/netty/InboxSuite.class */
public class InboxSuite extends SparkFunSuite {
    /**
     * Registers the suite's seven test cases with the inherited {@code test(...)} method.
     *
     * <p>Every registration passes a test name, an empty tag list ({@code Nil$.MODULE$}), the test
     * body as a lambda, and a {@code Position} that names "InboxSuite.scala" plus an integer that
     * is presumably the line number in the original Scala source (TODO confirm).
     *
     * <p>NOTE(review): this file is decompiler output of compiled Scala. Calls such as
     * {@code Assertions$.MODULE$.assertionsHelper().macroAssert(...)} look like the expanded form
     * of a ScalaTest {@code assert(...)}; treat the Java here as a reading aid, not as source
     * that is expected to compile.
     */
    public InboxSuite() {
        // --- "post": a one-way message is delivered, then the inbox is stopped. ---
        test("post", Nil$.MODULE$, () -> {
            TestRpcEndpoint testRpcEndpoint = new TestRpcEndpoint();
            Dispatcher dispatcher = (Dispatcher) Mockito.mock(Dispatcher.class);
            Inbox inbox = new Inbox("name", testRpcEndpoint);
            // Post a single one-way message with a null sender address and payload "hi".
            inbox.post(new OneWayMessage((RpcAddress) null, "hi"));
            inbox.process(dispatcher);
            // After processing, the inbox must report itself empty.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(inbox, "isEmpty", inbox.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 38));
            testRpcEndpoint.verifySingleReceiveMessage("hi");
            // Stop the inbox and process once more; it must be empty again afterwards.
            inbox.stop();
            inbox.process(dispatcher);
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(inbox, "isEmpty", inbox.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 44));
            // The endpoint is checked for both its start and its stop notification.
            testRpcEndpoint.verifyStarted();
            testRpcEndpoint.verifyStopped();
        }, new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 30));
        // --- "post: with reply": an RpcMessage is checked via the receive-and-reply verifier. ---
        test("post: with reply", Nil$.MODULE$, () -> {
            TestRpcEndpoint testRpcEndpoint = new TestRpcEndpoint();
            Dispatcher dispatcher = (Dispatcher) Mockito.mock(Dispatcher.class);
            Inbox inbox = new Inbox("name", testRpcEndpoint);
            // Sender address and call context are both null; only the payload "hi" is verified.
            inbox.post(new RpcMessage((RpcAddress) null, "hi", (NettyRpcCallContext) null));
            inbox.process(dispatcher);
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(inbox, "isEmpty", inbox.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 57));
            testRpcEndpoint.verifySingleReceiveAndReplyMessage("hi");
        }, new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 49));
        // --- "post: multiple threads": 10 threads x 100 posts race against process/stop. ---
        test("post: multiple threads", Nil$.MODULE$, () -> {
            final TestRpcEndpoint testRpcEndpoint = new TestRpcEndpoint();
            Dispatcher dispatcher = (Dispatcher) Mockito.mock(Dispatcher.class);
            // Counts every message handed to the overridden onDrop below.
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            // NOTE(review): the null InboxSuite and the three-argument constructor call look like
            // decompiler artifacts of the anonymous subclass's synthetic constructor (outer
            // reference plus captured variables); the name given to this Inbox is not visible
            // here - confirm against the Scala source.
            final InboxSuite inboxSuite = null;
            Inbox inbox = new Inbox(inboxSuite, testRpcEndpoint, atomicInteger) { // from class: org.apache.spark.rpc.netty.InboxSuite$$anon$1
                private final AtomicInteger numDroppedMessages$1;

                // Records a dropped message instead of whatever the base class would do.
                public void onDrop(InboxMessage inboxMessage) {
                    this.numDroppedMessages$1.incrementAndGet();
                }

                {
                    this.numDroppedMessages$1 = atomicInteger;
                }
            };
            // One count per producer thread; each thread counts down after its last post.
            CountDownLatch countDownLatch = new CountDownLatch(10);
            // Start 10 producer threads (indices 0 until 10).
            RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp(i -> {
                final InboxSuite inboxSuite2 = null;
                new Thread(inboxSuite2, inbox, countDownLatch) { // from class: org.apache.spark.rpc.netty.InboxSuite$$anon$2
                    private final Inbox inbox$1;
                    private final CountDownLatch exitLatch$1;

                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        // Each thread posts 100 one-way "hi" messages, then signals completion.
                        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 100).foreach$mVc$sp(i -> {
                            this.inbox$1.post(new OneWayMessage((RpcAddress) null, "hi"));
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        });
                        this.exitLatch$1.countDown();
                    }

                    {
                        this.inbox$1 = inbox;
                        this.exitLatch$1 = countDownLatch;
                    }
                }.start();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            });
            // Process while the producers may still be posting, stop, then process again.
            // NOTE(review): presumably posts arriving after stop() are routed to onDrop, and the
            // second process() drains whatever was enqueued before the stop took effect - confirm
            // against Inbox.
            inbox.process(dispatcher);
            inbox.stop();
            inbox.process(dispatcher);
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(inbox, "isEmpty", inbox.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 92));
            // Wait up to 30 seconds for all producers to finish.
            // NOTE(review): the boolean result of await is discarded, so a timeout is not reported
            // on its own; it would only show up through the count assertion below.
            countDownLatch.await(30L, TimeUnit.SECONDS);
            // 10 threads x 100 posts = 1000: every message must be either received or dropped.
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToInteger(1000));
            int numReceiveMessages = testRpcEndpoint.numReceiveMessages() + atomicInteger.get();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(numReceiveMessages), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(numReceiveMessages), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 96));
            testRpcEndpoint.verifyStarted();
            testRpcEndpoint.verifyStopped();
        }, new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 62));
        // --- "post: Associated": RemoteProcessConnected is checked via the on-connected verifier. ---
        test("post: Associated", Nil$.MODULE$, () -> {
            TestRpcEndpoint testRpcEndpoint = new TestRpcEndpoint();
            Dispatcher dispatcher = (Dispatcher) Mockito.mock(Dispatcher.class);
            RpcAddress rpcAddress = new RpcAddress("localhost", 11111);
            Inbox inbox = new Inbox("name", testRpcEndpoint);
            inbox.post(new RemoteProcessConnected(rpcAddress));
            inbox.process(dispatcher);
            testRpcEndpoint.verifySingleOnConnectedMessage(rpcAddress);
        }, new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 101));
        // --- "post: Disassociated": RemoteProcessDisconnected is checked via the on-disconnected verifier. ---
        test("post: Disassociated", Nil$.MODULE$, () -> {
            TestRpcEndpoint testRpcEndpoint = new TestRpcEndpoint();
            Dispatcher dispatcher = (Dispatcher) Mockito.mock(Dispatcher.class);
            RpcAddress rpcAddress = new RpcAddress("localhost", 11111);
            Inbox inbox = new Inbox("name", testRpcEndpoint);
            inbox.post(new RemoteProcessDisconnected(rpcAddress));
            inbox.process(dispatcher);
            testRpcEndpoint.verifySingleOnDisconnectedMessage(rpcAddress);
        }, new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 113));
        // --- "post: AssociationError": RemoteProcessConnectionError is checked via the network-error verifier. ---
        test("post: AssociationError", Nil$.MODULE$, () -> {
            TestRpcEndpoint testRpcEndpoint = new TestRpcEndpoint();
            Dispatcher dispatcher = (Dispatcher) Mockito.mock(Dispatcher.class);
            RpcAddress rpcAddress = new RpcAddress("localhost", 11111);
            // The same exception instance is posted and later passed to the verifier.
            RuntimeException runtimeException = new RuntimeException("Oops");
            Inbox inbox = new Inbox("name", testRpcEndpoint);
            inbox.post(new RemoteProcessConnectionError(runtimeException, rpcAddress));
            inbox.process(dispatcher);
            testRpcEndpoint.verifySingleOnNetworkErrorMessage(runtimeException, rpcAddress);
        }, new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 126));
        // --- SPARK-32738: active-thread count must be 0 after process() throws a fatal error. ---
        test("SPARK-32738: should reduce the number of active threads when fatal error happens", Nil$.MODULE$, () -> {
            // Mocked endpoint whose receive() is stubbed to throw OutOfMemoryError.
            TestRpcEndpoint testRpcEndpoint = (TestRpcEndpoint) Mockito.mock(TestRpcEndpoint.class);
            Mockito.when(testRpcEndpoint.receive()).thenThrow(new Throwable[]{new OutOfMemoryError()});
            Dispatcher dispatcher = (Dispatcher) Mockito.mock(Dispatcher.class);
            Inbox inbox = new Inbox("name", testRpcEndpoint);
            inbox.post(new OneWayMessage((RpcAddress) null, "hi"));
            // process() is expected to throw OutOfMemoryError.
            this.intercept(() -> {
                inbox.process(dispatcher);
            }, ClassTag$.MODULE$.apply(OutOfMemoryError.class), new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 147));
            // Even though process() threw, the inbox must report zero active threads.
            int numActiveThreads = inbox.getNumActiveThreads();
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToInteger(numActiveThreads), "==", BoxesRunTime.boxToInteger(0), numActiveThreads == 0, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 150));
        }, new Position("InboxSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 140));
    }
}
