package net.morimekta.providence.thrift.server;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import net.morimekta.providence.PApplicationException;
import net.morimekta.providence.PApplicationExceptionType;
import net.morimekta.providence.PProcessor;
import net.morimekta.providence.PServiceCall;
import net.morimekta.providence.PServiceCallType;
import net.morimekta.providence.serializer.BinarySerializer;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.thrift.io.FramedBufferOutputStream;
import net.morimekta.providence.util.ServiceCallInstrumentation;
import net.morimekta.util.io.ByteBufferInputStream;
import org.apache.thrift.transport.TFramedTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/morimekta/providence/thrift/server/NonblockingSocketServer.class */
public class NonblockingSocketServer implements AutoCloseable {
    /** Logger shared by the server and its nested classes. */
    private static final Logger LOGGER = LoggerFactory.getLogger(NonblockingSocketServer.class);
    /** Divisor applied to {@link System#nanoTime()} differences before they are handed to the instrumentation. */
    private static final long NS_IN_MILLIS = ServiceCallInstrumentation.NS_IN_MILLIS;
    /** Selector that the server channel and every accepted client channel are registered with. */
    private final Selector selector;
    /** Processor that handles each deserialized service call. */
    private final PProcessor processor;
    /** Serializer used both to decode incoming frames and to encode replies. */
    private final Serializer serializer;
    /** Receives timing callbacks for completed and failed calls; never null (a no-op is used when the builder has none). */
    private final ServiceCallInstrumentation instrumentation;
    /** Non-blocking channel that accepts incoming connections. */
    private final ServerSocketChannel serverSocketChannel;
    /** Socket view of {@link #serverSocketChannel}; used for binding, closing and reporting the local port. */
    private final ServerSocket serverSocket;
    /** Single-threaded executor that runs the select loop. */
    private final ExecutorService receiverExecutor;
    /** Fixed-size pool in which service calls are handed to the processor. */
    private final ExecutorService workerExecutor;
    /** Largest accepted frame size in bytes; also the size of each connection's read buffer. */
    private final int maxFrameSizeInBytes;

    /* loaded from: input_file:net/morimekta/providence/thrift/server/NonblockingSocketServer$Builder.class */
    /**
     * Collects the configuration for a {@link NonblockingSocketServer} and
     * starts it.
     *
     * <p>Defaults: an ephemeral port (0), a backlog of 50, 10 worker threads,
     * a 60 second read timeout, a maximum frame size of 16,384,000 bytes, the
     * binary serializer and daemon threads named
     * {@code providence-nonblocking-server-%d}. Every {@code with...} method
     * returns this builder so calls can be chained.
     */
    public static class Builder {
        private final PProcessor processor;
        private ServiceCallInstrumentation instrumentation;
        private int maxFrameSizeInBytes = 16384000;
        private int readTimeoutInMs = 60000;
        private int backlog = 50;
        private int workerThreads = 10;
        private InetSocketAddress bindAddress = new InetSocketAddress(0);
        private ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("providence-nonblocking-server-%d").setDaemon(true).build();
        // Unless overridden, the receiver thread comes from the same factory as the workers.
        private ThreadFactory receiverThreadFactory = this.workerThreadFactory;
        private Serializer serializer = new BinarySerializer();

        /**
         * Creates a builder for a server that dispatches calls to the given processor.
         *
         * @param pProcessor the processor handling incoming service calls
         */
        public Builder(@Nonnull PProcessor pProcessor) {
            this.processor = pProcessor;
        }

        /**
         * Binds the server to the wildcard address on the given port.
         *
         * @param i the port to listen on; 0 lets the system pick a free port
         * @return this builder
         * @throws IllegalArgumentException if the port is negative or otherwise
         *         rejected by {@link InetSocketAddress}
         */
        public Builder withPort(int i) {
            if (i < 0) {
                throw new IllegalArgumentException("Negative port: " + i);
            }
            this.bindAddress = new InetSocketAddress(i);
            return this;
        }

        /**
         * Binds the server to an explicit socket address.
         *
         * @param inetSocketAddress the address to bind the server socket to
         * @return this builder
         */
        public Builder withBindAddress(@Nonnull InetSocketAddress inetSocketAddress) {
            this.bindAddress = inetSocketAddress;
            return this;
        }

        /**
         * Sets the backlog passed to the server socket when binding.
         *
         * @param i the requested backlog
         * @return this builder
         * @throws IllegalArgumentException if the value is negative
         */
        public Builder withMaxBacklog(int i) {
            if (i < 0) {
                throw new IllegalArgumentException("Negative backlog: " + i);
            }
            this.backlog = i;
            return this;
        }

