package de.julielab.jcore.consumer.es;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import de.julielab.jcore.consumer.es.preanalyzed.Document;
import de.julielab.jcore.utility.JCoReTools;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.uima.UimaContext;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.fit.descriptor.ConfigurationParameter;
import org.apache.uima.fit.descriptor.ResourceMetaData;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceInitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ResourceMetaData(name = "JCore ElasticSearch Consumer")
/* loaded from: input_file:de/julielab/jcore/consumer/es/ElasticSearchConsumer.class */
public class ElasticSearchConsumer extends AbstractCasToJsonConsumer {
    public static final String PARAM_URLS = "urls";
    public static final String PARAM_INDEX_NAME = "indexName";
    public static final String PARAM_TYPE = "type";
    public static final String PARAM_BATCH_SIZE = "batchSize";

    @ConfigurationParameter(name = PARAM_URLS, description = "A list of URLs pointing to different nodes of the ElasticSearch cluster, e.g. http://localhost:9300/. Documents will be sent bulk-wise to the nodes in a round-robin fashion.")
    private String[] urls;

    @ConfigurationParameter(name = PARAM_INDEX_NAME, description = "The ElasticSearch index name to send the created documents to.")
    private String indexName;

    @ConfigurationParameter(name = PARAM_TYPE, mandatory = false, description = "The index type the generated documents should have. The types are removed from ElasticSearch with version 7 and should omitted for ES >= 7.")
    private String type;

    @ConfigurationParameter(name = PARAM_BATCH_SIZE, mandatory = false, description = "The number of documents to be sent to ElasticSearch in a single batch. Defaults to 50.")
    private int batchSize;
    private List<String> bulkCommand;
    private HttpPost[] indexPosts;
    private HttpClient httpclient;
    final Logger log = LoggerFactory.getLogger(ElasticSearchConsumer.class);
    private int urlIndex = 0;
    private int docNum = 0;

    @Override // de.julielab.jcore.consumer.es.AbstractCasToJsonConsumer
    public void initialize(UimaContext uimaContext) throws ResourceInitializationException {
        super.initialize(uimaContext);
        this.urls = (String[]) getContext().getConfigParameterValue(PARAM_URLS);
        this.indexName = (String) getContext().getConfigParameterValue(PARAM_INDEX_NAME);
        this.type = (String) getContext().getConfigParameterValue(PARAM_TYPE);
        this.batchSize = ((Integer) Optional.ofNullable((Integer) getContext().getConfigParameterValue(PARAM_BATCH_SIZE)).orElse(50)).intValue();
        this.bulkCommand = new ArrayList(4000);
        this.httpclient = HttpClientBuilder.create().build();
        if (this.urls != null) {
            this.indexPosts = new HttpPost[this.urls.length];
            for (int i = 0; i < this.urls.length; i++) {
                String str = this.urls[i];
                if (null != str && !str.endsWith("/_bulk")) {
                    str = str + "/_bulk";
                }
                this.indexPosts[i] = new HttpPost(str);
                this.indexPosts[i].addHeader("Content-Type", "application/x-ndjson");
            }
        }
        if (this.log.isInfoEnabled()) {
            this.log.info("{}: {}", PARAM_URLS, Arrays.toString(this.urls));
            this.log.info("{}: {}", PARAM_INDEX_NAME, this.indexName);
            this.log.info("{}: {}", PARAM_TYPE, this.type);
        }
    }

    public void process(JCas jCas) throws AnalysisEngineProcessException {
        try {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            Gson gson = new Gson();
            Document convertCasToDocument = convertCasToDocument(jCas);
            if (null != convertCasToDocument && !convertCasToDocument.isEmpty()) {
                addIndexAction(this.indexName, convertCasToDocument.getId(), convertCasToDocument.getParentId(), gson);
                addIndexSource(convertCasToDocument);
            }
            List<Document> convertCasToDocuments = convertCasToDocuments(jCas);
            if (convertCasToDocuments != null) {
                for (Document document : convertCasToDocuments) {
                    addIndexAction(document.getIndex() != null ? document.getIndex() : this.indexName, document.getId(), document.getParentId(), gson);
                    addIndexSource(document);
                }
            }
            stopWatch.stop();
            this.docNum++;
            if (this.docNum % this.batchSize == 0) {
                customBatchProcessComplete();
            }
        } catch (Exception e) {
            this.log.error("Error with document ID {}.", JCoReTools.getDocId(jCas));
            throw new AnalysisEngineProcessException(e);
        }
    }

