package net.ravendb.client.documents.subscriptions;

import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
import java.net.Socket;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import net.ravendb.client.documents.DocumentStore;
import net.ravendb.client.documents.queries.moreLikeThis.MoreLikeThisOptions;
import net.ravendb.client.documents.subscriptions.SubscriptionConnectionClientMessage;
import net.ravendb.client.documents.subscriptions.SubscriptionConnectionServerMessage;
import net.ravendb.client.exceptions.AllTopologyNodesDownException;
import net.ravendb.client.exceptions.database.DatabaseDoesNotExistException;
import net.ravendb.client.exceptions.documents.subscriptions.SubscriberErrorException;
import net.ravendb.client.exceptions.documents.subscriptions.SubscriptionChangeVectorUpdateConcurrencyException;
import net.ravendb.client.exceptions.documents.subscriptions.SubscriptionClosedException;
import net.ravendb.client.exceptions.documents.subscriptions.SubscriptionDoesNotBelongToNodeException;
import net.ravendb.client.exceptions.documents.subscriptions.SubscriptionDoesNotExistException;
import net.ravendb.client.exceptions.documents.subscriptions.SubscriptionInUseException;
import net.ravendb.client.exceptions.documents.subscriptions.SubscriptionInvalidStateException;
import net.ravendb.client.exceptions.security.AuthorizationException;
import net.ravendb.client.extensions.JsonExtensions;
import net.ravendb.client.http.RequestExecutor;
import net.ravendb.client.http.ServerNode;
import net.ravendb.client.primitives.CancellationTokenSource;
import net.ravendb.client.primitives.CleanCloseable;
import net.ravendb.client.primitives.EventHelper;
import net.ravendb.client.primitives.ExceptionsUtils;
import net.ravendb.client.serverwide.commands.GetTcpInfoCommand;
import net.ravendb.client.serverwide.tcp.TcpConnectionHeaderMessage;
import net.ravendb.client.serverwide.tcp.TcpConnectionHeaderResponse;
import net.ravendb.client.serverwide.tcp.TcpConnectionStatus;
import net.ravendb.client.serverwide.tcp.TcpNegotiateParameters;
import net.ravendb.client.serverwide.tcp.TcpNegotiation;
import net.ravendb.client.util.TcpUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:net/ravendb/client/documents/subscriptions/SubscriptionWorker.class */
public class SubscriptionWorker<T> implements CleanCloseable {
    // Class token handed to the constructor; not read by any code visible in this file.
    private final Class<T> _clazz;
    // Constructor flag; not read by any code visible in this file.
    private final boolean _revisions;
    private final Log _logger;
    private final DocumentStore _store;
    // Database name: the constructor argument, or the store's default database when that is null.
    private final String _dbName;
    private final SubscriptionWorkerOptions _options;
    // Batch handler registered through run(Consumer).
    private Consumer<SubscriptionBatch<T>> _subscriber;
    // Socket opened by connectToServer() and released by closeTcpClient().
    private Socket _tcpClient;
    // JSON parser over the socket's input stream; created lazily by ensureParser().
    private JsonParser _parser;
    // Set to true by close(boolean); checked by readNextObject() and the worker loop.
    private boolean _disposed;
    // Future of the background worker loop; non-null once run() has been called.
    private CompletableFuture<Void> _subscriptionTask;
    // Listeners invoked when a CONFIRM message is read from the server.
    private List<Consumer<SubscriptionBatch<T>>> afterAcknowledgment;
    // Listeners invoked with the failure after the worker has waited before a reconnect attempt.
    private List<Consumer<Exception>> onSubscriptionConnectionRetry;
    // Node selected by shouldTryToReconnect() after a redirect; connectToServer() targets it when set.
    private ServerNode _redirectNode;
    // Single-node executor re-created on every connectToServer() call.
    private RequestExecutor _subscriptionLocalRequestExecutor;
    // Time of the first failure in the current failure streak; see assertLastConnectionFailure().
    private Date lastConnectionFailure;
    // Outcome of the TCP protocol negotiation performed in connectToServer().
    private TcpConnectionHeaderMessage.SupportedFeatures _supportedFeatures;
    // Cancelled by close(boolean) and by shouldTryToReconnect() for non-recoverable errors.
    private final CancellationTokenSource _processingCts = new CancellationTokenSource();
    // Optional callback invoked at the end of every close(boolean) call.
    Consumer<SubscriptionWorker<T>> onClosed = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: net.ravendb.client.documents.subscriptions.SubscriptionWorker$1, reason: invalid class name */
    /* loaded from: input_file:net/ravendb/client/documents/subscriptions/SubscriptionWorker$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus;
        static final /* synthetic */ int[] $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$MessageType = new int[SubscriptionConnectionServerMessage.MessageType.values().length];