        /**
         * Sets the largest frame the server accepts.
         *
         * @param i the maximum frame size in bytes, at least 1024
         * @return this builder
         * @throws IllegalArgumentException if the value is below 1024
         */
        public Builder withMaxFrameSizeInBytes(int i) {
            if (i < 1024) {
                throw new IllegalArgumentException("Max frame size must be at least 1024 bytes: " + i);
            }
            this.maxFrameSizeInBytes = i;
            return this;
        }

        /**
         * Sets the instrumentation notified about completed and failed calls.
         *
         * @param serviceCallInstrumentation the instrumentation callback
         * @return this builder
         */
        public Builder withInstrumentation(@Nonnull ServiceCallInstrumentation serviceCallInstrumentation) {
            this.instrumentation = serviceCallInstrumentation;
            return this;
        }

        /**
         * Sets the timeout applied to the server socket.
         *
         * @param i the timeout in milliseconds, at least 1
         * @return this builder
         * @throws IllegalArgumentException if the value is below 1
         */
        public Builder withReadTimeout(int i) {
            if (i < 1) {
                throw new IllegalArgumentException("Read timeout must be at least 1 ms: " + i);
            }
            this.readTimeoutInMs = i;
            return this;
        }

        /**
         * Sets the number of threads that process service calls.
         *
         * @param i the worker thread count, at least 1
         * @return this builder
         * @throws IllegalArgumentException if the value is below 1
         */
        public Builder withWorkerThreads(int i) {
            if (i < 1) {
                throw new IllegalArgumentException("Worker thread count must be at least 1: " + i);
            }
            this.workerThreads = i;
            return this;
        }

        /**
         * Sets the factory creating the worker threads.
         *
         * @param threadFactory the factory for the worker pool
         * @return this builder
         */
        public Builder withWorkerThreadFactory(ThreadFactory threadFactory) {
            this.workerThreadFactory = threadFactory;
            return this;
        }

        /**
         * Sets the factory creating the thread that runs the select loop.
         *
         * @param threadFactory the factory for the receiver thread
         * @return this builder
         */
        public Builder withReceiverThreadFactory(ThreadFactory threadFactory) {
            this.receiverThreadFactory = threadFactory;
            return this;
        }

        /**
         * Sets the serializer used for requests and replies.
         *
         * @param serializer the wire serializer
         * @return this builder
         */
        public Builder withSerializer(Serializer serializer) {
            this.serializer = serializer;
            return this;
        }

