package io.annot8.components.elasticsearch.processors;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.elasticsearch.core.bulk.OperationType;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import io.annot8.api.components.responses.ProcessorResponse;
import io.annot8.api.data.Item;
import io.annot8.api.exceptions.BadConfigurationException;
import io.annot8.api.exceptions.ProcessingException;
import io.annot8.common.components.AbstractProcessor;
import io.annot8.components.elasticsearch.ElasticsearchSettings;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.client.CredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

/* loaded from: input_file:io/annot8/components/elasticsearch/processors/AbstractElasticsearchSink.class */
public abstract class AbstractElasticsearchSink extends AbstractProcessor {
    protected final ElasticsearchClient client;
    protected final String index;
    protected final boolean forceString;

    public AbstractElasticsearchSink(ElasticsearchSettings elasticsearchSettings) {
        this(List.of(elasticsearchSettings.host()), elasticsearchSettings.getIndex(), elasticsearchSettings.isDeleteIndex(), elasticsearchSettings.isForceString(), elasticsearchSettings.credentials());
    }

    public AbstractElasticsearchSink(List<HttpHost> list, String str, boolean z, boolean z2, CredentialsProvider credentialsProvider) {
        RestClientBuilder builder = RestClient.builder((HttpHost[]) list.toArray(new HttpHost[0]));
        if (credentialsProvider != null) {
            builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        }
        this.client = new ElasticsearchClient(new RestClientTransport(builder.build(), new JacksonJsonpMapper()));
        this.index = str;
        this.forceString = z2;
        try {
            if (!this.client.ping().value()) {
                throw new BadConfigurationException("Could not connect to Elasticsearch - ping returned false");
            }
            if (z) {
                try {
                    if (this.client.indices().exists(builder2 -> {
                        return builder2.index(str, new String[0]);
                    }).value()) {
                        log().info("Deleting index {}", str);
                        this.client.indices().delete(builder3 -> {
                            return builder3.index(str, new String[0]);
                        });
                    }
                } catch (IOException e) {
                    log().error("An exception occurred whilst deleting index {}", str, e);
                }
            }
            try {
                if (this.client.indices().exists(builder4 -> {
                    return builder4.index(str, new String[0]);
                }).value()) {
                    log().warn("Index {} already exists - existing mapping will be used", str);
                } else {
                    Optional<TypeMapping> typeMapping = getTypeMapping();
                    if (typeMapping.isPresent()) {
                        log().info("Creating index {} with mapping", str);
                        this.client.indices().create(builder5 -> {
                            return builder5.index(str).mappings((TypeMapping) typeMapping.get());
                        });
                    }
                }
            } catch (IOException e2) {
                log().error("An exception occurred whilst creating index {}", str, e2);
            }
        } catch (IOException e3) {
            throw new BadConfigurationException("Could not connect to Elasticsearch", e3);
        }
    }

    public ProcessorResponse process(Item item) {
        try {
            List<IndexOperation<?>> itemToIndexRequests = itemToIndexRequests(item);
            if (itemToIndexRequests.isEmpty()) {
                log().debug("No index requests created for item {}", item.getId());
                return ProcessorResponse.ok();
            }
            BulkRequest build = new BulkRequest.Builder().operations((List) itemToIndexRequests.stream().map(indexOperation -> {
                return (BulkOperation) new BulkOperation.Builder().index(indexOperation).build();
            }).collect(Collectors.toList())).build();
            ArrayList arrayList = new ArrayList();
            try {
                log().debug("Performing bulk request to index item {} ({} index requests)", item.getId(), Integer.valueOf(itemToIndexRequests.size()));
                for (BulkResponseItem bulkResponseItem : this.client.bulk(build).items()) {
                    ErrorCause error = bulkResponseItem.error();
                    if (error != null) {
                        log().error("Failed to create/update document {} in index {}: {}", new Object[]{bulkResponseItem.id(), bulkResponseItem.index(), error.reason()});
                        arrayList.add(new ProcessingException(error.reason()));
                    } else if (bulkResponseItem.operationType() == OperationType.Index || bulkResponseItem.operationType() == OperationType.Create) {
                        log().debug("New document {} indexed in index {}", bulkResponseItem.id(), bulkResponseItem.index());
                    } else if (bulkResponseItem.operationType() == OperationType.Update) {
                        log().debug("Existing document {} updated in index {}", bulkResponseItem.id(), bulkResponseItem.index());
                    } else {
                        log().error("Unexpected result returned whilst indexing document {} in index {}: {}", new Object[]{bulkResponseItem.id(), bulkResponseItem.index(), bulkResponseItem.operationType().name()});
                    }
                }
                return arrayList.isEmpty() ? ProcessorResponse.ok() : ProcessorResponse.itemError(arrayList);
            } catch (ConnectException e) {
                log().error("Unable to connect to Elasticsearch whilst performing bulk request: {}", e.getMessage());
                return ProcessorResponse.processingError(new Exception[]{e});
            } catch (IOException e2) {
                log().error("Exception thrown whilst performing bulk request: {}", e2.getMessage());
                return ProcessorResponse.itemError(new Exception[]{e2});
            }
        } catch (Exception e3) {
            log().error("Unable to serialize item {}: {}", item.getId(), e3.getMessage());
            return ProcessorResponse.itemError(new Exception[]{e3});
        }
    }

    protected Optional<Map<String, Property>> getMapping() {
        return Optional.empty();
    }

    protected Optional<TypeMapping> getTypeMapping() {
        Optional<Map<String, Property>> mapping = getMapping();
        return mapping.isEmpty() ? Optional.empty() : Optional.of(TypeMapping.of(builder -> {
            return builder.properties((Map) mapping.get());
        }));
    }

    protected abstract List<IndexOperation<?>> itemToIndexRequests(Item item);
}
