package io.annot8.components.elasticsearch.processors;

import io.annot8.api.components.responses.ProcessorResponse;
import io.annot8.api.data.Item;
import io.annot8.common.components.AbstractProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/* loaded from: input_file:io/annot8/components/elasticsearch/processors/AbstractElasticsearchSink.class */
public abstract class AbstractElasticsearchSink extends AbstractProcessor {
    protected final RestHighLevelClient client;
    protected final String index;

    public AbstractElasticsearchSink(List<HttpHost> list, String str) {
        this.client = new RestHighLevelClient(RestClient.builder((HttpHost[]) list.toArray(new HttpHost[0])));
        this.index = str;
    }

    public void close() {
        if (this.client != null) {
            try {
                this.client.close();
            } catch (IOException e) {
                log().warn("Unable to close Elasticsearch client", e);
            }
        }
    }

    public ProcessorResponse process(Item item) {
        List<IndexRequest> itemToIndexRequests = itemToIndexRequests(item);
        if (itemToIndexRequests.isEmpty()) {
            log().debug("No index requests created for item {}", item.getId());
            return ProcessorResponse.ok();
        }
        BulkRequest bulkRequest = new BulkRequest();
        Objects.requireNonNull(bulkRequest);
        itemToIndexRequests.forEach(bulkRequest::add);
        ArrayList arrayList = new ArrayList();
        try {
            log().debug("Performing bulk request to index item {} ({} index requests)", item.getId(), Integer.valueOf(itemToIndexRequests.size()));
            Iterator it = this.client.bulk(bulkRequest, RequestOptions.DEFAULT).iterator();
            while (it.hasNext()) {
                BulkItemResponse bulkItemResponse = (BulkItemResponse) it.next();
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    log().error("Failed to create/update document {} in index {}: {}", new Object[]{bulkItemResponse.getId(), bulkItemResponse.getIndex(), failure.getMessage(), failure.getCause()});
                    arrayList.add(failure.getCause());
                } else {
                    DocWriteResponse response = bulkItemResponse.getResponse();
                    if (response.getResult() == DocWriteResponse.Result.CREATED) {
                        log().debug("New document {} created in index {}", response.getId(), response.getIndex());
                    } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                        log().debug("Existing document {} updated in index {}", response.getId(), response.getIndex());
                    } else {
                        log().error("Unexpected result returned whilst indexing document {} in index {}: {}", new Object[]{response.getId(), response.getIndex(), response.getResult().name()});
                    }
                }
            }
            return arrayList.isEmpty() ? ProcessorResponse.ok() : ProcessorResponse.itemError(arrayList);
        } catch (IOException e) {
            log().error("Exception thrown whilst performing bulk request: {}", e.getMessage());
            return ProcessorResponse.itemError(new Exception[]{e});
        }
    }

    protected abstract List<IndexRequest> itemToIndexRequests(Item item);
}
