package tech.ytsaurus.client.rpc;

import com.google.protobuf.CodedInputStream;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ytsaurus.client.bus.Bus;
import tech.ytsaurus.client.bus.BusConnector;
import tech.ytsaurus.client.bus.BusDeliveryTracking;
import tech.ytsaurus.client.bus.BusListener;
import tech.ytsaurus.core.GUID;
import tech.ytsaurus.rpc.TRequestCancelationHeader;
import tech.ytsaurus.rpc.TRequestHeader;
import tech.ytsaurus.rpc.TResponseHeader;
import tech.ytsaurus.rpc.TStreamingFeedbackHeader;
import tech.ytsaurus.rpc.TStreamingParameters;
import tech.ytsaurus.rpc.TStreamingPayloadHeader;

/* loaded from: input_file:tech/ytsaurus/client/rpc/DefaultRpcBusClient.class */
public class DefaultRpcBusClient implements RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(DefaultRpcBusClient.class);
    // Connector each Session uses to open its bus to `address`.
    private final BusConnector busConnector;
    // Remote endpoint handed to busConnector.connect(...) when a Session is created.
    private final SocketAddress address;
    // Textual form of the address; embedded in every Session's name.
    private final String addressString;
    // NOTE(review): presumably guards currentSession/closed; the code using it is outside this view, confirm.
    private final Lock sessionLock;
    private Session currentSession;
    private boolean closed;
    private final String destinationName;
    private final String name;
    // Receives the ack/response timings and error counts reported through Statistics.
    private final DefaultRpcBusClientMetricsHolder metricsHolder;
    private final AtomicInteger referenceCounter;
    private final Statistics stats;

    /* loaded from: input_file:tech/ytsaurus/client/rpc/DefaultRpcBusClient$Request.class */
    private static class Request extends RequestBase {
        /** Callback that is told about the response, an error or a cancellation. */
        protected final RpcClientResponseHandler handler;

        /**
         * Creates a plain (non-streaming) request.
         *
         * @throws NullPointerException if {@code rpcClientResponseHandler} is null
         */
        Request(RpcClient rpcClient, Session session, RpcRequest<?> rpcRequest, RpcClientResponseHandler rpcClientResponseHandler, RpcOptions rpcOptions, Statistics statistics) {
            super(rpcClient, session, rpcRequest, rpcOptions, statistics);
            this.handler = Objects.requireNonNull(rpcClientResponseHandler);
        }

        /** Forwards the failure to the handler. */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        public void handleError(Throwable th) {
            handler.onError(th);
        }

        /** Forwards the cancellation to the handler. */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        void handleCancellation(CancellationException cancellationException) {
            handler.onCancel(cancellationException);
        }

        /**
         * Runs the base-class bookkeeping, hands the response to the handler and finally
         * unregisters this request from its session. An exception thrown by
         * {@code onResponse} is routed to {@code onError}.
         */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        public void response(TResponseHeader tResponseHeader, List<byte[]> list) {
            super.response(tResponseHeader, list);
            try {
                deliver(tResponseHeader, list);
            } finally {
                // Unregister whether or not the handler (or its error path) threw.
                session.unregister(this);
            }
        }

        /** Passes the response to the handler; any failure is reported via {@code onError}. */
        private void deliver(TResponseHeader header, List<byte[]> attachments) {
            try {
                handler.onResponse(sender, header, attachments);
            } catch (Throwable failure) {
                handler.onError(failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ytsaurus/client/rpc/DefaultRpcBusClient$RequestBase.class */
    public static abstract class RequestBase implements RpcClientRequestControl {
        /** Held while {@link #state} and the timeout futures are read or updated. */
        protected final Lock lock = new ReentrantLock();
        /** Lifecycle stage of this request; starts as {@code INITIALIZING}. */
        protected RequestState state = RequestState.INITIALIZING;
        protected final RpcClient sender;
        protected final Session session;
        protected final RpcRequest<?> rpcRequest;
        /** Mutable copy of the request header; the start time is stamped into it by {@link #start()}. */
        protected final TRequestHeader.Builder requestHeader;
        protected final GUID requestId;
        /** Moment {@link #start()} began sending; used to measure ack/response latency. */
        protected Instant started;
        protected final Statistics stat;
        protected final RpcOptions options;
        /** Cached {@code service/method/requestId} string returned by {@link #toString()}. */
        private final String description;
        protected ScheduledFuture<?> timeoutFuture;
        private ScheduledFuture<?> ackTimeoutFuture;

        /**
         * Captures the request, its owning session and options.
         *
         * @throws NullPointerException if the client, session, request or options are null
         */
        RequestBase(RpcClient rpcClient, Session session, RpcRequest<?> rpcRequest, RpcOptions rpcOptions, Statistics statistics) {
            this.sender = Objects.requireNonNull(rpcClient);
            this.session = Objects.requireNonNull(session);
            this.rpcRequest = Objects.requireNonNull(rpcRequest);
            this.requestHeader = rpcRequest.header.toBuilder();
            this.requestId = RpcUtil.fromProto(rpcRequest.header.getRequestId());
            this.stat = statistics;
            this.options = Objects.requireNonNull(rpcOptions);
            this.description = String.format("%s/%s/%s", this.requestHeader.getService(), this.requestHeader.getMethod(), this.requestId);
        }

        @Override
        public String toString() {
            return this.description;
        }

        /**
         * Records the response latency and marks the request finished.
         * Does nothing to the state if the request was never started (logged as an error)
         * or is already finished.
         */
        public void response(TResponseHeader tResponseHeader, List<byte[]> list) {
            Duration elapsed = Duration.between(this.started, Instant.now());
            this.stat.updateResponse(elapsed.toMillis());
            DefaultRpcBusClient.logger.debug("Request `{}` finished in {} ms Session: {}", this, elapsed.toMillis(), this.session);
            this.lock.lock();
            try {
                if (this.state == RequestState.INITIALIZING) {
                    DefaultRpcBusClient.logger.error("Received response to {} before sending the request", this);
                    return;
                }
                if (this.state == RequestState.FINISHED) {
                    return;
                }
                finishLocked();
            } finally {
                // Single release point: unlocking anywhere else as well would throw
                // IllegalMonitorStateException from ReentrantLock.
                this.lock.unlock();
            }
        }

        /** Non-streaming requests do not accept streaming payloads. */
        public void streamingPayload(TStreamingPayloadHeader tStreamingPayloadHeader, List<byte[]> list) {
            throw new IllegalArgumentException();
        }

        /** Non-streaming requests do not accept streaming feedback. */
        public void streamingFeedback(TStreamingFeedbackHeader tStreamingFeedbackHeader, List<byte[]> list) {
            throw new IllegalArgumentException();
        }

        abstract void handleError(Throwable th);

        /** Cancels the pending acknowledgement timeout, if one was scheduled. */
        void handleAcknowledgement() {
            DefaultRpcBusClient.logger.trace("Ack {}", this.requestId);
            this.lock.lock();
            try {
                if (this.ackTimeoutFuture != null) {
                    this.ackTimeoutFuture.cancel(true);
                    this.ackTimeoutFuture = null;
                }
            } finally {
                this.lock.unlock();
            }
        }

        abstract void handleCancellation(CancellationException cancellationException);

        /**
         * Moves the request to {@code SENDING}, registers it with the session, sends it over the
         * bus and schedules the request/acknowledgement timeouts. Any failure along the way
         * (including a second call to {@code start()}) is reported through {@link #error(Throwable)}.
         */
        public void start() {
            try {
                this.lock.lock();
                try {
                    if (this.state != RequestState.INITIALIZING) {
                        throw new IllegalStateException("Request has been started already");
                    }
                    this.state = RequestState.SENDING;
                } finally {
                    this.lock.unlock();
                }

                this.started = Instant.now();
                this.requestHeader.setStartTime(RpcUtil.instantToMicros(this.started));
                DefaultRpcBusClient.logger.debug("Sending request `{}` Session: {}", this, this.session);
                this.session.register(this);

                BusDeliveryTracking tracking = this.options.getDefaultRequestAck() ? BusDeliveryTracking.FULL : BusDeliveryTracking.SENT;
                TRequestHeader header = this.requestHeader.build();
                RpcRequestsTestingController testingController = this.options.getTestingOptions().getRpcRequestsTestingController();
                if (testingController != null) {
                    testingController.addRequest(header, this.rpcRequest.body);
                }

                this.session.bus.send(this.rpcRequest.serialize(header), tracking).whenComplete((ignored, failure) -> {
                    Duration elapsed = Duration.between(this.started, Instant.now());
                    this.stat.updateAck(elapsed.toMillis());
                    if (failure != null) {
                        error(failure);
                        DefaultRpcBusClient.logger.debug("({}) request `{}` acked in {} ms with error `{}`", this.session, this, elapsed.toMillis(), failure.toString());
                    } else {
                        ack();
                        DefaultRpcBusClient.logger.trace("Request `{}` acked in {} ms", this, elapsed.toMillis());
                    }
                });

                Duration timeout = RpcRequest.getTimeout(this.requestHeader);
                Duration acknowledgementTimeout = this.options.getAcknowledgementTimeout();
                this.lock.lock();
                try {
                    // The send may already have completed and finished the request; do not arm
                    // a timeout for a finished request.
                    if (timeout != null && this.state != RequestState.FINISHED) {
                        this.timeoutFuture = this.session.eventLoop().schedule(this::handleTimeout, timeout.toNanos(), TimeUnit.NANOSECONDS);
                    }
                    if (acknowledgementTimeout != null && this.options.getDefaultRequestAck() && this.state.step < RequestState.ACKED.step) {
                        this.ackTimeoutFuture = this.session.eventLoop().schedule(this::onAcknowledgementTimeout, acknowledgementTimeout.toNanos(), TimeUnit.NANOSECONDS);
                    }
                } finally {
                    this.lock.unlock();
                }
            } catch (Throwable th) {
                error(th);
            }
        }

        /** Marks the request finished and cancels both timeout tasks. Callers hold {@link #lock}. */
        protected void finishLocked() {
            this.state = RequestState.FINISHED;
            if (this.timeoutFuture != null) {
                this.timeoutFuture.cancel(false);
                this.timeoutFuture = null;
            }
            if (this.ackTimeoutFuture != null) {
                this.ackTimeoutFuture.cancel(false);
                this.ackTimeoutFuture = null;
            }
        }

        /**
         * Sends a cancellation message for this request over the session's bus.
         *
         * @return the future returned by the bus for the send
         */
        public CompletableFuture<Void> sendCancellation() {
            TRequestCancelationHeader.Builder builder = TRequestCancelationHeader.newBuilder();
            builder.setRequestId(this.requestHeader.getRequestId());
            builder.setService(this.requestHeader.getService());
            builder.setMethod(this.requestHeader.getMethod());
            if (this.requestHeader.hasRealmId()) {
                builder.setRealmId(this.requestHeader.getRealmId());
            }
            DefaultRpcBusClient.logger.debug("Canceling request {}", this);
            return this.session.bus.send(RpcUtil.createCancelMessage(builder.build()), BusDeliveryTracking.NONE);
        }

        /** Timeout task body: fails the request with a {@link TimeoutException}. */
        public void handleTimeout() {
            timeout(new TimeoutException("Request timed out"));
        }

        /**
         * Cancels the request.
         *
         * @return {@code false} if the request had already finished, {@code true} if this call cancelled it
         * @throws IllegalStateException if the request has not been started
         */
        @Override // tech.ytsaurus.client.rpc.RpcClientRequestControl
        public boolean cancel() {
            this.lock.lock();
            try {
                if (this.state == RequestState.INITIALIZING) {
                    throw new IllegalStateException("Request has not been started");
                }
                if (this.state == RequestState.FINISHED) {
                    return false;
                }
                finishLocked();
                try {
                    handleCancellation(new CancellationException());
                } finally {
                    // Only notify the server if the request was still registered with the session.
                    if (this.session.unregister(this)) {
                        sendCancellation();
                    }
                }
                return true;
            } finally {
                this.lock.unlock();
            }
        }

        /** Moves a {@code SENDING} request to {@code ACKED}; ignored in any other state. */
        public void ack() {
            this.lock.lock();
            try {
                if (this.state != RequestState.SENDING) {
                    return;
                }
                this.state = RequestState.ACKED;
                try {
                    handleAcknowledgement();
                } catch (Throwable th) {
                    error(th);
                }
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Counts the error and, unless the request is already finished, finishes it, reports
         * the failure via {@link #handleError(Throwable)} and unregisters it from the session.
         */
        public void error(Throwable th) {
            this.stat.incError();
            this.lock.lock();
            try {
                if (this.state == RequestState.FINISHED) {
                    return;
                }
                finishLocked();
                try {
                    handleError(th);
                } finally {
                    this.session.unregister(this);
                }
            } finally {
                this.lock.unlock();
            }
        }

        /** Logs the timeout, sends a cancellation message and fails the request. */
        private void timeout(TimeoutException timeoutException) {
            DefaultRpcBusClient.logger.warn("{}; RequestId: {}", timeoutException.toString(), this.requestId);
            sendCancellation();
            error(timeoutException);
        }

        /** Ack-timeout task body: fails the request unless it is already acked or finished. */
        private void onAcknowledgementTimeout() {
            this.lock.lock();
            try {
                if (this.state.step >= RequestState.ACKED.step) {
                    return;
                }
                timeout(new AcknowledgementTimeoutException(String.format("Request acknowledgement timed out; requestId: %s; proxy: %s", this.requestId, this.sender.getAddressString())));
            } finally {
                this.lock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ytsaurus/client/rpc/DefaultRpcBusClient$RequestState.class */
    public enum RequestState {
        /** Created but not yet started. */
        INITIALIZING(0),
        /** Started; the request is being sent. */
        SENDING(1),
        /** The send has been acknowledged. */
        ACKED(2),
        /** Completed, failed or cancelled. */
        FINISHED(3);

        /** Position in the lifecycle; used for "has reached at least X" comparisons. */
        final int step;

        RequestState(int step) {
            this.step = step;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ytsaurus/client/rpc/DefaultRpcBusClient$Session.class */
    public class Session implements BusListener {
        private final Bus bus;
        private final ConcurrentHashMap<GUID, RequestBase> activeRequests = new ConcurrentHashMap<>();
        private final String sessionName;

        Session() {
            this.sessionName = String.format("Session(%s@%s)", DefaultRpcBusClient.this.addressString, Integer.toHexString(hashCode()));
            this.bus = DefaultRpcBusClient.this.busConnector.connect(DefaultRpcBusClient.this.address, this);
        }

        public void start() {
            this.bus.disconnected().addListener(future -> {
                DefaultRpcBusClient.this.discardSession(this);
                failPending(future.isSuccess() ? new ClosedChannelException() : future.cause());
            });
        }

        public void stop() {
            this.bus.close();
        }

        public EventLoop eventLoop() {
            return this.bus.eventLoop();
        }

        @Override // tech.ytsaurus.client.bus.BusListener
        public void onMessage(Bus bus, List<byte[]> list) {
            if (list.size() < 1) {
                throw new IllegalStateException("Received an empty message");
            }
            byte[] bArr = list.get(0);
            try {
                RpcMessageType fromValue = RpcMessageType.fromValue(ByteBuffer.wrap(bArr).order(ByteOrder.LITTLE_ENDIAN).getInt());
                switch (fromValue) {
                    case RESPONSE:
                        try {
                            TResponseHeader parseFrom = TResponseHeader.parseFrom(CodedInputStream.newInstance(bArr, 4, bArr.length - 4));
                            GUID fromProto = RpcUtil.fromProto(parseFrom.getRequestId());
                            RequestBase requestBase = this.activeRequests.get(fromProto);
                            if (requestBase == null) {
                                DefaultRpcBusClient.logger.debug("Received response to an unknown request {}", fromProto);
                                return;
                            } else if (!parseFrom.hasError() || parseFrom.getError().getCode() == 0) {
                                requestBase.response(parseFrom, list.subList(1, list.size()));
                                return;
                            } else {
                                requestBase.error(new RpcError(parseFrom.getError()));
                                return;
                            }
                        } catch (IOException | RuntimeException e) {
                            throw new IllegalStateException("Failed to parse message header", e);
                        }
                    case STREAMING_PAYLOAD:
                        try {
                            TStreamingPayloadHeader parseFrom2 = TStreamingPayloadHeader.parseFrom(CodedInputStream.newInstance(bArr, 4, bArr.length - 4));
                            GUID fromProto2 = RpcUtil.fromProto(parseFrom2.getRequestId());
                            RequestBase requestBase2 = this.activeRequests.get(fromProto2);
                            if (requestBase2 == null) {
                                DefaultRpcBusClient.logger.debug("Received response to an unknown request {}", fromProto2);
                                return;
                            } else {
                                requestBase2.streamingPayload(parseFrom2, list.subList(1, list.size()));
                                return;
                            }
                        } catch (IOException | RuntimeException e2) {
                            throw new IllegalStateException("Failed to parse message header", e2);
                        }
                    case STREAMING_FEEDBACK:
                        try {
                            TStreamingFeedbackHeader parseFrom3 = TStreamingFeedbackHeader.parseFrom(CodedInputStream.newInstance(bArr, 4, bArr.length - 4));
                            GUID fromProto3 = RpcUtil.fromProto(parseFrom3.getRequestId());
                            RequestBase requestBase3 = this.activeRequests.get(fromProto3);
                            if (requestBase3 == null) {
                                DefaultRpcBusClient.logger.debug("Received response to an unknown request {}", fromProto3);
                                return;
                            } else {
                                requestBase3.streamingFeedback(parseFrom3, list.subList(1, list.size()));
                                return;
                            }
                        } catch (IOException | RuntimeException e3) {
                            throw new IllegalStateException("Failed to parse message header", e3);
                        }
                    case HANDSHAKE:
                        return;
                    default:
                        throw new IllegalStateException("Unexpected " + fromValue + " message in a client connection");
                }
            } catch (RuntimeException e4) {
                throw new IllegalStateException("Failed to read message type", e4);
            }
        }

        @Override // tech.ytsaurus.client.bus.BusListener
        public void onConnect(Bus bus) {
        }

        @Override // tech.ytsaurus.client.bus.BusListener
        public void onDisconnect(Bus bus) {
        }

        @Override // tech.ytsaurus.client.bus.BusListener
        public void onException(Bus bus, Throwable th) {
        }

        private void failPending(Throwable th) {
            Iterator<RequestBase> it = this.activeRequests.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().error(th);
                } catch (Throwable th2) {
                    DefaultRpcBusClient.logger.debug("Failed while failing an active request", th2);
                }
                it.remove();
            }
        }

        public void register(RequestBase requestBase) {
            this.activeRequests.put(requestBase.requestId, requestBase);
        }

        public boolean unregister(RequestBase requestBase) {
            DefaultRpcBusClient.logger.trace("Unregister request {}", requestBase.requestId);
            return this.activeRequests.remove(requestBase.requestId, requestBase);
        }

        public String toString() {
            return this.sessionName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ytsaurus/client/rpc/DefaultRpcBusClient$Statistics.class */
    public final class Statistics {
        /** Label passed to the metrics holder with every timing update. */
        private final String name;

        Statistics(String name) {
            this.name = name;
        }

        /** Reports an acknowledgement latency to the client's metrics holder. */
        void updateAck(long millis) {
            DefaultRpcBusClient.this.metricsHolder.updateAck(name, millis);
        }

        /** Reports a response latency to the client's metrics holder. */
        void updateResponse(long millis) {
            DefaultRpcBusClient.this.metricsHolder.updateResponse(name, millis);
        }

        /** Counts one error in the client's metrics holder. */
        void incError() {
            DefaultRpcBusClient.this.metricsHolder.incError();
        }
    }

    /* loaded from: input_file:tech/ytsaurus/client/rpc/DefaultRpcBusClient$StreamingRequest.class */
    private static class StreamingRequest extends RequestBase implements RpcClientStreamControl {
        // Receives the stream events (start, payload, feedback, response, error, cancel, wakeup).
        final RpcStreamConsumer consumer;
        // Source of sequence numbers for outgoing payload and EOF headers.
        final AtomicInteger sequenceNumber;
        // Read/write timeouts taken from RpcOptions; set to null once cleared.
        Duration readTimeout;
        Duration writeTimeout;
        // Pending timeout tasks scheduled on the session's event loop, if any.
        ScheduledFuture<?> readTimeoutFuture;
        ScheduledFuture<?> writeTimeoutFuture;

        /**
         * Creates a streaming request: takes the read/write timeouts from the options,
         * schedules both timeout tasks and writes the streaming parameters into the header.
         */
        StreamingRequest(RpcClient rpcClient, Session session, RpcRequest<?> rpcRequest, RpcStreamConsumer rpcStreamConsumer, RpcOptions rpcOptions, Statistics statistics) {
            super(rpcClient, session, rpcRequest, rpcOptions, statistics);
            consumer = rpcStreamConsumer;
            sequenceNumber = new AtomicInteger(0);

            // No timeout task exists yet.
            readTimeoutFuture = null;
            writeTimeoutFuture = null;
            readTimeout = rpcOptions.getStreamingReadTimeout();
            writeTimeout = rpcOptions.getStreamingWriteTimeout();

            resetWriteTimeout();
            resetReadTimeout();
            setStreamingOptions();
        }

        /** Starts the request via the base class, then tells the consumer the stream has started. */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        public void start() {
            super.start();
            consumer.onStartStream(this);
        }

        /**
         * Removes the plain request timeout from the header and attaches the streaming
         * parameters (optional read/write timeouts and the window size) instead.
         */
        private void setStreamingOptions() {
            requestHeader.clearTimeout();

            TStreamingParameters.Builder parameters = TStreamingParameters.newBuilder();
            Duration read = readTimeout;
            if (read != null) {
                parameters.setReadTimeout(RpcUtil.durationToMicros(read));
            }
            Duration write = writeTimeout;
            if (write != null) {
                parameters.setWriteTimeout(RpcUtil.durationToMicros(write));
            }
            parameters.setWindowSize(options.getStreamingWindowSize());

            requestHeader.setServerAttachmentsStreamingParameters(parameters);
        }

        /**
         * Logs the failure and reports it to the consumer while holding the request lock.
         * Anything the consumer throws is logged and not propagated.
         */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        void handleError(Throwable th) {
            DefaultRpcBusClient.logger.info("Error in RPC protocol: `{}`", th.getMessage(), th);
            lock.lock();
            try {
                consumer.onError(th);
            } catch (Throwable consumerFailure) {
                DefaultRpcBusClient.logger.error("Error", consumerFailure);
            } finally {
                lock.unlock();
            }
        }

        /**
         * Reports the cancellation to the consumer while holding the request lock.
         * Anything the consumer throws is logged and not propagated.
         */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        void handleCancellation(CancellationException cancellationException) {
            lock.lock();
            try {
                consumer.onCancel(cancellationException);
            } catch (Throwable consumerFailure) {
                DefaultRpcBusClient.logger.error("Error", consumerFailure);
            } finally {
                lock.unlock();
            }
        }

        /** Cancels the pending read-timeout task and disables further read-timeout scheduling. */
        private void clearReadTimeout() {
            ScheduledFuture<?> pending = readTimeoutFuture;
            if (pending != null) {
                pending.cancel(false);
            }
            // With a null timeout, resetReadTimeout() becomes a no-op.
            readTimeout = null;
        }

        /** Cancels the pending write-timeout task and disables further write-timeout scheduling. */
        private void clearWriteTimeout() {
            ScheduledFuture<?> pending = writeTimeoutFuture;
            if (pending != null) {
                pending.cancel(false);
            }
            // With a null timeout, resetWriteTimeout() becomes a no-op.
            writeTimeout = null;
        }

        /**
         * Restarts the read-timeout countdown: cancels the pending task, if any, and schedules
         * a new one on the session's event loop. Does nothing when no read timeout is set.
         */
        private void resetReadTimeout() {
            if (readTimeout == null) {
                return;
            }
            if (readTimeoutFuture != null) {
                readTimeoutFuture.cancel(false);
            }
            long delayNanos = readTimeout.toNanos();
            readTimeoutFuture = session.eventLoop().schedule(this::handleTimeout, delayNanos, TimeUnit.NANOSECONDS);
        }

        /**
         * Restarts the write-timeout countdown: cancels the pending task, if any, and schedules
         * a new one on the session's event loop. Does nothing when no write timeout is set.
         */
        private void resetWriteTimeout() {
            if (writeTimeout == null) {
                return;
            }
            if (writeTimeoutFuture != null) {
                writeTimeoutFuture.cancel(false);
            }
            long delayNanos = writeTimeout.toNanos();
            writeTimeoutFuture = session.eventLoop().schedule(this::handleTimeout, delayNanos, TimeUnit.NANOSECONDS);
        }

        /**
         * Returns the compression named by the request header's request codec, or
         * {@code Compression.None} when the header carries no request codec.
         */
        @Override // tech.ytsaurus.client.rpc.RpcClientStreamControl
        public Compression getExpectedPayloadCompression() {
            if (!requestHeader.hasRequestCodec()) {
                return Compression.None;
            }
            return Compression.fromValue(requestHeader.getRequestCodec());
        }

        /**
         * Handles an incoming streaming payload: records latency, restarts the write-timeout
         * countdown and passes the payload to the consumer under the request lock.
         *
         * <p>The payload is dropped if the request has not been started (logged as an error)
         * or is already finished. A consumer failure is reported via {@code handleError} and
         * the request is unregistered from the session.
         */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        public void streamingPayload(TStreamingPayloadHeader tStreamingPayloadHeader, List<byte[]> list) {
            this.stat.updateResponse(Duration.between(this.started, Instant.now()).toMillis());
            this.lock.lock();
            try {
                if (this.state == RequestState.INITIALIZING) {
                    DefaultRpcBusClient.logger.error("Received response to {} before sending the request", this);
                    return;
                }
                if (this.state == RequestState.FINISHED) {
                    return;
                }
            } finally {
                // Each lock() is paired with exactly one unlock(); releasing twice would throw
                // IllegalMonitorStateException.
                resetWriteTimeout();
                this.lock.unlock();
            }

            this.lock.lock();
            try {
                this.consumer.onPayload(this.sender, tStreamingPayloadHeader, list);
            } catch (Throwable th) {
                handleError(th);
                this.session.unregister(this);
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Handles incoming streaming feedback: records latency, restarts the read-timeout
         * countdown and passes the feedback to the consumer under the request lock.
         *
         * <p>The feedback is dropped if the request has not been started (logged as an error)
         * or is already finished. A consumer failure is reported via {@code handleError} and
         * the request is unregistered from the session.
         */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        public void streamingFeedback(TStreamingFeedbackHeader tStreamingFeedbackHeader, List<byte[]> list) {
            this.stat.updateResponse(Duration.between(this.started, Instant.now()).toMillis());
            this.lock.lock();
            try {
                if (this.state == RequestState.INITIALIZING) {
                    DefaultRpcBusClient.logger.error("Received response to {} before sending the request", this);
                    return;
                }
                if (this.state == RequestState.FINISHED) {
                    return;
                }
            } finally {
                // Each lock() is paired with exactly one unlock(); releasing twice would throw
                // IllegalMonitorStateException.
                resetReadTimeout();
                this.lock.unlock();
            }

            this.lock.lock();
            try {
                this.consumer.onFeedback(this.sender, tStreamingFeedbackHeader, list);
            } catch (Throwable th) {
                handleError(th);
                this.session.unregister(this);
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Marks the request finished and cancels every timeout task: the streaming read/write
         * timeouts here, and the request/acknowledgement timeouts via the base class.
         */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        protected void finishLocked() {
            clearReadTimeout();
            clearWriteTimeout();
            // The base implementation sets state = FINISHED and also cancels the timeout and
            // ack-timeout tasks that RequestBase.start() may have scheduled; without this call
            // those tasks stayed queued on the event loop after the stream had finished.
            super.finishLocked();
        }

        /**
         * Runs the base-class bookkeeping, then delivers the final response to the consumer
         * under the request lock. A consumer failure is reported via {@code handleError}.
         * The request is unregistered from the session in every case.
         */
        @Override // tech.ytsaurus.client.rpc.DefaultRpcBusClient.RequestBase
        public void response(TResponseHeader tResponseHeader, List<byte[]> list) {
            super.response(tResponseHeader, list);
            lock.lock();
            try {
                consumer.onResponse(sender, tResponseHeader, list);
            } catch (Throwable consumerFailure) {
                handleError(consumerFailure);
            } finally {
                // Unregister before releasing the lock, as on every exit path.
                session.unregister(this);
                lock.unlock();
            }
        }

        /**
         * Sends a streaming-feedback message carrying the given read position.
         *
         * @param j read position to report
         * @return the future returned by the bus for the send
         */
        @Override // tech.ytsaurus.client.rpc.RpcClientStreamControl
        public CompletableFuture<Void> feedback(long j) {
            TStreamingFeedbackHeader.Builder header = TStreamingFeedbackHeader.newBuilder()
                    .setRequestId(requestHeader.getRequestId())
                    .setService(requestHeader.getService())
                    .setMethod(requestHeader.getMethod());
            if (requestHeader.hasRealmId()) {
                header.setRealmId(requestHeader.getRealmId());
            }
            header.setReadPosition(j);

            byte[] encodedHeader = RpcUtil.createMessageHeader(RpcMessageType.STREAMING_FEEDBACK, header.build());
            return session.bus.send(Collections.singletonList(encodedHeader), BusDeliveryTracking.NONE);
        }

        /**
         * Sends an end-of-stream message with the next sequence number. When the send
         * completes normally, the read timeout is cleared under the request lock.
         *
         * @return a future that completes after the send and the timeout cleanup
         */
        @Override // tech.ytsaurus.client.rpc.RpcClientStreamControl
        public CompletableFuture<Void> sendEof() {
            TStreamingPayloadHeader.Builder header = TStreamingPayloadHeader.newBuilder()
                    .setRequestId(requestHeader.getRequestId())
                    .setService(requestHeader.getService())
                    .setMethod(requestHeader.getMethod())
                    .setSequenceNumber(sequenceNumber.getAndIncrement());
            if (requestHeader.hasRealmId()) {
                header.setRealmId(requestHeader.getRealmId());
            }

            CompletableFuture<Void> sent = session.bus.send(RpcUtil.createEofMessage(header.build()), BusDeliveryTracking.NONE);
            return sent.thenAccept(ignored -> {
                lock.lock();
                try {
                    clearReadTimeout();
                } finally {
                    lock.unlock();
                }
            });
        }

        /**
         * Builds the serialized {@code STREAMING_PAYLOAD} message header for the next payload.
         *
         * <p>Each call consumes one value of {@code sequenceNumber}. The header copies the
         * request id, service, method and request codec (plus the realm id, when present) from
         * the request header.
         *
         * @return the bytes produced by {@code RpcUtil.createMessageHeader}
         */
        private byte[] preparePayloadHeader() {
            TStreamingPayloadHeader.Builder header = TStreamingPayloadHeader.newBuilder()
                    .setRequestId(this.requestHeader.getRequestId())
                    .setService(this.requestHeader.getService())
                    .setMethod(this.requestHeader.getMethod())
                    .setSequenceNumber(this.sequenceNumber.getAndIncrement())
                    .setCodec(this.requestHeader.getRequestCodec());
            if (this.requestHeader.hasRealmId()) {
                header.setRealmId(this.requestHeader.getRealmId());
            }
            return RpcUtil.createMessageHeader(RpcMessageType.STREAMING_PAYLOAD, header.build());
        }

        /**
         * Sends a streaming payload for this request over the session bus.
         *
         * <p>The outgoing message consists of a freshly prepared payload header followed by the
         * parts in {@code list}, in order. It is sent with {@link BusDeliveryTracking#NONE}.
         * When the send future completes normally, the read timeout is reset while holding the
         * request lock.
         *
         * @param list payload parts to append after the header; must not be {@code null}
         * @return a future derived from the bus {@code send} future via {@code thenAccept}
         */
        @Override // tech.ytsaurus.client.rpc.RpcClientStreamControl
        public CompletableFuture<Void> sendPayload(List<byte[]> list) {
            // Typed list (instead of a raw ArrayList) keeps the element type checked by the compiler.
            List<byte[]> message = new ArrayList<>(1 + list.size());
            message.add(preparePayloadHeader());
            message.addAll(list);
            return this.session.bus.send(message, BusDeliveryTracking.NONE).thenAccept(ignored -> {
                this.lock.lock();
                try {
                    resetReadTimeout();
                } finally {
                    this.lock.unlock();
                }
            });
        }

        /**
         * Invokes {@code onWakeup()} on the stream consumer.
         *
         * <p>Anything thrown by the consumer is caught and passed to {@code error(Throwable)}
         * instead of propagating to the caller.
         */
        private void doConsumerWakeup() {
            try {
                this.consumer.onWakeup();
            } catch (Throwable failure) {
                // Route consumer failures through the request's error handling.
                error(failure);
            }
        }

        /**
         * Schedules a consumer wake-up on the session's event loop with zero delay.
         *
         * <p>The scheduled task is {@code doConsumerWakeup()}; this method itself does not call
         * the consumer.
         */
        @Override // tech.ytsaurus.client.rpc.RpcClientStreamControl
        public void wakeUp() {
            Runnable wakeupTask = this::doConsumerWakeup;
            this.session.eventLoop().schedule(wakeupTask, 0L, TimeUnit.MILLISECONDS);
        }

        /**
         * Returns the address string reported by this request's {@code sender}.
         *
         * @return the result of {@code sender.getAddressString()}
         */
        @Override // tech.ytsaurus.client.rpc.RpcClientStreamControl
        public String getRpcProxyAddress() {
            final String proxyAddress = this.sender.getAddressString();
            return proxyAddress;
        }
    }

    /**
     * Creates a client for the given inet address, using the address's host name as the
     * destination name.
     *
     * <p>Delegates to the {@code (BusConnector, InetSocketAddress, String)} constructor.
     * NOTE(review): per the JDK documentation, {@code InetSocketAddress.getHostName()} may
     * perform a reverse name lookup when the address was created from a literal IP.
     *
     * @param busConnector connector passed through to the delegated constructor
     * @param inetSocketAddress remote address; its host name becomes the destination name
     */
    public DefaultRpcBusClient(BusConnector busConnector, InetSocketAddress inetSocketAddress) {
        this(busConnector, inetSocketAddress, inetSocketAddress.getHostName());
    }

    /**
     * Creates a client for the given inet address with an explicit destination name.
     *
     * <p>The address string is built as {@code getHostString() + ":" + getPort()} and passed,
     * together with {@code str}, to the four-argument constructor.
     *
     * @param busConnector connector passed through to the delegated constructor
     * @param inetSocketAddress remote address; also the source of the address string
     * @param str destination name
     */
    public DefaultRpcBusClient(BusConnector busConnector, InetSocketAddress inetSocketAddress, String str) {
        this(busConnector, inetSocketAddress, inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort(), str);
    }

    /**
     * Creates a client for an arbitrary socket address with an explicit destination name.
     *
     * <p>The address string is {@code socketAddress.toString()}; it is passed, together with
     * {@code str}, to the four-argument constructor.
     *
     * @param busConnector connector passed through to the delegated constructor
     * @param socketAddress remote address; its {@code toString()} becomes the address string
     * @param str destination name
     */
    public DefaultRpcBusClient(BusConnector busConnector, SocketAddress socketAddress, String str) {
        this(busConnector, socketAddress, socketAddress.toString(), str);
    }

    /**
     * Creates a client with every attribute given explicitly.
     *
     * <p>The reference counter starts at 1. The client's {@code toString()} name is
     * {@code "<destination name>@<identity hash code>"}.
     *
     * @param busConnector connector stored by the client; must not be {@code null}
     * @param socketAddress remote address stored by the client; must not be {@code null}
     * @param str address string, later returned by {@link #getAddressString()}
     * @param str2 destination name, later returned by {@link #destinationName()} and used to
     *             build the client name and its statistics
     * @throws NullPointerException if {@code busConnector} or {@code socketAddress} is
     *                              {@code null}
     */
    public DefaultRpcBusClient(BusConnector busConnector, SocketAddress socketAddress, String str, String str2) {
        this.sessionLock = new ReentrantLock();
        this.metricsHolder = new DefaultRpcBusClientMetricsHolderImpl();
        this.referenceCounter = new AtomicInteger(1);
        this.busConnector = Objects.requireNonNull(busConnector);
        this.address = Objects.requireNonNull(socketAddress);
        this.addressString = str;
        this.destinationName = str2;
        this.name = String.format("%s@%d", str2, System.identityHashCode(this));
        // Pass the argument directly: calling the overridable destinationName() from the
        // constructor would run subclass code before the subclass has been initialized.
        this.stats = new Statistics(str2);
    }

    /**
     * Forgets the given session if it is still the current one.
     *
     * <p>The comparison is by identity and is done under {@code sessionLock}; when another
     * session (or none) is current, nothing changes.
     *
     * @param session session to drop from {@code currentSession}
     */
    private void discardSession(Session session) {
        final Lock guard = this.sessionLock;
        guard.lock();
        try {
            if (this.currentSession != session) {
                return;
            }
            this.currentSession = null;
        } finally {
            guard.unlock();
        }
    }

    /**
     * Returns the destination name given at construction time.
     *
     * @return the stored destination name
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient
    public String destinationName() {
        return destinationName;
    }

    /**
     * Returns the address string given at construction time.
     *
     * @return the stored address string
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient
    public String getAddressString() {
        return addressString;
    }

    /**
     * Returns the client name computed in the constructor.
     *
     * @return the stored client name
     */
    @Override
    public String toString() {
        return this.name;
    }

    /**
     * Returns the current session, creating and starting a new one when there is none.
     *
     * <p>Runs under {@code sessionLock}. A new session is published to {@code currentSession}
     * before {@code start()} is called on it.
     *
     * @return the existing session, or the newly created one
     * @throws IllegalStateException if the client has been closed
     */
    private Session getSession() {
        final Lock guard = this.sessionLock;
        guard.lock();
        try {
            if (this.closed) {
                throw new IllegalStateException("Client is closed");
            }
            if (this.currentSession != null) {
                return this.currentSession;
            }
            final Session fresh = new Session();
            // Keep the original order: publish first, then start.
            this.currentSession = fresh;
            fresh.start();
            return fresh;
        } finally {
            guard.unlock();
        }
    }

    /**
     * Acquires one more reference to this client.
     *
     * <p>The counter is incremented only while it is still positive. A failed attempt leaves
     * the counter untouched, so a dead object cannot be "revived" by repeated {@code ref()}
     * calls.
     *
     * @throws IllegalStateException if the reference counter is already zero or negative
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient
    public void ref() {
        // Conditional atomic update: an unconditional getAndIncrement() would bump the counter
        // of a dead object to 1, and the next ref() would then wrongly succeed.
        int previous = this.referenceCounter.getAndUpdate(count -> count > 0 ? count + 1 : count);
        if (previous <= 0) {
            throw new IllegalStateException("Trying to ref dead object");
        }
    }

    /**
     * Releases one reference to this client and closes it when the last one is released.
     *
     * <p>The counter is decremented only while it is still positive. A failed attempt leaves
     * the counter untouched instead of driving it negative.
     *
     * @throws IllegalStateException if the reference counter is already zero or negative
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient
    public void unref() {
        // Conditional atomic update: an unconditional decrementAndGet() would leave a negative
        // counter behind after the failure, corrupting later ref()/unref() checks.
        int previous = this.referenceCounter.getAndUpdate(count -> count > 0 ? count - 1 : count);
        if (previous <= 0) {
            throw new IllegalStateException("Trying to unref dead object");
        }
        if (previous == 1) {
            // This call released the last reference.
            close();
        }
    }

    /**
     * Marks the client as closed and stops the current session, if there is one.
     *
     * <p>A debug message is logged first. The state change happens under {@code sessionLock};
     * afterwards {@code currentSession} is {@code null}.
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient, java.lang.AutoCloseable
    public void close() {
        logger.debug("Closing RpcClient: {}", this);
        final Lock guard = this.sessionLock;
        guard.lock();
        try {
            this.closed = true;
            final Session active = this.currentSession;
            if (active != null) {
                // Stop first, then drop the reference (same order as before).
                active.stop();
                this.currentSession = null;
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Creates a request bound to the current session, starts it and returns its control handle.
     *
     * @param rpcClient client passed to the new {@code Request}
     * @param rpcRequest request passed to the new {@code Request}
     * @param rpcClientResponseHandler response handler passed to the new {@code Request}
     * @param rpcOptions options passed to the new {@code Request}
     * @return the started request
     * @throws IllegalStateException if this client has been closed
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient
    public RpcClientRequestControl send(RpcClient rpcClient, RpcRequest<?> rpcRequest, RpcClientResponseHandler rpcClientResponseHandler, RpcOptions rpcOptions) {
        Session session = getSession();
        Request started = new Request(rpcClient, session, rpcRequest, rpcClientResponseHandler, rpcOptions, this.stats);
        started.start();
        return started;
    }

    /**
     * Creates a streaming request bound to the current session, starts it and returns its
     * control handle.
     *
     * @param rpcClient client passed to the new {@code StreamingRequest}
     * @param rpcRequest request passed to the new {@code StreamingRequest}
     * @param rpcStreamConsumer stream consumer passed to the new {@code StreamingRequest}
     * @param rpcOptions options passed to the new {@code StreamingRequest}
     * @return the started streaming request
     * @throws IllegalStateException if this client has been closed
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient
    public RpcClientStreamControl startStream(RpcClient rpcClient, RpcRequest<?> rpcRequest, RpcStreamConsumer rpcStreamConsumer, RpcOptions rpcOptions) {
        Session session = getSession();
        StreamingRequest started = new StreamingRequest(rpcClient, session, rpcRequest, rpcStreamConsumer, rpcOptions, this.stats);
        started.start();
        return started;
    }

    /**
     * Returns the event loop of the current session as a scheduled executor.
     *
     * <p>The session is obtained through {@code getSession()}, so a new session is created
     * and started when none exists.
     *
     * @return the current session's event loop
     * @throws IllegalStateException if this client has been closed
     */
    @Override // tech.ytsaurus.client.rpc.RpcClient
    public ScheduledExecutorService executor() {
        Session session = getSession();
        return session.eventLoop();
    }
}
