package org.apache.bookkeeper.proto;

import com.google.protobuf.ExtensionRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.SafeRunnable;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/proto/TestPerChannelBookieClient.class */
public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TestPerChannelBookieClient.class);
    ExtensionRegistry extRegistry;
    ClientAuthProvider.Factory authProvider;

    public TestPerChannelBookieClient() throws Exception {
        super(1);
        this.extRegistry = ExtensionRegistry.newInstance();
        this.authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory(new ClientConfiguration());
    }

    @Test
    public void testConnectCloseRace() throws Exception {
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        OrderedExecutor orderedSafeExecutor = getOrderedSafeExecutor();
        BookieSocketAddress bookie = getBookie(0);
        for (int i = 0; i < 1000; i++) {
            PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioEventLoopGroup, bookie, this.authProvider, this.extRegistry);
            perChannelBookieClient.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.1
                public void operationComplete(int i2, PerChannelBookieClient perChannelBookieClient2) {
                }
            });
            perChannelBookieClient.close();
        }
        nioEventLoopGroup.shutdownGracefully();
        orderedSafeExecutor.shutdown();
    }

    public OrderedExecutor getOrderedSafeExecutor() {
        return OrderedExecutor.newBuilder().name("PCBC").numThreads(1).traceTaskExecution(true).traceTaskWarnTimeMicroSec(TimeUnit.MILLISECONDS.toMicros(100L)).build();
    }

    @Test
    public void testConnectRace() throws Exception {
        BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> genericCallback = new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.2
            public void operationComplete(int i, PerChannelBookieClient perChannelBookieClient) {
            }
        };
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        OrderedExecutor orderedSafeExecutor = getOrderedSafeExecutor();
        BookieSocketAddress bookie = getBookie(0);
        for (int i = 0; i < 100; i++) {
            PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioEventLoopGroup, bookie, this.authProvider, this.extRegistry);
            for (int i2 = i; i2 < 10; i2++) {
                perChannelBookieClient.connectIfNeededAndDoOp(genericCallback);
            }
            perChannelBookieClient.close();
        }
        nioEventLoopGroup.shutdownGracefully();
        orderedSafeExecutor.shutdown();
    }

    @Test
    public void testDisconnectRace() throws Exception {
        final BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> genericCallback = new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.3
            public void operationComplete(int i, PerChannelBookieClient perChannelBookieClient) {
            }
        };
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        OrderedExecutor orderedSafeExecutor = getOrderedSafeExecutor();
        final PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioEventLoopGroup, getBookie(0), this.authProvider, this.extRegistry);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    if (!countDownLatch.await(10L, TimeUnit.SECONDS)) {
                        TestPerChannelBookieClient.LOG.error("Disconnect thread never started");
                        atomicBoolean.set(true);
                    }
                } catch (InterruptedException e) {
                    TestPerChannelBookieClient.LOG.error("Connect thread interrupted", e);
                    Thread.currentThread().interrupt();
                    atomicBoolean2.set(false);
                }
                for (int i = 0; i < 100000 && atomicBoolean2.get(); i++) {
                    perChannelBookieClient.connectIfNeededAndDoOp(genericCallback);
                }
                atomicBoolean2.set(false);
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.5
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
                while (atomicBoolean2.get()) {
                    perChannelBookieClient.disconnect();
                }
            }
        };
        Thread thread3 = new Thread() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.6
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (atomicBoolean2.get()) {
                    synchronized (perChannelBookieClient) {
                        PerChannelBookieClient.ConnectionState connectionState = perChannelBookieClient.state;
                        Channel channel = perChannelBookieClient.channel;
                        if ((connectionState == PerChannelBookieClient.ConnectionState.CONNECTED && (channel == null || !channel.isActive())) || (connectionState != PerChannelBookieClient.ConnectionState.CONNECTED && channel != null && channel.isActive())) {
                            TestPerChannelBookieClient.LOG.error("State({}) and channel({}) inconsistent " + channel, connectionState, channel == null ? null : Boolean.valueOf(channel.isActive()));
                            atomicBoolean.set(true);
                            atomicBoolean2.set(false);
                        }
                    }
                }
            }
        };
        thread.start();
        thread2.start();
        thread3.start();
        thread.join();
        thread2.join();
        thread3.join();
        Assert.assertFalse("Failure in threads, check logs", atomicBoolean.get());
        perChannelBookieClient.close();
        nioEventLoopGroup.shutdownGracefully();
        orderedSafeExecutor.shutdown();
    }

    @Test
    public void testRequestCompletesAfterDisconnectRace() throws Exception {
        ServerConfiguration killBookie = killBookie(0);
        Bookie bookie = new Bookie(killBookie) { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.7
            public ByteBuf readEntry(long j, long j2) throws IOException, Bookie.NoLedgerException {
                try {
                    Thread.sleep(3000L);
                    return super.readEntry(j, j2);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted waiting", e);
                }
            }
        };
        this.bsConfs.add(killBookie);
        this.bs.add(startBookie(killBookie, bookie));
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        final OrderedExecutor orderedSafeExecutor = getOrderedSafeExecutor();
        final PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioEventLoopGroup, getBookie(0), this.authProvider, this.extRegistry);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final BookkeeperInternalCallbacks.ReadEntryCallback readEntryCallback = new BookkeeperInternalCallbacks.ReadEntryCallback() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.8
            public void readEntryComplete(int i, long j, long j2, ByteBuf byteBuf, Object obj) {
                countDownLatch.countDown();
            }
        };
        perChannelBookieClient.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.9
            public void operationComplete(final int i, PerChannelBookieClient perChannelBookieClient2) {
                if (i != 0) {
                    orderedSafeExecutor.executeOrdered(1, new SafeRunnable() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.9.1
                        public void safeRun() {
                            readEntryCallback.readEntryComplete(i, 1L, 1L, (ByteBuf) null, (Object) null);
                        }
                    });
                } else {
                    perChannelBookieClient.readEntry(1L, 1L, readEntryCallback, (Object) null, 1, "00000111112222233333".getBytes(), false);
                }
            }
        });
        Thread.sleep(1000L);
        perChannelBookieClient.disconnect();
        perChannelBookieClient.close();
        Assert.assertTrue("Request should have completed", countDownLatch.await(5L, TimeUnit.SECONDS));
        nioEventLoopGroup.shutdownGracefully();
        orderedSafeExecutor.shutdown();
    }
}