        /**
         * Creates the server from the current settings; the server constructor
         * binds the socket and starts the select loop.
         *
         * @return the started server
         * @throws UncheckedIOException if the server could not be opened or bound
         */
        public NonblockingSocketServer start() {
            return new NonblockingSocketServer(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/morimekta/providence/thrift/server/NonblockingSocketServer$Context.class */
    /**
     * Per-connection state attached to a client channel's selection key:
     * the buffers used to assemble an incoming frame, the framed output
     * stream for replies and the queue of replies waiting to be written.
     */
    public class Context {
        /** The client channel this state belongs to. */
        final SocketChannel channel;
        /** Replies waiting to be written to {@link #channel}. */
        final Queue<WriteEntry> writeQueue;
        /** Framed output stream wrapping {@link #channel}. */
        final FramedBufferOutputStream out;
        /** Collects the 4-byte frame size header. */
        final ByteBuffer sizeBuffer;
        /** Collects the frame body; allocated at the server's maximum frame size. */
        final ByteBuffer readBuffer;
        /** Size of the frame currently being read, or 0 when the next header is awaited. */
        int currentFrameSize;

        private Context(SocketChannel socketChannel) {
            this.channel = socketChannel;
            this.currentFrameSize = 0;
            this.sizeBuffer = ByteBuffer.allocate(4);
            this.readBuffer = ByteBuffer.allocateDirect(NonblockingSocketServer.this.maxFrameSizeInBytes);
            this.out = new FramedBufferOutputStream(socketChannel, NonblockingSocketServer.this.maxFrameSizeInBytes);
            this.writeQueue = new ConcurrentLinkedQueue<>();
        }

        /**
         * Closes the client socket and channel. Failures are logged, not
         * thrown, and the channel is closed even when closing the socket fails.
         */
        void close() {
            try {
                try {
                    this.channel.socket().close();
                } finally {
                    this.channel.close();
                }
            } catch (IOException e) {
                NonblockingSocketServer.LOGGER.warn("Exception closing channel: {}", e.getMessage(), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/morimekta/providence/thrift/server/NonblockingSocketServer$WriteEntry.class */
    /**
     * A reply waiting to be written, together with the call it answers and
     * the {@link System#nanoTime()} reading taken when handling of that call began.
     */
    public class WriteEntry {
        /** Nano-time reading used as the start of the call's duration. */
        long startTime;
        /** The request this entry answers. */
        PServiceCall call;
        /** The reply to serialize back to the client. */
        PServiceCall reply;

        /**
         * @param j nano-time reading marking the start of the call
         * @param pServiceCall the request
         * @param pServiceCall2 the reply to that request
         */
        WriteEntry(long j, PServiceCall pServiceCall, PServiceCall pServiceCall2) {
            reply = pServiceCall2;
            call = pServiceCall;
            startTime = j;
        }
    }

    /**
     * Convenience factory for a {@link Builder}.
     *
     * @param pProcessor the processor the server will dispatch calls to
     * @return a new builder wrapping the processor
     */
    public static Builder builder(@Nonnull PProcessor pProcessor) {
        final Builder fresh = new Builder(pProcessor);
        return fresh;
    }

    /**
     * Reports the local port of the server socket.
     *
     * @return the local port, or -1 once the receiver executor has been shut down
     */
    public int getPort() {
        final boolean stopped = this.receiverExecutor.isShutdown();
        return stopped ? -1 : this.serverSocket.getLocalPort();
    }

    /**
     * Stops the server: requests an orderly shutdown of both executors,
     * closes the server socket, gives each executor up to 10 ms to finish
     * and then forces the shutdown.
     *
     * <p>A failure to close the server socket is logged rather than thrown.
     * If the calling thread is interrupted while waiting, the wait is cut
     * short and the interrupt status is restored before returning.
     *
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        this.receiverExecutor.shutdown();
        this.workerExecutor.shutdown();
        try {
            this.serverSocket.close();
        } catch (IOException e) {
            LOGGER.warn("Exception closing server socket: {}", e.getMessage(), e);
        } finally {
            boolean interrupted = false;
            try {
                this.workerExecutor.awaitTermination(10L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            try {
                this.receiverExecutor.awaitTermination(10L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            // Interrupts the select loop (if still blocked) and any running workers.
            this.receiverExecutor.shutdownNow();
            this.workerExecutor.shutdownNow();
            if (interrupted) {
                // Restore the flag only after the executors are stopped so the
                // second wait above is not aborted immediately.
                Thread.currentThread().interrupt();
            }
        }
    }

    private NonblockingSocketServer(Builder builder) {
        try {
            this.maxFrameSizeInBytes = builder.maxFrameSizeInBytes;
            this.serializer = builder.serializer;
            this.processor = builder.processor;
            this.instrumentation = builder.instrumentation != null ? builder.instrumentation : (d, pServiceCall, pServiceCall2) -> {
            };
            this.selector = Selector.open();
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocket = this.serverSocketChannel.socket();
            this.serverSocketChannel.socket().setSoTimeout(builder.readTimeoutInMs);
            this.serverSocket.setReuseAddress(true);
            this.serverSocket.bind(builder.bindAddress, builder.backlog);
            this.receiverExecutor = Executors.newSingleThreadExecutor(builder.receiverThreadFactory);
            this.workerExecutor = Executors.newFixedThreadPool(builder.workerThreads, builder.workerThreadFactory);
            this.serverSocketChannel.register(this.selector, 16);
            this.receiverExecutor.submit(this::selectLoop);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Runs the selector loop until the server channel is closed: waits for
     * ready keys, dispatches each to accept / read / write handling and then
     * reaps connections whose channel or socket has been closed.
     *
     * <p>An unchecked exception raised while handling one key is logged and
     * only that connection is dropped, so a single bad connection cannot stop
     * the loop. When the loop ends, all remaining channels and the selector
     * itself are closed.
     */
    private synchronized void selectLoop() {
        try {
            while (this.serverSocketChannel.isOpen()) {
                try {
                    this.selector.select();
                    Iterator<SelectionKey> ready = this.selector.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        SelectionKey key = ready.next();
                        // Remove first so a failing handler cannot leave the key in the selected set.
                        ready.remove();
                        dispatchReadyKey(key);
                    }
                } catch (IOException e) {
                    LOGGER.error("Exception in thread: " + e.getMessage(), e);
                }
                reapClosedConnections();
            }
        } finally {
            releaseSelector();
        }
    }

    /** Routes one ready key to the matching handler, isolating unchecked failures to that key. */
    private void dispatchReadyKey(SelectionKey key) throws IOException {
        try {
            if (!key.isValid()) {
                return;
            }
            if (key.isAcceptable()) {
                accept();
            } else if (key.isReadable()) {
                handleRead(key, (Context) key.attachment());
            } else if (key.isWritable()) {
                handleWrite(key, (Context) key.attachment());
            }
        } catch (RuntimeException e) {
            // Event-loop boundary: log and drop the offending connection instead of
            // letting the exception terminate the only selector thread.
            LOGGER.error("Unexpected exception handling selection key: {}", e.getMessage(), e);
            Object attachment = key.attachment();
            if (attachment instanceof Context) {
                ((Context) attachment).close();
                key.cancel();
            }
        }
    }

    /** Closes and cancels every client key whose key, channel or socket is no longer usable. */
    private void reapClosedConnections() {
        for (SelectionKey key : this.selector.keys()) {
            if (key.channel() == this.serverSocketChannel) {
                continue;
            }
            SocketChannel client = (SocketChannel) key.channel();
            if (!key.isValid() || !client.isOpen() || client.socket().isClosed()) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    LOGGER.warn("Exception closing stale channel: {}", e.getMessage(), e);
                }
                key.cancel();
            }
        }
    }

    /** Closes every channel still registered with the selector, then the selector itself. */
    private void releaseSelector() {
        if (!this.selector.isOpen()) {
            return;
        }
        // Snapshot the key set so closing channels cannot disturb the iteration.
        for (SelectionKey key : this.selector.keys().toArray(new SelectionKey[0])) {
            try {
                key.channel().close();
            } catch (IOException e) {
                LOGGER.warn("Exception closing channel on shutdown: {}", e.getMessage(), e);
            }
        }
        try {
            this.selector.close();
        } catch (IOException e) {
            LOGGER.warn("Exception closing selector: {}", e.getMessage(), e);
        }
    }

    /**
     * Accepts every connection currently pending on the server channel,
     * switches each to non-blocking mode and registers it for reads with a
     * fresh {@link Context}.
     *
     * <p>Stops at the first failure. A connection that was accepted but could
     * not be configured or registered is closed so it does not leak.
     */
    private void accept() {
        while (true) {
            SocketChannel client;
            try {
                client = this.serverSocketChannel.accept();
            } catch (IOException e) {
                LOGGER.error("Exception when accepting: {}", e.getMessage(), e);
                return;
            }
            if (client == null) {
                // Nothing more is pending on the non-blocking server channel.
                return;
            }
            try {
                client.configureBlocking(false);
                client.register(this.selector, SelectionKey.OP_READ, new Context(client));
            } catch (IOException e) {
                try {
                    client.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                LOGGER.error("Exception when accepting: {}", e.getMessage(), e);
                return;
            }
        }
    }

    /**
     * Reads from a client channel that the selector reported as readable.
     *
     * <p>First the 4-byte frame size is collected (unless a frame is already
     * in progress), then the frame body. Once the body is complete the call
     * is deserialized on the calling thread and handed to the worker pool.
     * The connection is closed when the peer closes the stream, when the
     * frame size is below 1 or above the configured maximum, or when reading
     * fails.
     *
     * @param selectionKey the key of the readable channel
     * @param context the connection state attached to the key
     * @throws IOException declared for callers; I/O failures are handled internally
     */
    private void handleRead(SelectionKey selectionKey, Context context) throws IOException {
        long startTime = System.nanoTime();
        if (context.currentFrameSize == 0) {
            try {
                if (context.channel.read(context.sizeBuffer) < 0) {
                    context.close();
                    selectionKey.cancel();
                    return;
                }
                if (context.sizeBuffer.position() < 4) {
                    // Header still incomplete; wait for more bytes.
                    return;
                }
                context.currentFrameSize = TFramedTransport.decodeFrameSize(context.sizeBuffer.array());
                context.sizeBuffer.rewind();
                if (context.currentFrameSize > this.maxFrameSizeInBytes) {
                    LOGGER.warn("Attempting message of " + context.currentFrameSize + " > " + this.maxFrameSizeInBytes);
                    context.close();
                    selectionKey.cancel();
                    return;
                }
                if (context.currentFrameSize < 1) {
                    LOGGER.warn("Attempting message of " + context.currentFrameSize);
                    context.close();
                    selectionKey.cancel();
                    return;
                }
                // Restrict the body buffer to exactly one frame.
                context.readBuffer.rewind();
                context.readBuffer.limit(context.currentFrameSize);
            } catch (IOException e) {
                LOGGER.warn("Exception reading frame size: {}", e.getMessage(), e);
                context.close();
                selectionKey.cancel();
                return;
            }
        }
        try {
            if (context.channel.read(context.readBuffer) < 0) {
                LOGGER.warn("Closed connection while reading frame");
                context.close();
                selectionKey.cancel();
                return;
            }
            if (context.readBuffer.position() < context.readBuffer.limit()) {
                // Body still incomplete; wait for more bytes.
                return;
            }
            try {
                context.currentFrameSize = 0;
                context.readBuffer.flip();
                PServiceCall call = this.serializer.deserialize(new ByteBufferInputStream(context.readBuffer), this.processor.getDescriptor());
                context.readBuffer.clear();
                this.workerExecutor.submit(() -> processCall(selectionKey, context, call, startTime));
            } catch (IOException e) {
                // NOTE(review): the client gets no reply for a frame that fails to
                // deserialize and the connection stays open — confirm this is intended.
                double elapsedMs = (double) (System.nanoTime() - startTime) / NS_IN_MILLIS;
                this.instrumentation.onTransportException(e, elapsedMs, (PServiceCall) null, (PServiceCall) null);
            }
        } catch (IOException e) {
            LOGGER.warn("Exception reading frame: {}", e.getMessage(), e);
            context.close();
            selectionKey.cancel();
        }
    }

    /**
     * Worker-side half of a call: runs the processor, queues the reply and
     * asks the selector to report the connection as writable. A processor
     * failure is turned into an application-exception reply.
     */
    private void processCall(SelectionKey selectionKey, Context context, PServiceCall call, long startTime) {
        PServiceCall reply;
        try {
            reply = this.processor.handleCall(call);
        } catch (Exception e) {
            reply = new PServiceCall(call.getMethod(), PServiceCallType.EXCEPTION, call.getSequence(), PApplicationException.builder().setMessage(e.getMessage()).setId(PApplicationExceptionType.INTERNAL_ERROR).initCause(e).build());
        }
        synchronized (context.writeQueue) {
            context.writeQueue.offer(new WriteEntry(startTime, call, reply));
            try {
                selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
                this.selector.wakeup();
            } catch (CancelledKeyException e) {
                // The connection went away while the call was being processed;
                // there is nobody left to send the reply to.
                LOGGER.debug("Connection closed before reply to {} could be written", call.getMethod());
            }
        }
    }

    /**
     * Writes all queued replies for a connection the selector reported as
     * writable, then clears the write interest once the queue is empty.
     *
     * <p>Each reply is reported to the instrumentation exactly once: as
     * completed when it was serialized and flushed, otherwise as a transport
     * exception. If flushing fails the connection is closed, the remaining
     * queued replies are reported as failed with the same exception and the
     * cancelled key is not touched again.
     *
     * @param selectionKey the key of the writable channel
     * @param context the connection state attached to the key
     */
    private void handleWrite(SelectionKey selectionKey, Context context) {
        WriteEntry entry;
        while ((entry = context.writeQueue.poll()) != null) {
            IOException failure = null;
            boolean connectionLost = false;
            try {
                this.serializer.serialize(context.out, entry.reply);
            } catch (IOException e) {
                failure = e;
            } finally {
                // The frame is always completed and flushed, even after a serialization failure.
                try {
                    context.out.completeFrame();
                    context.out.flush();
                } catch (IOException e) {
                    LOGGER.error("Failed to write frame: {}", e.getMessage(), e);
                    context.close();
                    selectionKey.cancel();
                    connectionLost = true;
                    if (failure == null) {
                        failure = e;
                    }
                } finally {
                    reportWrite(entry, failure);
                }
            }
            if (connectionLost) {
                // Nothing more can be sent on this connection; account for what is left.
                WriteEntry dropped;
                while ((dropped = context.writeQueue.poll()) != null) {
                    reportWrite(dropped, failure);
                }
                return;
            }
        }
        synchronized (context.writeQueue) {
            // Re-check under the lock workers hold while queueing, so a reply queued
            // after the loop above keeps the write interest set.
            if (selectionKey.isValid() && context.writeQueue.isEmpty()) {
                selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
            }
        }
    }

    /** Reports one reply to the instrumentation, as completed or as a transport failure. */
    private void reportWrite(WriteEntry entry, IOException failure) {
        // Cast before dividing so sub-millisecond durations are not truncated to 0.
        double durationMs = (double) (System.nanoTime() - entry.startTime) / NS_IN_MILLIS;
        if (failure == null) {
            this.instrumentation.onComplete(durationMs, entry.call, entry.reply);
        } else {
            this.instrumentation.onTransportException(failure, durationMs, entry.call, entry.reply);
        }
    }
}
