package com.graphaware.module.es;

import com.graphaware.common.log.LoggerFactory;
import com.graphaware.module.es.executor.BulkOperationExecutor;
import com.graphaware.module.es.executor.BulkOperationExecutorFactory;
import com.graphaware.module.es.executor.OperationExecutor;
import com.graphaware.module.es.executor.OperationExecutorFactory;
import com.graphaware.module.es.executor.RequestPerOperationExecutorFactory;
import com.graphaware.module.es.mapping.Mapping;
import com.graphaware.module.es.search.Searcher;
import com.graphaware.writer.thirdparty.BaseThirdPartyWriter;
import com.graphaware.writer.thirdparty.WriteOperation;
import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.http.HttpStatus;
import org.neo4j.logging.Log;
import org.springframework.util.Assert;

/* loaded from: input_file:com/graphaware/module/es/ElasticSearchWriter.class */
/**
 * Third-party writer that replicates queued write operations to Elasticsearch through a Jest client.
 *
 * <p>Operations are translated into Jest actions by the configured {@link Mapping} and handed to an
 * {@link OperationExecutor} (bulk or request-per-operation, depending on configuration). Operations
 * reported back as failed by the executor are optionally re-queued via {@code retry(...)}.
 */
public class ElasticSearchWriter extends BaseThirdPartyWriter {
    private static final Log LOG = LoggerFactory.getLogger(ElasticSearchWriter.class);

    /** Number of accumulated actions above which the current batch is flushed and a fresh executor is started. */
    private static final int MAX_BULK_SIZE = 500;

    /** Pause between a failed flush that is being retried and the continuation of processing. */
    private static final long RETRY_BACK_OFF_MILLIS = 2000L;

    /** Jest client; created in {@link #start()} and cleared in {@link #shutdownClient()}. */
    private JestClient client;

    /** Number of flushes in a row that reported failed operations while retrying is enabled. */
    private int consecutiveErrors;

    private final String protocol;
    private final String uri;
    private final String port;
    private final boolean retryOnError;
    private final int maxConsecutiveErrors;
    private final OperationExecutorFactory executorFactory;

    /** Set to {@code true} once index and mapping creation has completed without an exception. */
    private final AtomicBoolean indexExists;

    private final String authUser;
    private final String authPassword;
    private final Mapping mapping;
    private final boolean async;
    private final int readTimeout;
    private final int connectionTimeout;

    /**
     * Creates a writer from the given configuration.
     *
     * @param elasticSearchConfiguration source of connection, retry, executor and mapping settings
     */
    public ElasticSearchWriter(ElasticSearchConfiguration elasticSearchConfiguration) {
        super(elasticSearchConfiguration.getQueueCapacity());
        this.consecutiveErrors = 0;
        this.indexExists = new AtomicBoolean(false);
        // NOTE(review): a null configuration already fails with a NullPointerException in the
        // super(...) call above, so this assertion cannot be reached with a null argument.
        Assert.notNull(elasticSearchConfiguration, "Configuration cannot be null");
        this.protocol = elasticSearchConfiguration.getProtocol();
        this.uri = elasticSearchConfiguration.getUri();
        this.port = elasticSearchConfiguration.getPort();
        this.retryOnError = elasticSearchConfiguration.isRetryOnError();
        this.maxConsecutiveErrors = elasticSearchConfiguration.getMaxConsecutiveErrors();
        this.executorFactory = elasticSearchConfiguration.isExecuteBulk() ? new BulkOperationExecutorFactory() : new RequestPerOperationExecutorFactory();
        this.authUser = elasticSearchConfiguration.getAuthUser();
        this.authPassword = elasticSearchConfiguration.getAuthPassword();
        this.mapping = elasticSearchConfiguration.getMapping();
        this.async = elasticSearchConfiguration.isAsyncIndexation();
        this.readTimeout = elasticSearchConfiguration.getReadTimeout();
        this.connectionTimeout = elasticSearchConfiguration.getConnectionTimeout();
    }

    /**
     * Starts the underlying writer, creates the Jest client and attempts to create the index.
     */
    public void start() {
        LOG.info("Starting Elasticsearch Writer...");
        super.start();
        this.client = createClient();
        createIndexIfNotExist();
        LOG.info("Started Elasticsearch Writer.");
    }

    /**
     * Stops the underlying writer and then shuts down the Jest client.
     */
    public void stop() {
        LOG.info("Stopping Elasticsearch Writer...");
        super.stop();
        shutdownClient();
        LOG.info("Stopped Elasticsearch Writer.");
    }

