package com.netflix.conductor.dao.es5.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.es5.index.query.parser.Expression;
import com.netflix.conductor.dao.es5.index.query.parser.ParserException;
import com.netflix.conductor.metrics.Monitors;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Trace
/* loaded from: input_file:com/netflix/conductor/dao/es5/index/ElasticSearchDAO.class */
public class ElasticSearchDAO implements IndexDAO {
    private static final String WORKFLOW_DOC_TYPE = "workflow";
    private static final String TASK_DOC_TYPE = "task";
    private static final String LOG_DOC_TYPE = "task";
    private static final String EVENT_DOC_TYPE = "event";
    private static final String MSG_DOC_TYPE = "message";
    private static final int RETRY_COUNT = 3;
    private String indexName;
    private String logIndexName;
    private String logIndexPrefix;
    private ObjectMapper objectMapper;
    private Client elasticSearchClient;
    private final ExecutorService executorService;
    private static Logger logger = LoggerFactory.getLogger(ElasticSearchDAO.class);
    private static final String className = ElasticSearchDAO.class.getSimpleName();
    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww");

    @Inject
    public ElasticSearchDAO(Client client, Configuration configuration, ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.elasticSearchClient = client;
        this.indexName = configuration.getProperty("workflow.elasticsearch.index.name", (String) null);
        try {
            initIndex();
            updateIndexName(configuration);
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
                updateIndexName(configuration);
            }, 0L, 1L, TimeUnit.HOURS);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        this.executorService = new ThreadPoolExecutor(6, 12, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue());
    }

    private void updateIndexName(Configuration configuration) {
        this.logIndexPrefix = configuration.getProperty("workflow.elasticsearch.tasklog.index.name", "task_log");
        this.logIndexName = this.logIndexPrefix + "_" + SIMPLE_DATE_FORMAT.format(new Date());
        try {
            this.elasticSearchClient.admin().indices().prepareGetIndex().addIndices(new String[]{this.logIndexName}).execute().actionGet();
        } catch (IndexNotFoundException e) {
            try {
                this.elasticSearchClient.admin().indices().prepareCreate(this.logIndexName).execute().actionGet();
            } catch (Exception e2) {
                logger.error("Failed to update log index name: {}", this.logIndexName, e2);
            } catch (ResourceAlreadyExistsException e3) {
            }
        }
    }

    private void initIndex() throws Exception {
        if (((GetIndexTemplatesResponse) this.elasticSearchClient.admin().indices().prepareGetTemplates(new String[]{"wfe_template"}).execute().actionGet()).getIndexTemplates().isEmpty()) {
            logger.info("Creating the index template 'wfe_template'");
            try {
                this.elasticSearchClient.admin().indices().preparePutTemplate("wfe_template").setSource(IOUtils.toByteArray(ElasticSearchDAO.class.getResourceAsStream("/template.json")), XContentType.JSON).execute().actionGet();
            } catch (Exception e) {
                logger.error("Failed to init index template", e);
            }
        }
        try {
            this.elasticSearchClient.admin().indices().prepareGetIndex().addIndices(new String[]{this.indexName}).execute().actionGet();
        } catch (IndexNotFoundException e2) {
            try {
                this.elasticSearchClient.admin().indices().prepareCreate(this.indexName).execute().actionGet();
            } catch (ResourceAlreadyExistsException e3) {
            }
        }
        if (((GetMappingsResponse) this.elasticSearchClient.admin().indices().prepareGetMappings(new String[]{this.indexName}).addTypes(new String[]{WORKFLOW_DOC_TYPE}).execute().actionGet()).mappings().isEmpty()) {
            logger.info("Adding the workflow type mappings");
            try {
                this.elasticSearchClient.admin().indices().preparePutMapping(new String[]{this.indexName}).setType(WORKFLOW_DOC_TYPE).setSource(new String(IOUtils.toByteArray(ElasticSearchDAO.class.getResourceAsStream("/wfe_type.json")))).execute().actionGet();
            } catch (Exception e4) {
                logger.error("Failed to init index mappings", e4);
            }
        }
    }

    public void indexWorkflow(Workflow workflow) {
        try {
            String workflowId = workflow.getWorkflowId();
            byte[] writeValueAsBytes = this.objectMapper.writeValueAsBytes(new WorkflowSummary(workflow));
            UpdateRequest updateRequest = new UpdateRequest(this.indexName, WORKFLOW_DOC_TYPE, workflowId);
            updateRequest.doc(writeValueAsBytes, XContentType.JSON);
            updateRequest.upsert(writeValueAsBytes, XContentType.JSON);
            updateRequest.retryOnConflict(5);
            updateWithRetry(updateRequest, "Index workflow into doc_type workflow");
        } catch (Throwable th) {
            logger.error("Failed to index workflow: {}", workflow.getWorkflowId(), th);
        }
    }

    public CompletableFuture<Void> asyncIndexWorkflow(Workflow workflow) {
        return CompletableFuture.runAsync(() -> {
            indexWorkflow(workflow);
        }, this.executorService);
    }

    public void indexTask(Task task) {
        try {
            String taskId = task.getTaskId();
            byte[] writeValueAsBytes = this.objectMapper.writeValueAsBytes(new TaskSummary(task));
            UpdateRequest updateRequest = new UpdateRequest(this.indexName, "task", taskId);
            updateRequest.doc(writeValueAsBytes, XContentType.JSON);
            updateRequest.upsert(writeValueAsBytes, XContentType.JSON);
            updateWithRetry(updateRequest, "Index workflow into doc_type workflow");
        } catch (Throwable th) {
            logger.error("Failed to index task: {}", task.getTaskId(), th);
        }
    }

    public CompletableFuture<Void> asyncIndexTask(Task task) {
        return CompletableFuture.runAsync(() -> {
            indexTask(task);
        }, this.executorService);
    }

    public void addTaskExecutionLogs(List<TaskExecLog> list) {
        if (list.isEmpty()) {
            return;
        }
        try {
            BulkRequestBuilder prepareBulk = this.elasticSearchClient.prepareBulk();
            for (TaskExecLog taskExecLog : list) {
                IndexRequest indexRequest = new IndexRequest(this.logIndexName, "task");
                indexRequest.source(this.objectMapper.writeValueAsBytes(taskExecLog), XContentType.JSON);
                prepareBulk.add(indexRequest);
            }
            new RetryUtil().retryOnException(() -> {
                return (BulkResponse) prepareBulk.execute().actionGet();
            }, (Predicate) null, (v0) -> {
                return v0.hasFailures();
            }, RETRY_COUNT, "Indexing all execution logs into doc_type task", "addTaskExecutionLogs");
        } catch (Throwable th) {
            logger.error("Failed to index task execution logs for tasks: ", (List) list.stream().map((v0) -> {
                return v0.getTaskId();
            }).collect(Collectors.toList()), th);
        }
    }

    public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> list) {
        return CompletableFuture.runAsync(() -> {
            addTaskExecutionLogs(list);
        }, this.executorService);
    }

    public List<TaskExecLog> getTaskExecutionLogs(String str) {
        try {
            SearchHit[] hits = ((SearchResponse) this.elasticSearchClient.prepareSearch(new String[]{this.indexName}).setQuery(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery("*")).must(QueryBuilders.boolQuery().must(Expression.fromString("taskId='" + str + "'").getFilterBuilder()))).setTypes(new String[]{"task"}).execute().actionGet()).getHits().getHits();
            ArrayList arrayList = new ArrayList(hits.length);
            for (SearchHit searchHit : hits) {
                arrayList.add((TaskExecLog) this.objectMapper.readValue(searchHit.getSourceAsString(), TaskExecLog.class));
            }
            return arrayList;
        } catch (Exception e) {
            logger.error("Failed to get task execution logs for task: {}", str, e);
            return null;
        }
    }

    public void addMessage(String str, Message message) {
        HashMap hashMap = new HashMap();
        hashMap.put("messageId", message.getId());
        hashMap.put("payload", message.getPayload());
        hashMap.put("queue", str);
        hashMap.put("created", Long.valueOf(System.currentTimeMillis()));
        IndexRequest indexRequest = new IndexRequest(this.logIndexName, MSG_DOC_TYPE);
        indexRequest.source(hashMap);
        try {
            new RetryUtil().retryOnException(() -> {
                return (IndexResponse) this.elasticSearchClient.index(indexRequest).actionGet();
            }, (Predicate) null, (Predicate) null, RETRY_COUNT, "Indexing document in  for docType: message", "addMessage");
        } catch (Throwable th) {
            logger.error("Failed to index message: {}", message.getId(), th);
        }
    }

    public void addEventExecution(EventExecution eventExecution) {
        try {
            byte[] writeValueAsBytes = this.objectMapper.writeValueAsBytes(eventExecution);
            UpdateRequest updateRequest = new UpdateRequest(this.logIndexName, EVENT_DOC_TYPE, eventExecution.getName() + "." + eventExecution.getEvent() + "." + eventExecution.getMessageId() + "." + eventExecution.getId());
            updateRequest.doc(writeValueAsBytes, XContentType.JSON);
            updateRequest.upsert(writeValueAsBytes, XContentType.JSON);
            updateRequest.retryOnConflict(5);
            updateWithRetry(updateRequest, "Update Event execution for doc_type event");
        } catch (Throwable th) {
            logger.error("Failed to index event execution: {}", eventExecution.getId(), th);
        }
    }

    public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
        return CompletableFuture.runAsync(() -> {
            addEventExecution(eventExecution);
        }, this.executorService);
    }

    private void updateWithRetry(UpdateRequest updateRequest, String str) {
        try {
            new RetryUtil().retryOnException(() -> {
                return (UpdateResponse) this.elasticSearchClient.update(updateRequest).actionGet();
            }, (Predicate) null, (Predicate) null, RETRY_COUNT, str, "updateWithRetry");
        } catch (Exception e) {
            Monitors.error(className, "index");
            logger.error("Failed to index {} for request type: {}", new Object[]{updateRequest.index(), updateRequest.type(), e});
        }
    }

    public SearchResult<String> searchWorkflows(String str, String str2, int i, int i2, List<String> list) {
        try {
            return search(str, i, i2, list, str2, WORKFLOW_DOC_TYPE);
        } catch (ParserException e) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
        }
    }

    public SearchResult<String> searchTasks(String str, String str2, int i, int i2, List<String> list) {
        try {
            return search(str, i, i2, list, str2, "task");
        } catch (ParserException e) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
        }
    }

    public void removeWorkflow(String str) {
        try {
            if (((DeleteResponse) this.elasticSearchClient.delete(new DeleteRequest(this.indexName, WORKFLOW_DOC_TYPE, str)).actionGet()).getResult() == DocWriteResponse.Result.DELETED) {
                logger.error("Index removal failed - document not found by id: {}", str);
            }
        } catch (Throwable th) {
            logger.error("Failed to remove workflow {} from index", str, th);
            Monitors.error(className, "remove");
        }
    }

    public CompletableFuture<Void> asyncRemoveWorkflow(String str) {
        return CompletableFuture.runAsync(() -> {
            removeWorkflow(str);
        }, this.executorService);
    }

    public void updateWorkflow(String str, String[] strArr, Object[] objArr) {
        if (strArr.length != objArr.length) {
            throw new IllegalArgumentException("Number of keys and values should be same.");
        }
        UpdateRequest updateRequest = new UpdateRequest(this.indexName, WORKFLOW_DOC_TYPE, str);
        Map map = (Map) IntStream.range(0, strArr.length).boxed().collect(Collectors.toMap(num -> {
            return strArr[num.intValue()];
        }, num2 -> {
            return objArr[num2.intValue()];
        }));
        updateRequest.doc(map);
        logger.debug("Updating workflow {} with {}", str, map);
        new RetryUtil().retryOnException(() -> {
            return this.elasticSearchClient.update(updateRequest);
        }, (Predicate) null, (Predicate) null, RETRY_COUNT, "Updating index for doc_type workflow", "updateWorkflow");
    }

    public CompletableFuture<Void> asyncUpdateWorkflow(String str, String[] strArr, Object[] objArr) {
        return CompletableFuture.runAsync(() -> {
            updateWorkflow(str, strArr, objArr);
        }, this.executorService);
    }

    public String get(String str, String str2) {
        Object obj = null;
        Map fields = ((GetResponse) this.elasticSearchClient.get(new GetRequest(this.indexName, WORKFLOW_DOC_TYPE, str).storedFields(new String[]{str2})).actionGet()).getFields();
        if (fields == null) {
            return null;
        }
        GetField getField = (GetField) fields.get(str2);
        if (getField != null) {
            obj = getField.getValue();
        }
        if (obj != null) {
            return obj.toString();
        }
        return null;
    }

    private SearchResult<String> search(String str, int i, int i2, List<String> list, String str2, String str3) throws ParserException {
        QueryBuilder matchAllQuery = QueryBuilders.matchAllQuery();
        if (StringUtils.isNotEmpty(str)) {
            matchAllQuery = Expression.fromString(str).getFilterBuilder();
        }
        SearchRequestBuilder size = this.elasticSearchClient.prepareSearch(new String[]{this.indexName}).setQuery(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(str2)).must(QueryBuilders.boolQuery().must(matchAllQuery))).setTypes(new String[]{str3}).storedFields(new String[]{"_id"}).setFrom(i).setSize(i2);
        if (list != null) {
            list.forEach(str4 -> {
                SortOrder sortOrder = SortOrder.ASC;
                String str4 = str4;
                int indexOf = str4.indexOf(58);
                if (indexOf > 0) {
                    str4 = str4.substring(0, indexOf);
                    sortOrder = SortOrder.valueOf(str4.substring(indexOf + 1));
                }
                size.addSort(str4, sortOrder);
            });
        }
        LinkedList linkedList = new LinkedList();
        SearchResponse searchResponse = size.get();
        searchResponse.getHits().forEach(searchHit -> {
            linkedList.add(searchHit.getId());
        });
        return new SearchResult<>(searchResponse.getHits().getTotalHits(), linkedList);
    }

    static {
        SIMPLE_DATE_FORMAT.setTimeZone(GMT);
    }
}