        static {
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$MessageType[SubscriptionConnectionServerMessage.MessageType.DATA.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$MessageType[SubscriptionConnectionServerMessage.MessageType.INCLUDES.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$MessageType[SubscriptionConnectionServerMessage.MessageType.END_OF_BATCH.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$MessageType[SubscriptionConnectionServerMessage.MessageType.CONFIRM.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$MessageType[SubscriptionConnectionServerMessage.MessageType.CONNECTION_STATUS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$MessageType[SubscriptionConnectionServerMessage.MessageType.ERROR.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus = new int[SubscriptionConnectionServerMessage.ConnectionStatus.values().length];
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus[SubscriptionConnectionServerMessage.ConnectionStatus.ACCEPTED.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus[SubscriptionConnectionServerMessage.ConnectionStatus.IN_USE.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus[SubscriptionConnectionServerMessage.ConnectionStatus.CLOSED.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus[SubscriptionConnectionServerMessage.ConnectionStatus.INVALID.ordinal()] = 4;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus[SubscriptionConnectionServerMessage.ConnectionStatus.NOT_FOUND.ordinal()] = 5;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus[SubscriptionConnectionServerMessage.ConnectionStatus.REDIRECT.ordinal()] = 6;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$net$ravendb$client$documents$subscriptions$SubscriptionConnectionServerMessage$ConnectionStatus[SubscriptionConnectionServerMessage.ConnectionStatus.CONCURRENCY_RECONNECT.ordinal()] = 7;
            } catch (NoSuchFieldError e13) {
            }
            $SwitchMap$net$ravendb$client$serverwide$tcp$TcpConnectionStatus = new int[TcpConnectionStatus.values().length];
            try {
                $SwitchMap$net$ravendb$client$serverwide$tcp$TcpConnectionStatus[TcpConnectionStatus.OK.ordinal()] = 1;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$net$ravendb$client$serverwide$tcp$TcpConnectionStatus[TcpConnectionStatus.AUTHORIZATION_FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$net$ravendb$client$serverwide$tcp$TcpConnectionStatus[TcpConnectionStatus.TCP_VERSION_MISMATCH.ordinal()] = 3;
            } catch (NoSuchFieldError e16) {
            }
        }
    }

    /**
     * Registers a listener that is invoked with the current batch when the
     * server's CONFIRM message is read.
     *
     * @param consumer listener to append to the acknowledgment listener list
     */
    public void addAfterAcknowledgmentListener(Consumer<SubscriptionBatch<T>> consumer) {
        afterAcknowledgment.add(consumer);
    }

    /**
     * Unregisters a listener previously added with
     * {@link #addAfterAcknowledgmentListener(Consumer)}.
     *
     * @param consumer listener to remove from the acknowledgment listener list
     */
    public void removeAfterAcknowledgmentListener(Consumer<SubscriptionBatch<T>> consumer) {
        afterAcknowledgment.remove(consumer);
    }

    /**
     * Registers a listener that receives the triggering exception each time the
     * worker is about to retry the subscription connection.
     *
     * @param consumer listener to append to the retry listener list
     */
    public void addOnSubscriptionConnectionRetry(Consumer<Exception> consumer) {
        onSubscriptionConnectionRetry.add(consumer);
    }

    /**
     * Unregisters a listener previously added with
     * {@link #addOnSubscriptionConnectionRetry(Consumer)}.
     *
     * @param consumer listener to remove from the retry listener list
     */
    public void removeOnSubscriptionConnectionRetry(Consumer<Exception> consumer) {
        onSubscriptionConnectionRetry.remove(consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a worker for the subscription named in the given options.
     *
     * @param cls                       class token stored as the worker's item class
     * @param subscriptionWorkerOptions worker options; must carry a non-empty subscription name
     * @param z                         value stored in the {@code _revisions} flag
     * @param documentStore             store used for request executors, certificate and executor service
     * @param str                       database name; when {@code null} the store's default database is used
     * @throws IllegalArgumentException if the options do not specify a subscription name
     */
    @SuppressWarnings("unchecked")
    public SubscriptionWorker(Class<?> cls, SubscriptionWorkerOptions subscriptionWorkerOptions, boolean z, DocumentStore documentStore, String str) {
        // The parameter is a wildcard class token while the field is Class<T>; the
        // narrowing needs an explicit (unchecked) cast to compile.
        this._clazz = (Class<T>) cls;
        this._options = subscriptionWorkerOptions;
        this._revisions = z;
        if (StringUtils.isEmpty(subscriptionWorkerOptions.getSubscriptionName())) {
            throw new IllegalArgumentException("SubscriptionConnectionOptions must specify the subscriptionName");
        }
        this._store = documentStore;
        this._dbName = ObjectUtils.firstNonNull(str, documentStore.getDatabase());
        this._logger = LogFactory.getLog(SubscriptionWorker.class);
        this.afterAcknowledgment = new ArrayList<>();
        this.onSubscriptionConnectionRetry = new ArrayList<>();
    }

    /**
     * Closes the worker and waits for the background subscription task to end.
     * Equivalent to {@code close(true)}.
     */
    @Override // net.ravendb.client.primitives.CleanCloseable, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.close(true);
    }

    /**
     * Closes the worker: marks it disposed, cancels processing, drops the TCP
     * connection and optionally waits for the background task to finish.
     * Calling it again after disposal only re-fires {@code onClosed}.
     *
     * <p>{@code onClosed}, when set, is invoked exactly once per call from a
     * {@code finally} block, whichever path is taken. Exceptions raised during
     * shutdown are logged at debug level and not propagated.
     *
     * @param z when {@code true}, block until the subscription task has ended
     */
    public void close(boolean z) {
        try {
            if (this._disposed) {
                return;
            }
            this._disposed = true;
            this._processingCts.cancel();
            // Disconnect right away so a blocked read in the worker loop is released.
            closeTcpClient();
            if (this._subscriptionTask != null && z) {
                try {
                    this._subscriptionTask.get();
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                } catch (Exception ignored) {
                    // We only wait for the task to end; its failure is reported through the future.
                }
            }
            // The single-node executor created in connectToServer() is otherwise never released.
            if (this._subscriptionLocalRequestExecutor != null) {
                this._subscriptionLocalRequestExecutor.close();
            }
        } catch (Exception e) {
            if (this._logger.isDebugEnabled()) {
                this._logger.debug("Error during close of subscription: " + e.getMessage(), e);
            }
        } finally {
            if (this.onClosed != null) {
                this.onClosed.accept(this);
            }
        }
    }

    /**
     * Registers the batch handler and starts the subscription worker loop.
     *
     * @param consumer handler stored as the worker's subscriber; must not be {@code null}
     * @return the future of the background subscription task
     * @throws IllegalArgumentException if {@code consumer} is {@code null}
     * @throws IllegalStateException    if the subscription is already running
     */
    public CompletableFuture<Void> run(Consumer<SubscriptionBatch<T>> consumer) {
        if (consumer != null) {
            this._subscriber = consumer;
            return run();
        }
        throw new IllegalArgumentException("ProcessDocuments cannot be null");
    }

    /**
     * Starts the background subscription task and remembers its future.
     *
     * @return the future of the newly started task
     * @throws IllegalStateException if a task has already been started
     */
    private CompletableFuture<Void> run() {
        if (this._subscriptionTask == null) {
            final CompletableFuture<Void> started = runSubscriptionAsync();
            this._subscriptionTask = started;
            return started;
        }
        throw new IllegalStateException("The subscription is already running");
    }

    /**
     * Returns the cluster tag of the node the worker was redirected to.
     *
     * @return the redirect node's cluster tag, or {@code null} when no redirect node is set
     */
    public String getCurrentNodeTag() {
        return this._redirectNode == null ? null : this._redirectNode.getClusterTag();
    }

    /**
     * Returns the subscription name taken from the worker options.
     *
     * @return the configured subscription name, or {@code null} when there are no options
     */
    public String getSubscriptionName() {
        return this._options == null ? null : this._options.getSubscriptionName();
    }

    /**
     * Opens the subscription TCP connection.
     *
     * <p>Steps, in order: fetch the TCP endpoint info (from the redirect node when
     * one is set), connect and tune the socket, negotiate the protocol version,
     * send the worker options as JSON, and replace the single-node request
     * executor with one bound to the node that answered the TCP info request.
     *
     * @return the connected socket, also stored in {@code _tcpClient}
     * @throws IOException              on socket or serialization failure
     * @throws GeneralSecurityException declared for the secured connection setup
     * @throws IllegalStateException    if negotiation yields a non-positive protocol version
     * @throws RuntimeException         wrapping any failure of the request against the redirect node
     */
    private Socket connectToServer() throws IOException, GeneralSecurityException {
        final GetTcpInfoCommand tcpInfoCommand = new GetTcpInfoCommand("Subscription/" + this._dbName, this._dbName);
        final RequestExecutor storeExecutor = this._store.getRequestExecutor(this._dbName);

        if (this._redirectNode == null) {
            storeExecutor.execute(tcpInfoCommand);
        } else {
            try {
                storeExecutor.execute(this._redirectNode, null, tcpInfoCommand, false, null);
            } catch (Exception redirectFailure) {
                // The redirect target is unusable: forget it so the next attempt uses the default path.
                this._redirectNode = null;
                throw new RuntimeException(redirectFailure);
            }
        }

        this._tcpClient = TcpUtils.connect(
                tcpInfoCommand.getResult().getUrl(),
                tcpInfoCommand.getResult().getCertificate(),
                this._store.getCertificate());
        this._tcpClient.setTcpNoDelay(true);
        this._tcpClient.setSendBufferSize(32768);
        this._tcpClient.setReceiveBufferSize(4096);

        final String database = ObjectUtils.firstNonNull(this._dbName, this._store.getDatabase());

        final TcpNegotiateParameters negotiation = new TcpNegotiateParameters();
        negotiation.setDatabase(database);
        negotiation.setOperation(TcpConnectionHeaderMessage.OperationTypes.SUBSCRIPTION);
        negotiation.setVersion(40);
        negotiation.setReadResponseAndGetVersionCallback(this::readServerResponseAndGetVersion);
        negotiation.setDestinationNodeTag(getCurrentNodeTag());
        negotiation.setDestinationUrl(tcpInfoCommand.getResult().getUrl());

        this._supportedFeatures = TcpNegotiation.negotiateProtocolVersion(this._tcpClient.getOutputStream(), negotiation);
        if (this._supportedFeatures.protocolVersion <= 0) {
            throw new IllegalStateException(this._options.getSubscriptionName() + " : TCP negotiation resulted with an invalid protocol version: " + this._supportedFeatures.protocolVersion);
        }

        // Send the worker options to the server.
        this._tcpClient.getOutputStream().write(JsonExtensions.getDefaultMapper().writeValueAsBytes(this._options));
        this._tcpClient.getOutputStream().flush();

        // Release the executor of the previous connection before creating a new one.
        if (this._subscriptionLocalRequestExecutor != null) {
            this._subscriptionLocalRequestExecutor.close();
        }
        this._subscriptionLocalRequestExecutor = RequestExecutor.createForSingleNodeWithoutConfigurationUpdates(
                tcpInfoCommand.getRequestedNode().getUrl(),
                this._dbName,
                storeExecutor.getCertificate(),
                this._store.getExecutorService(),
                this._store.getConventions());
        return this._tcpClient;
    }

    /**
     * Lazily creates the JSON parser over the TCP client's input stream.
     * The parser is configured not to close the underlying stream.
     *
     * @throws IOException if the parser cannot be created
     */
    private void ensureParser() throws IOException {
        if (this._parser != null) {
            return;
        }
        this._parser = JsonExtensions.getDefaultMapper().getFactory().createParser(this._tcpClient.getInputStream());
        this._parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
    }

    /**
     * Reads the server's TCP header response during protocol negotiation and
     * returns the version it reports.
     *
     * @param str not used by this method
     * @return the version carried by the server response
     * @throws AuthorizationException if the server reports an authorization failure
     * @throws IllegalStateException  if the server reports a version mismatch with version -1;
     *                                a drop message is sent to the server first
     */
    private int readServerResponseAndGetVersion(String str) {
        try {
            ensureParser();
            final TcpConnectionHeaderResponse reply = JsonExtensions.getDefaultMapper()
                    .treeToValue(this._parser.readValueAsTree(), TcpConnectionHeaderResponse.class);

            switch (reply.getStatus()) {
                case AUTHORIZATION_FAILED:
                    throw new AuthorizationException("Cannot access database " + this._dbName + " because " + reply.getMessage());
                case TCP_VERSION_MISMATCH:
                    if (reply.getVersion() == -1) {
                        // No version could be agreed on: tell the server we are dropping the connection.
                        sendDropMessage(reply);
                        throw new IllegalStateException("Can't connect to database " + this._dbName + " because: " + reply.getMessage());
                    }
                    return reply.getVersion();
                case OK:
                default:
                    return reply.getVersion();
            }
        } catch (IOException ioFailure) {
            throw ExceptionsUtils.unwrapException(ioFailure);
        }
    }

    /**
     * Sends a DROP header message to the server after protocol negotiation failed.
     *
     * @param tcpConnectionHeaderResponse server response whose version is quoted in the message info
     * @throws IOException if serializing or writing the message fails
     */
    private void sendDropMessage(TcpConnectionHeaderResponse tcpConnectionHeaderResponse) throws IOException {
        final TcpConnectionHeaderMessage drop = new TcpConnectionHeaderMessage();
        drop.setOperation(TcpConnectionHeaderMessage.OperationTypes.DROP);
        drop.setDatabaseName(this._dbName);
        drop.setOperationVersion(40);
        drop.setInfo("Couldn't agree on subscription tcp version ours: 40 theirs: " + tcpConnectionHeaderResponse.getVersion());

        final byte[] payload = JsonExtensions.getDefaultMapper().writeValueAsBytes(drop);
        this._tcpClient.getOutputStream().write(payload);
        this._tcpClient.getOutputStream().flush();
    }

    /**
     * Validates a connection-status message from the server and translates every
     * non-accepted status into the matching exception.
     *
     * @param subscriptionConnectionServerMessage message read from the server
     * @throws DatabaseDoesNotExistException if an ERROR message names that exception
     * @throws IllegalStateException if the message is not a CONNECTION_STATUS message,
     *         or carries a status this method does not know
     * @throws SubscriptionInUseException on IN_USE
     * @throws SubscriptionClosedException on CLOSED
     * @throws SubscriptionInvalidStateException on INVALID
     * @throws SubscriptionDoesNotExistException on NOT_FOUND
     * @throws SubscriptionDoesNotBelongToNodeException on REDIRECT, carrying the target node tag
     * @throws SubscriptionChangeVectorUpdateConcurrencyException on CONCURRENCY_RECONNECT
     */
    private void assertConnectionState(SubscriptionConnectionServerMessage subscriptionConnectionServerMessage) {
        if (subscriptionConnectionServerMessage.getType() == SubscriptionConnectionServerMessage.MessageType.ERROR) {
            // An ERROR message without exception text must fall through to the
            // "illegal type" error below rather than fail with a NullPointerException.
            final String serverException = subscriptionConnectionServerMessage.getException();
            if (serverException != null && serverException.contains("DatabaseDoesNotExistException")) {
                throw new DatabaseDoesNotExistException(this._dbName + " does not exists. " + subscriptionConnectionServerMessage.getMessage());
            }
        }
        if (subscriptionConnectionServerMessage.getType() != SubscriptionConnectionServerMessage.MessageType.CONNECTION_STATUS) {
            throw new IllegalStateException("Server returned illegal type message when expecting connection status, was:" + subscriptionConnectionServerMessage.getType());
        }

        // Switch on the enum itself instead of the synthetic ordinal lookup table.
        switch (subscriptionConnectionServerMessage.getStatus()) {
            case ACCEPTED:
                return;
            case IN_USE:
                throw new SubscriptionInUseException("Subscription with id " + this._options.getSubscriptionName() + " cannot be opened, because it's in use and the connection strategy is " + this._options.getStrategy());
            case CLOSED:
                throw new SubscriptionClosedException("Subscription with id " + this._options.getSubscriptionName() + " was closed. " + subscriptionConnectionServerMessage.getException());
            case INVALID:
                throw new SubscriptionInvalidStateException("Subscription with id " + this._options.getSubscriptionName() + " cannot be opened, because it is in invalid state. " + subscriptionConnectionServerMessage.getException());
            case NOT_FOUND:
                throw new SubscriptionDoesNotExistException("Subscription with id " + this._options.getSubscriptionName() + " cannot be opened, because it does not exist. " + subscriptionConnectionServerMessage.getException());
            case REDIRECT:
                final String redirectedTag = subscriptionConnectionServerMessage.getData().get("RedirectedTag").asText();
                final SubscriptionDoesNotBelongToNodeException redirect = new SubscriptionDoesNotBelongToNodeException("Subscription with id " + this._options.getSubscriptionName() + " cannot be processed by current node, it will be redirected to " + redirectedTag);
                redirect.setAppropriateNode(redirectedTag);
                throw redirect;
            case CONCURRENCY_RECONNECT:
                throw new SubscriptionChangeVectorUpdateConcurrencyException(subscriptionConnectionServerMessage.getMessage());
            default:
                throw new IllegalStateException("Subscription " + this._options.getSubscriptionName() + " could not be opened, reason: " + subscriptionConnectionServerMessage.getStatus());
        }
    }

    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Removed duplicated region for block: B:90:0x0189  */
    /* JADX WARN: Removed duplicated region for block: B:92:? A[RETURN, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the subscription processing routine.
     *
     * <p>The decompiler could not reconstruct the original body (see the notes
     * above), so this method currently always throws
     * {@link UnsupportedOperationException}. It is invoked once per iteration of
     * the worker loop in {@code runSubscriptionAsync()}, which treats the
     * exception like any other processing failure.
     *
     * @throws UnsupportedOperationException always, until the real body is restored
     */
    private void processSubscription() throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 396
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: net.ravendb.client.documents.subscriptions.SubscriptionWorker.processSubscription():void");
    }

    /**
     * Reads server messages until the end of the current batch, cancellation, or
     * the end of the stream, collecting DATA messages and INCLUDES payloads.
     *
     * <p>A CONFIRM message fires the after-acknowledgment listeners with
     * {@code subscriptionBatch} and clears both the collected messages and the
     * batch items. CONNECTION_STATUS messages are validated, ERROR and unknown
     * message types raise an exception.
     *
     * @param socket            connection handed on to {@code readNextObject}
     * @param subscriptionBatch batch passed to acknowledgment listeners and cleared on CONFIRM
     * @return the messages and includes gathered for this batch
     * @throws IOException if reading from the connection fails
     */
    // The element types of BatchFromServer's lists are not visible in this file,
    // so the two lists stay raw exactly as before.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private BatchFromServer readSingleSubscriptionBatchFromServer(Socket socket, SubscriptionBatch<T> subscriptionBatch) throws IOException {
        final ArrayList messages = new ArrayList();
        final ArrayList includes = new ArrayList();
        boolean endOfBatch = false;

        while (!endOfBatch && !this._processingCts.getToken().isCancellationRequested()) {
            final SubscriptionConnectionServerMessage received = readNextObject(socket);
            if (received == null || this._processingCts.getToken().isCancellationRequested()) {
                break;
            }

            // Switch on the enum itself instead of the synthetic ordinal lookup table.
            switch (received.getType()) {
                case DATA:
                    messages.add(received);
                    break;
                case INCLUDES:
                    includes.add(received.getIncludes());
                    break;
                case END_OF_BATCH:
                    endOfBatch = true;
                    break;
                case CONFIRM:
                    EventHelper.invoke(this.afterAcknowledgment, subscriptionBatch);
                    messages.clear();
                    subscriptionBatch.getItems().clear();
                    break;
                case CONNECTION_STATUS:
                    assertConnectionState(received);
                    break;
                case ERROR:
                    throwSubscriptionError(received);
                    break;
                default:
                    throwInvalidServerResponse(received);
                    break;
            }
        }

        final BatchFromServer batch = new BatchFromServer();
        batch.setMessages(messages);
        batch.setIncludes(includes);
        return batch;
    }

    /**
     * Always throws: reports a server message whose type the worker does not handle.
     *
     * @param subscriptionConnectionServerMessage the offending message
     * @throws IllegalArgumentException always
     */
    private static void throwInvalidServerResponse(SubscriptionConnectionServerMessage subscriptionConnectionServerMessage) {
        final String description = "Unrecognized message " + subscriptionConnectionServerMessage.getType() + " type received from server";
        throw new IllegalArgumentException(description);
    }

    /**
     * Always throws: reports an ERROR message sent by the server.
     *
     * @param subscriptionConnectionServerMessage the error message; its exception text,
     *        or "None" when absent, is included in the thrown message
     * @throws IllegalStateException always
     */
    private static void throwSubscriptionError(SubscriptionConnectionServerMessage subscriptionConnectionServerMessage) {
        final String serverException = subscriptionConnectionServerMessage.getException();
        final String reason = serverException != null ? serverException : "None";
        throw new IllegalStateException("Connected terminated by server. Exception: " + reason);
    }

    /**
     * Reads the next server message from the shared JSON parser.
     *
     * @param socket not used; the worker's own {@code _tcpClient} is consulted instead
     * @return the next message, or {@code null} when processing was cancelled, the
     *         TCP client is not connected, or the worker is disposed
     * @throws IOException if reading or mapping the message fails
     */
    private SubscriptionConnectionServerMessage readNextObject(Socket socket) throws IOException {
        final boolean canRead = !this._processingCts.getToken().isCancellationRequested()
                && this._tcpClient.isConnected()
                && !this._disposed;
        if (!canRead) {
            return null;
        }
        return JsonExtensions.getDefaultMapper()
                .treeToValue(this._parser.readValueAsTree(), SubscriptionConnectionServerMessage.class);
    }

    /**
     * Sends an ACKNOWLEDGE message for the given change vector to the server.
     *
     * @param str    change vector to acknowledge
     * @param socket connection the acknowledgment is written to
     * @throws IOException if serializing or writing the message fails
     */
    private void sendAck(String str, Socket socket) throws IOException {
        final SubscriptionConnectionClientMessage ack = new SubscriptionConnectionClientMessage();
        ack.setChangeVector(str);
        ack.setType(SubscriptionConnectionClientMessage.MessageType.ACKNOWLEDGE);

        final byte[] payload = JsonExtensions.getDefaultMapper().writeValueAsBytes(ack);
        socket.getOutputStream().write(payload);
        socket.getOutputStream().flush();
    }

    /**
     * Starts the worker loop on the store's executor service.
     *
     * <p>Each iteration drops any previous connection and runs
     * {@code processSubscription()}. When that fails:
     * <ul>
     *   <li>if processing was cancelled, the loop ends quietly when the worker is
     *       disposed, otherwise the failure is rethrown;</li>
     *   <li>if {@code shouldTryToReconnect} rejects the failure, it is logged at
     *       error level and rethrown;</li>
     *   <li>otherwise the loop sleeps for the configured retry delay, notifies the
     *       retry listeners and tries again.</li>
     * </ul>
     * Every exception leaving the loop is passed through
     * {@code ExceptionsUtils.unwrapException}.
     *
     * @return the future representing the background loop
     */
    private CompletableFuture<Void> runSubscriptionAsync() {
        return CompletableFuture.runAsync(() -> {
            while (!this._processingCts.getToken().isCancellationRequested()) {
                try {
                    closeTcpClient();
                    if (this._logger.isInfoEnabled()) {
                        this._logger.info("Subscription " + this._options.getSubscriptionName() + ". Connecting to server...");
                    }
                    processSubscription();
                } catch (Exception e) {
                    try {
                        if (this._processingCts.getToken().isCancellationRequested()) {
                            if (!this._disposed) {
                                throw e;
                            }
                            return;
                        }
                        if (this._logger.isInfoEnabled()) {
                            this._logger.info("Subscription " + this._options.getSubscriptionName() + ". Pulling task threw the following exception", e);
                        }
                        if (!shouldTryToReconnect(e)) {
                            if (this._logger.isErrorEnabled()) {
                                this._logger.error("Connection to subscription " + this._options.getSubscriptionName() + " have been shut down because of an error", e);
                            }
                            throw e;
                        }
                        Thread.sleep(this._options.getTimeToWaitBeforeConnectionRetry().toMillis());
                        EventHelper.invoke(this.onSubscriptionConnectionRetry, e);
                    } catch (Exception e2) {
                        if (e2 instanceof InterruptedException) {
                            // Wrapping the exception would otherwise lose the interrupt:
                            // restore the flag so the executing thread can still observe it.
                            Thread.currentThread().interrupt();
                        }
                        throw ExceptionsUtils.unwrapException(e2);
                    }
                }
            }
        }, this._store.getExecutorService());
    }

    /**
     * Tracks how long the connection has been failing. The first failure only
     * records the current time; later failures compare the elapsed time with the
     * configured maximum erroneous period.
     *
     * @throws SubscriptionInvalidStateException if the failure streak has lasted
     *         longer than the maximum erroneous period
     */
    private void assertLastConnectionFailure() {
        if (this.lastConnectionFailure == null) {
            this.lastConnectionFailure = new Date();
            return;
        }
        final long failingForMillis = new Date().getTime() - this.lastConnectionFailure.getTime();
        if (failingForMillis > this._options.getMaxErroneousPeriod().toMillis()) {
            throw new SubscriptionInvalidStateException("Subscription connection was in invalid state for more than " + this._options.getMaxErroneousPeriod() + " and therefore will be terminated");
        }
    }

    /**
     * Decides whether the worker loop should reconnect after the given failure.
     *
     * <ul>
     *   <li>Redirect ({@code SubscriptionDoesNotBelongToNodeException}): checks the
     *       failure window, then records the target node from the local topology
     *       as the redirect node when the exception names one.</li>
     *   <li>Change-vector concurrency failure: always retry.</li>
     *   <li>A fixed set of non-recoverable failures: cancels processing and
     *       returns {@code false}.</li>
     *   <li>Anything else: retry, subject to the failure window.</li>
     * </ul>
     *
     * @param exc failure raised while processing the subscription
     * @return {@code true} to reconnect, {@code false} to stop the worker
     * @throws IllegalStateException if the redirect target is not in the local topology
     * @throws SubscriptionInvalidStateException if the failure window has been exceeded
     */
    private boolean shouldTryToReconnect(Exception exc) {
        final RuntimeException cause = ExceptionsUtils.unwrapException(exc);

        if (cause instanceof SubscriptionDoesNotBelongToNodeException) {
            final SubscriptionDoesNotBelongToNodeException wrongNode = (SubscriptionDoesNotBelongToNodeException) cause;
            assertLastConnectionFailure();
            final RequestExecutor executor = this._store.getRequestExecutor(this._dbName);
            if (wrongNode.getAppropriateNode() == null) {
                return true;
            }
            final ServerNode target = executor.getTopologyNodes().stream()
                    .filter(candidate -> candidate.getClusterTag().equals(wrongNode.getAppropriateNode()))
                    .findFirst()
                    .orElse(null);
            if (target == null) {
                throw new IllegalStateException("Could not redirect to " + wrongNode.getAppropriateNode() + ", because it was not found in local topology, even after retrying");
            }
            this._redirectNode = target;
            return true;
        }

        if (cause instanceof SubscriptionChangeVectorUpdateConcurrencyException) {
            return true;
        }

        final boolean nonRecoverable = cause instanceof SubscriptionInUseException
                || cause instanceof SubscriptionDoesNotExistException
                || cause instanceof SubscriptionClosedException
                || cause instanceof SubscriptionInvalidStateException
                || cause instanceof DatabaseDoesNotExistException
                || cause instanceof AuthorizationException
                || cause instanceof AllTopologyNodesDownException
                || cause instanceof SubscriberErrorException;
        if (nonRecoverable) {
            this._processingCts.cancel();
            return false;
        }

        assertLastConnectionFailure();
        return true;
    }

    /**
     * Quietly closes and forgets the JSON parser and the TCP socket, in that
     * order. Either may already be absent.
     */
    private void closeTcpClient() {
        final JsonParser openParser = this._parser;
        if (openParser != null) {
            IOUtils.closeQuietly(openParser);
            this._parser = null;
        }

        final Socket openSocket = this._tcpClient;
        if (openSocket != null) {
            IOUtils.closeQuietly(openSocket);
            this._tcpClient = null;
        }
    }
}