    private void addIndexAction(String str, String str2, String str3, Gson gson) {
        if (str2 == null) {
            throw new IllegalArgumentException("The document ID was not specified.");
        }
        if (str == null) {
            throw new IllegalArgumentException("No target index was specified for document " + str2 + ".");
        }
        HashMap hashMap = new HashMap();
        hashMap.put("_index", str);
        if (this.type != null) {
            hashMap.put("_type", this.type);
        }
        hashMap.put("_id", str2);
        if (str3 != null && str3.trim().length() > 0) {
            hashMap.put("parent", str3);
        }
        HashMap hashMap2 = new HashMap();
        hashMap2.put("index", hashMap);
        this.bulkCommand.add(gson.toJson(hashMap2));
    }

    private void addIndexSource(Document document) {
        this.bulkCommand.add(createIndexSource(document));
    }

    protected String createIndexSource(Document document) {
        return this.gson.toJson(document);
    }

    public void customBatchProcessComplete() throws AnalysisEngineProcessException {
        super.batchProcessComplete();
        this.log.debug("Batch of {} documents is sent to ElasticSearch.", Integer.valueOf(this.docNum));
        this.docNum = 0;
        postBulkIndexAction();
    }

    public void collectionProcessComplete() throws AnalysisEngineProcessException {
        super.collectionProcessComplete();
        this.log.info("Collection complete.");
        postBulkIndexAction();
    }

    private void postBulkIndexAction() throws AnalysisEngineProcessException {
        List<String> subList;
        if (this.bulkCommand.isEmpty()) {
            return;
        }
        HttpPost httpPost = this.indexPosts[this.urlIndex];
        this.urlIndex = (this.urlIndex + 1) % this.indexPosts.length;
        try {
            int i = 0;
            do {
                try {
                    subList = this.bulkCommand.subList(i, Math.min(this.bulkCommand.size(), i + 1000));
                    if (!subList.isEmpty()) {
                        i += subList.size();
                        this.log.debug("Sending {} documents to index {}.", Integer.valueOf(subList.size() / 2), this.indexName);
                        long currentTimeMillis = System.currentTimeMillis();
                        String str = StringUtils.join(subList, "\n") + "\n";
                        httpPost.setEntity(new StringEntity(str, "UTF-8"));
                        HttpResponse execute = this.httpclient.execute(httpPost);
                        int statusCode = execute.getStatusLine().getStatusCode();
                        HttpEntity entity = execute.getEntity();
                        if (statusCode > 200) {
                            this.log.error("The server responded with a non-OK status code: {}", Integer.valueOf(statusCode));
                            this.log.error("Response status line: {}", execute.getStatusLine());
                            this.log.error("Response body: {}", EntityUtils.toString(entity));
                            this.log.error("Bulk command was: {}", str);
                        }
                        EntityUtils.consume(entity);
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        this.log.debug("Sending took {}ms ({}s) and returned status code {}", new Object[]{Long.valueOf(currentTimeMillis2), Long.valueOf(currentTimeMillis2 / 1000), Integer.valueOf(statusCode)});
                    }
                    if (null == subList) {
                        break;
                    }
                } catch (IOException e) {
                    this.log.error("Error when sending data to ElasticSearch:", e);
                    throw new AnalysisEngineProcessException(e);
                }
            } while (!subList.isEmpty());
        } finally {
            httpPost.reset();
            this.bulkCommand.clear();
        }
    }

    private void printNonOkDocument(HttpEntity httpEntity) throws IOException {
        Iterator it = ((List) ((Map) this.gson.fromJson(EntityUtils.toString(httpEntity, "UTF-8"), Map.class)).get("items")).iterator();
        while (it.hasNext()) {
            Map map = (Map) ((Map) it.next()).get("index");
            if (!((Boolean) map.get("ok")).booleanValue()) {
                System.out.println(map);
                System.exit(23);
            }
        }
    }
}