    /**
     * Asks the configured mapping to reload itself.
     */
    public void reloadMapping() {
        this.mapping.reload();
    }

    /**
     * Converts every write operation into Jest actions and passes them to an executor, flushing
     * whenever more than {@link #MAX_BULK_SIZE} actions have accumulated since the last flush.
     *
     * @param list groups of write operations to process, in order
     */
    public void processOperations(List<Collection<WriteOperation<?>>> list) {
        // Index creation may have failed at start-up; try again before writing.
        createIndexIfNotExist();
        OperationExecutor executor = this.executorFactory.newExecutor(this.client);
        executor.start();
        int pendingActions = 0;
        for (Collection<WriteOperation<?>> group : list) {
            for (WriteOperation<?> writeOperation : group) {
                List<BulkableAction<? extends JestResult>> actions = this.mapping.getActions(writeOperation);
                executor.execute(actions, writeOperation);
                pendingActions += actions.size();
                if (pendingActions > MAX_BULK_SIZE) {
                    flushBatch(executor);
                    pendingActions = 0;
                    // Each batch gets its own executor.
                    executor = this.executorFactory.newExecutor(this.client);
                    executor.start();
                }
            }
        }
        if (pendingActions == 0) {
            return;
        }
        flushBatch(executor);
    }

    /**
     * Flushes the executor and deals with the operations it reports back as failed.
     *
     * <p>With retrying disabled the failures are logged and dropped. With retrying enabled they are
     * re-queued via {@code retry(...)} followed by a back-off pause, unless the number of
     * consecutive failing flushes exceeds the configured maximum, in which case they are dropped
     * and the counter is reset.
     *
     * @param operationExecutor executor holding the batch to flush
     */
    protected void flushBatch(OperationExecutor operationExecutor) {
        List<WriteOperation<?>> flush = operationExecutor.flush();
        if (flush.isEmpty()) {
            this.consecutiveErrors = 0;
            return;
        }
        if (!this.retryOnError) {
            LOG.warn(flush.size() + " operations could not be replicated to Elasticsearch. These updates got lost.");
            return;
        }
        this.consecutiveErrors++;
        if (this.consecutiveErrors > this.maxConsecutiveErrors) {
            LOG.warn("" + flush.size() + " operations could not be replicated to Elasticsearch. Giving up after " + this.maxConsecutiveErrors + " consecutive errors. These updated got lost.");
            this.consecutiveErrors = 0;
            return;
        }
        LOG.warn("" + flush.size() + " operations could not be replicated to Elasticsearch. " + (this.consecutiveErrors > 1 ? this.consecutiveErrors + " consecutive errors. " : "") + "Will retry...");
        retry(Collections.singletonList(flush));
        try {
            LOG.info("Backing off for 2 seconds...");
            Thread.sleep(RETRY_BACK_OFF_MILLIS);
        } catch (InterruptedException e) {
            LOG.warn("Wait interrupted", e);
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        if (operationExecutor instanceof BulkOperationExecutor) {
            ((BulkOperationExecutor) operationExecutor).reset();
        }
    }

    /**
     * Builds a Jest client from the connection settings captured at construction time.
     *
     * @return the client returned by {@link Searcher#createClient}
     */
    protected JestClient createClient() {
        return Searcher.createClient(this.protocol, this.uri, this.port, this.authUser, this.authPassword, this.readTimeout, this.connectionTimeout);
    }

    /**
     * Shuts down the Jest client if one is present and clears the reference.
     */
    protected void shutdownClient() {
        LOG.info("Shutting down Jest Client...");
        if (this.client != null) {
            this.client.shutdownClient();
            this.client = null;
        }
        LOG.info("Shut down Jest Client.");
    }

    /**
     * Creates the index and mapping unless a previous attempt already succeeded.
     *
     * <p>The flag is checked once without the lock and again while holding this object's monitor.
     * A failure is logged and leaves the flag unset, so the next call tries again.
     */
    protected void createIndexIfNotExist() {
        if (this.indexExists.get()) {
            return;
        }
        synchronized (this) {
            if (this.indexExists.get()) {
                return;
            }
            try {
                this.mapping.createIndexAndMapping(this.client);
                this.indexExists.set(true);
            } catch (Exception e) {
                LOG.error("Failed to create Elasticsearch index.", e);
            }
        }
    }
}
