package com.netflix.conductor.dao.es5.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Predicate;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.es5.index.query.parser.Expression;
import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration;
import com.netflix.conductor.elasticsearch.query.parser.ParserException;
import com.netflix.conductor.metrics.Monitors;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IndexDAO} implementation backed by an Elasticsearch 5 transport {@link Client}.
 *
 * <p>Workflow and task summaries are upserted into the index named by
 * {@link ElasticSearchConfiguration#getIndexName()}. Task execution logs, queue messages and event
 * executions are written to a date-suffixed log index ({@code <tasklogIndexName>_<yyyyMMww>}) whose
 * name is refreshed every hour once {@link #setup()} has been called.
 *
 * <p>The {@code async*} methods run their synchronous counterparts on a bounded thread pool owned by
 * this instance.
 */
@Singleton
@Trace
/* loaded from: input_file:com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.class */
public class ElasticSearchDAOV5 implements IndexDAO {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDAOV5.class);

    private static final String WORKFLOW_DOC_TYPE = "workflow";
    private static final String TASK_DOC_TYPE = "task";
    private static final String LOG_DOC_TYPE = "task_log";
    private static final String EVENT_DOC_TYPE = "event";
    private static final String MSG_DOC_TYPE = "message";
    private static final String TASKLOG_TEMPLATE_NAME = "tasklog_template";

    /** Number of attempts handed to {@link RetryUtil} for every write. */
    private static final int RETRY_COUNT = 3;
    /** Version-conflict retries requested from Elasticsearch on workflow and event upserts. */
    private static final int RETRY_ON_CONFLICT = 5;
    private static final int ASYNC_CORE_POOL_SIZE = 6;
    private static final long ASYNC_KEEP_ALIVE_MINUTES = 1L;

    private static final String className = ElasticSearchDAOV5.class.getSimpleName();
    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");

    /** Not thread-safe: every use must synchronize on the instance itself. */
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww");

    static {
        SIMPLE_DATE_FORMAT.setTimeZone(GMT);
    }

    private final String indexName;
    private final String logIndexPrefix;
    private final ObjectMapper objectMapper;
    private final Client elasticSearchClient;
    private final ExecutorService executorService;
    private final int archiveSearchBatchSize;

    /** Written by the hourly refresh thread and read by indexing threads, hence volatile. */
    private volatile String logIndexName;

    /**
     * Creates the DAO and its async worker pool; no Elasticsearch call is made until {@link #setup()}.
     *
     * @param client Elasticsearch client used for every request
     * @param elasticSearchConfiguration source of index names, batch size and async pool sizing
     * @param objectMapper JSON mapper used to serialize and deserialize indexed documents
     */
    @Inject
    public ElasticSearchDAOV5(Client client, ElasticSearchConfiguration elasticSearchConfiguration, ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.elasticSearchClient = client;
        this.indexName = elasticSearchConfiguration.getIndexName();
        this.logIndexPrefix = elasticSearchConfiguration.getTasklogIndexName();
        this.archiveSearchBatchSize = elasticSearchConfiguration.getArchiveSearchBatchSize();
        // NOTE(review): the rejection handler only logs and drops the task, so the CompletableFuture
        // returned by the corresponding async* call is never completed when the queue is full.
        this.executorService = new ThreadPoolExecutor(
                ASYNC_CORE_POOL_SIZE,
                elasticSearchConfiguration.getAsyncMaxPoolSize(),
                ASYNC_KEEP_ALIVE_MINUTES,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(elasticSearchConfiguration.getAsyncWorkerQueueSize()),
                (runnable, executor) ->
                        logger.warn("Request  {} to async dao discarded in executor {}", runnable, executor));
    }

    /**
     * Waits for a green cluster, installs the task-log index template, starts the hourly log-index
     * refresh and makes sure the main index and its workflow/task mappings exist.
     *
     * @throws Exception if waiting for cluster health fails or the main index cannot be prepared
     */
    public void setup() throws Exception {
        this.elasticSearchClient.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().get();
        try {
            initIndex();
            updateLogIndexName();
            // NOTE(review): this scheduler is never shut down.
            Executors.newScheduledThreadPool(1)
                    .scheduleAtFixedRate(this::refreshLogIndexNameQuietly, 0L, 1L, TimeUnit.HOURS);
        } catch (Exception e) {
            logger.error("Error creating index templates", e);
        }
        addIndex(this.indexName);
        addMappingToIndex(this.indexName, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json");
        addMappingToIndex(this.indexName, TASK_DOC_TYPE, "/mappings_docType_task.json");
    }

    /**
     * Scheduled wrapper around {@link #updateLogIndexName()}. An exception escaping a task passed to
     * {@code scheduleAtFixedRate} cancels all later runs, so failures are logged instead of thrown.
     */
    private void refreshLogIndexNameQuietly() {
        try {
            updateLogIndexName();
        } catch (Exception e) {
            logger.error("Failed to refresh log index name", e);
        }
    }

    /** Creates {@code index} if a lookup reports that it does not exist yet. */
    private void addIndex(String index) {
        try {
            this.elasticSearchClient.admin().indices().prepareGetIndex().addIndices(index).execute().actionGet();
        } catch (IndexNotFoundException notFound) {
            try {
                this.elasticSearchClient.admin().indices().prepareCreate(index).execute().actionGet();
            } catch (ResourceAlreadyExistsException alreadyExists) {
                // The index appeared between the lookup and the create call; that is the desired state.
                logger.debug("Index {} already exists", index);
            }
        }
    }

    /**
     * Installs the mapping stored in the classpath resource {@code mappingResource} for
     * {@code mappingType}, unless the index already has a mapping for that type. Failures while
     * reading or installing the mapping are logged, not thrown.
     */
    private void addMappingToIndex(String index, String mappingType, String mappingResource) throws IOException {
        GetMappingsResponse existing = this.elasticSearchClient.admin().indices()
                .prepareGetMappings(index)
                .addTypes(mappingType)
                .execute()
                .actionGet();
        if (!existing.mappings().isEmpty()) {
            return;
        }
        logger.info("Adding the {} type mappings", mappingType);
        try (java.io.InputStream resource = ElasticSearchDAOV5.class.getResourceAsStream(mappingResource)) {
            Objects.requireNonNull(resource, "Mapping resource not found: " + mappingResource);
            String source = new String(IOUtils.toByteArray(resource), java.nio.charset.StandardCharsets.UTF_8);
            this.elasticSearchClient.admin().indices()
                    .preparePutMapping(index)
                    .setType(mappingType)
                    .setSource(source)
                    .execute()
                    .actionGet();
        } catch (Exception e) {
            logger.error("Failed to init index mappings", e);
        }
    }

    /**
     * Recomputes the current log index name from today's date and creates that index when it is
     * missing. The new name is published before the existence check, as callers only need the name.
     */
    private void updateLogIndexName() {
        String dateSuffix;
        synchronized (SIMPLE_DATE_FORMAT) {
            dateSuffix = SIMPLE_DATE_FORMAT.format(new Date());
        }
        String currentLogIndex = this.logIndexPrefix + "_" + dateSuffix;
        this.logIndexName = currentLogIndex;
        try {
            this.elasticSearchClient.admin().indices().prepareGetIndex().addIndices(currentLogIndex).execute().actionGet();
        } catch (IndexNotFoundException notFound) {
            try {
                this.elasticSearchClient.admin().indices().prepareCreate(currentLogIndex).execute().actionGet();
            } catch (ResourceAlreadyExistsException alreadyExists) {
                // Created in the meantime; nothing left to do.
                logger.debug("Log index {} already exists", currentLogIndex);
            } catch (Exception e) {
                logger.error("Failed to update log index name: {}", currentLogIndex, e);
            }
        }
    }

    /**
     * Installs the {@code tasklog_template} index template from {@code /template_tasklog.json} when
     * no template of that name exists. A failure to install it is logged, not thrown.
     */
    private void initIndex() throws Exception {
        GetIndexTemplatesResponse existing = this.elasticSearchClient.admin().indices()
                .prepareGetTemplates(TASKLOG_TEMPLATE_NAME)
                .execute()
                .actionGet();
        if (!existing.getIndexTemplates().isEmpty()) {
            return;
        }
        logger.info("Creating the index template 'tasklog_template'");
        try (java.io.InputStream resource = ElasticSearchDAOV5.class.getResourceAsStream("/template_tasklog.json")) {
            Objects.requireNonNull(resource, "Template resource not found: /template_tasklog.json");
            this.elasticSearchClient.admin().indices()
                    .preparePutTemplate(TASKLOG_TEMPLATE_NAME)
                    .setSource(IOUtils.toByteArray(resource), XContentType.JSON)
                    .execute()
                    .actionGet();
        } catch (Exception e) {
            logger.error("Failed to init tasklog_template", e);
        }
    }

    /** Builds an update request that writes {@code document} as both the partial doc and the upsert. */
    private static UpdateRequest newUpsertRequest(String index, String docType, String id, byte[] document) {
        UpdateRequest request = new UpdateRequest(index, docType, id);
        request.doc(document, XContentType.JSON);
        request.upsert(document, XContentType.JSON);
        return request;
    }

    /**
     * Upserts the {@link WorkflowSummary} of {@code workflow} into the main index. Failures are
     * logged, never thrown.
     *
     * @param workflow workflow to index; its id becomes the document id
     */
    public void indexWorkflow(Workflow workflow) {
        try {
            byte[] document = this.objectMapper.writeValueAsBytes(new WorkflowSummary(workflow));
            UpdateRequest request =
                    newUpsertRequest(this.indexName, WORKFLOW_DOC_TYPE, workflow.getWorkflowId(), document);
            request.retryOnConflict(RETRY_ON_CONFLICT);
            updateWithRetry(request, "Index workflow into doc_type workflow");
        } catch (Exception e) {
            logger.error("Failed to index workflow: {}", workflow.getWorkflowId(), e);
        }
    }

    /** Runs {@link #indexWorkflow(Workflow)} on the async worker pool. */
    public CompletableFuture<Void> asyncIndexWorkflow(Workflow workflow) {
        return CompletableFuture.runAsync(() -> indexWorkflow(workflow), this.executorService);
    }

    /**
     * Upserts the {@link TaskSummary} of {@code task} into the main index. Failures are logged,
     * never thrown.
     *
     * @param task task to index; its id becomes the document id
     */
    public void indexTask(Task task) {
        try {
            byte[] document = this.objectMapper.writeValueAsBytes(new TaskSummary(task));
            UpdateRequest request = newUpsertRequest(this.indexName, TASK_DOC_TYPE, task.getTaskId(), document);
            updateWithRetry(request, "Index task into doc_type task");
        } catch (Exception e) {
            logger.error("Failed to index task: {}", task.getTaskId(), e);
        }
    }

    /** Runs {@link #indexTask(Task)} on the async worker pool. */
    public CompletableFuture<Void> asyncIndexTask(Task task) {
        return CompletableFuture.runAsync(() -> indexTask(task), this.executorService);
    }

    /**
     * Bulk-indexes the given logs into the current log index, retrying while the bulk response
     * reports failures. A null or empty list is a no-op; failures are logged, never thrown.
     *
     * @param taskExecLogs logs to index
     */
    public void addTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {
        if (taskExecLogs == null || taskExecLogs.isEmpty()) {
            return;
        }
        try {
            // Read the volatile name once so every log of this batch lands in the same index.
            String targetIndex = this.logIndexName;
            BulkRequestBuilder bulkRequest = this.elasticSearchClient.prepareBulk();
            for (TaskExecLog taskExecLog : taskExecLogs) {
                IndexRequest request = new IndexRequest(targetIndex, LOG_DOC_TYPE);
                request.source(this.objectMapper.writeValueAsBytes(taskExecLog), XContentType.JSON);
                bulkRequest.add(request);
            }
            new RetryUtil<BulkResponse>().retryOnException(
                    () -> bulkRequest.execute().actionGet(),
                    null,
                    BulkResponse::hasFailures,
                    RETRY_COUNT,
                    "Indexing all execution logs into doc_type task",
                    "addTaskExecutionLogs");
        } catch (Exception e) {
            logger.error(
                    "Failed to index task execution logs for tasks: {}",
                    taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList()),
                    e);
        }
    }

    /** Runs {@link #addTaskExecutionLogs(List)} on the async worker pool. */
    public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {
        return CompletableFuture.runAsync(() -> addTaskExecutionLogs(taskExecLogs), this.executorService);
    }

    /**
     * Searches every index matching {@code <logIndexPrefix>*} for documents of {@code docType} that
     * satisfy {@code expression}, sorted ascending on {@code sortField}.
     *
     * <p>NOTE(review): callers build {@code expression} by string concatenation with caller-supplied
     * values; whether the expression parser tolerates quotes in those values is not visible here.
     */
    private SearchResponse searchLogIndices(String expression, String docType, String sortField)
            throws ParserException {
        QueryBuilder filter = Expression.fromString(expression).getFilterBuilder();
        QueryBuilder query = QueryBuilders.boolQuery()
                .must(QueryBuilders.queryStringQuery("*"))
                .must(QueryBuilders.boolQuery().must(filter));
        return this.elasticSearchClient.prepareSearch(this.logIndexPrefix + "*")
                .setQuery(query)
                .setTypes(docType)
                .addSort(SortBuilders.fieldSort(sortField).order(SortOrder.ASC))
                .execute()
                .actionGet();
    }

    /**
     * Returns the execution logs of a task, oldest first. Hits that cannot be deserialized are
     * logged and skipped.
     *
     * @param taskId id of the task whose logs are wanted
     * @return the logs found, or {@code null} if the search itself failed
     */
    public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
        try {
            SearchHit[] hits =
                    searchLogIndices("taskId='" + taskId + "'", LOG_DOC_TYPE, "createdTime").getHits().getHits();
            List<TaskExecLog> logs = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                String source = hit.getSourceAsString();
                try {
                    TaskExecLog log = this.objectMapper.readValue(source, TaskExecLog.class);
                    if (log != null) {
                        logs.add(log);
                    }
                } catch (IOException e) {
                    logger.error("exception deserializing taskExecLog: {}", source, e);
                }
            }
            return logs;
        } catch (Exception e) {
            logger.error("Failed to get task execution logs for task: {}", taskId, e);
            return null;
        }
    }

    /**
     * Indexes a queue message (id, payload, queue name and current time) into the current log index.
     * Failures are logged, never thrown.
     *
     * @param queue name of the queue the message belongs to
     * @param message message to index
     */
    public void addMessage(String queue, Message message) {
        Map<String, Object> document = new HashMap<>();
        document.put("messageId", message.getId());
        document.put("payload", message.getPayload());
        document.put("queue", queue);
        document.put("created", System.currentTimeMillis());

        IndexRequest request = new IndexRequest(this.logIndexName, MSG_DOC_TYPE);
        request.source(document);
        try {
            new RetryUtil<IndexResponse>().retryOnException(
                    () -> this.elasticSearchClient.index(request).actionGet(),
                    null,
                    null,
                    RETRY_COUNT,
                    "Indexing document in  for docType: message",
                    "addMessage");
        } catch (Exception e) {
            logger.error("Failed to index message: {}", message.getId(), e);
        }
    }

    /**
     * Upserts an event execution into the current log index under the id
     * {@code name.event.messageId.id}. Failures are logged, never thrown.
     *
     * @param eventExecution execution to index
     */
    public void addEventExecution(EventExecution eventExecution) {
        try {
            byte[] document = this.objectMapper.writeValueAsBytes(eventExecution);
            String documentId = eventExecution.getName() + "." + eventExecution.getEvent() + "."
                    + eventExecution.getMessageId() + "." + eventExecution.getId();
            UpdateRequest request = newUpsertRequest(this.logIndexName, EVENT_DOC_TYPE, documentId, document);
            request.retryOnConflict(RETRY_ON_CONFLICT);
            updateWithRetry(request, "Update Event execution for doc_type event");
        } catch (Exception e) {
            logger.error("Failed to index event execution: {}", eventExecution.getId(), e);
        }
    }

    /** Runs {@link #addEventExecution(EventExecution)} on the async worker pool. */
    public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
        return CompletableFuture.runAsync(() -> addEventExecution(eventExecution), this.executorService);
    }

    /**
     * Executes {@code request} through {@link RetryUtil}. If it still fails, an error metric is
     * recorded and the failure is logged; nothing is thrown to the caller.
     */
    private void updateWithRetry(UpdateRequest request, String operationDescription) {
        try {
            new RetryUtil<UpdateResponse>().retryOnException(
                    () -> this.elasticSearchClient.update(request).actionGet(),
                    null,
                    null,
                    RETRY_COUNT,
                    operationDescription,
                    "updateWithRetry");
        } catch (Exception e) {
            Monitors.error(className, "index");
            logger.error("Failed to index {} for request type: {}", request.index(), request.type(), e);
        }
    }

    /**
     * Searches workflow documents in the main index.
     *
     * @param query structured expression parsed by {@link Expression}; empty matches everything
     * @param freeText Elasticsearch query-string query
     * @param start offset of the first hit to return
     * @param count maximum number of hits to return
     * @param sort sort options of the form {@code field} or {@code field:ASC|DESC}; may be null
     * @return total hit count and the ids of the returned page
     */
    public SearchResult<String> searchWorkflows(String query, String freeText, int start, int count, List<String> sort) {
        return search(this.indexName, query, start, count, sort, freeText, WORKFLOW_DOC_TYPE);
    }

    /**
     * Searches task documents in the main index; parameters are as for
     * {@link #searchWorkflows(String, String, int, int, List)}.
     */
    public SearchResult<String> searchTasks(String query, String freeText, int start, int count, List<String> sort) {
        return search(this.indexName, query, start, count, sort, freeText, TASK_DOC_TYPE);
    }

    /**
     * Deletes a workflow document from the main index. A delete that does not report
     * {@code DELETED} and any exception are logged; nothing is thrown.
     *
     * @param workflowId id of the workflow document to delete
     */
    public void removeWorkflow(String workflowId) {
        try {
            DeleteResponse response = this.elasticSearchClient
                    .delete(new DeleteRequest(this.indexName, WORKFLOW_DOC_TYPE, workflowId))
                    .actionGet();
            if (response.getResult() != DocWriteResponse.Result.DELETED) {
                logger.error("Index removal failed - document not found by id: {}", workflowId);
            }
        } catch (Exception e) {
            logger.error("Failed to remove workflow {} from index", workflowId, e);
            Monitors.error(className, "remove");
        }
    }

    /** Runs {@link #removeWorkflow(String)} on the async worker pool. */
    public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
        return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), this.executorService);
    }

    /**
     * Partially updates a workflow document, setting {@code keys[i]} to {@code values[i]}. The call
     * blocks until Elasticsearch answers so that a failed update is actually retried.
     *
     * @param workflowInstanceId id of the workflow document to update
     * @param keys field names to set
     * @param values values for {@code keys}, position by position; null values are allowed
     * @throws ApplicationException if {@code keys} and {@code values} differ in length
     */
    public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
        if (keys.length != values.length) {
            throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, "Number of keys and values do not match");
        }
        // A plain loop instead of Collectors.toMap, which rejects null values.
        Map<String, Object> fields = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            fields.put(keys[i], values[i]);
        }
        UpdateRequest request = new UpdateRequest(this.indexName, WORKFLOW_DOC_TYPE, workflowInstanceId);
        request.doc(fields);
        logger.debug("Updating workflow {} with {}", workflowInstanceId, fields);
        new RetryUtil<UpdateResponse>().retryOnException(
                () -> this.elasticSearchClient.update(request).actionGet(),
                null,
                null,
                RETRY_COUNT,
                "Updating index for doc_type workflow",
                "updateWorkflow");
    }

    /** Runs {@link #updateWorkflow(String, String[], Object[])} on the async worker pool. */
    public CompletableFuture<Void> asyncUpdateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
        return CompletableFuture.runAsync(() -> updateWorkflow(workflowInstanceId, keys, values), this.executorService);
    }

    /**
     * Reads a single field of a workflow document.
     *
     * @param workflowInstanceId id of the workflow document
     * @param fieldToGet name of the source field to fetch
     * @return the field's {@code toString()} value, or {@code null} when the document or the field
     *     is missing or the field is null
     */
    public String get(String workflowInstanceId, String fieldToGet) {
        GetRequest request = new GetRequest(this.indexName, WORKFLOW_DOC_TYPE, workflowInstanceId)
                .fetchSourceContext(new FetchSourceContext(true, new String[]{fieldToGet}, Strings.EMPTY_ARRAY));
        GetResponse response = this.elasticSearchClient.get(request).actionGet();
        if (response.isExists()) {
            Map<String, Object> source = response.getSourceAsMap();
            Object value = source == null ? null : source.get(fieldToGet);
            if (value != null) {
                return value.toString();
            }
        }
        logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, this.indexName);
        return null;
    }

    /**
     * Runs a paged id-only search combining a query-string query with an optional structured filter.
     *
     * @throws ApplicationException with code {@code BACKEND_ERROR} if {@code structuredQuery} cannot
     *     be parsed
     */
    private SearchResult<String> search(String index, String structuredQuery, int start, int size,
            List<String> sortOptions, String freeTextQuery, String docType) {
        try {
            QueryBuilder filter = QueryBuilders.matchAllQuery();
            if (StringUtils.isNotEmpty(structuredQuery)) {
                filter = Expression.fromString(structuredQuery).getFilterBuilder();
            }
            QueryBuilder query = QueryBuilders.boolQuery()
                    .must(QueryBuilders.queryStringQuery(freeTextQuery))
                    .must(QueryBuilders.boolQuery().must(filter));
            SearchRequestBuilder searchRequest = this.elasticSearchClient.prepareSearch(index)
                    .setQuery(query)
                    .setTypes(docType)
                    .storedFields("_id")
                    .setFrom(start)
                    .setSize(size);
            if (sortOptions != null) {
                sortOptions.forEach(option -> addSortOptionToSearchRequest(searchRequest, option));
            }
            SearchResponse response = searchRequest.get();
            LinkedList<String> ids = StreamSupport.stream(response.getHits().spliterator(), false)
                    .map(SearchHit::getId)
                    .collect(Collectors.toCollection(LinkedList::new));
            return new SearchResult<>(response.getHits().getTotalHits(), ids);
        } catch (ParserException e) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
        }
    }

    /**
     * Applies one sort option of the form {@code field} (ascending) or {@code field:ORDER} to the
     * request. {@code ORDER} is matched case-insensitively against {@link SortOrder}.
     */
    private void addSortOptionToSearchRequest(SearchRequestBuilder searchRequestBuilder, String sortOption) {
        String field = sortOption;
        SortOrder order = SortOrder.ASC;
        int separator = sortOption.indexOf(':');
        if (separator > 0) {
            field = sortOption.substring(0, separator);
            String direction = sortOption.substring(separator + 1).trim().toUpperCase(java.util.Locale.ROOT);
            order = SortOrder.valueOf(direction);
        }
        searchRequestBuilder.addSort(field, order);
    }

    /**
     * Finds workflows in a terminal status (COMPLETED, FAILED, TIMED_OUT or TERMINATED) that have no
     * {@code archived} field, ordered by ascending end time and limited to the configured archive
     * batch size.
     *
     * @param indexName index to search
     * @param archiveTtlDays not used by this implementation
     * @return ids of the matching workflows
     */
    public List<String> searchArchivableWorkflows(String indexName, long archiveTtlDays) {
        QueryBuilder query = QueryBuilders.boolQuery()
                .should(QueryBuilders.termQuery("status", "COMPLETED"))
                .should(QueryBuilders.termQuery("status", "FAILED"))
                .should(QueryBuilders.termQuery("status", "TIMED_OUT"))
                .should(QueryBuilders.termQuery("status", "TERMINATED"))
                .mustNot(QueryBuilders.existsQuery("archived"))
                .minimumShouldMatch(1);
        SearchHits hits = this.elasticSearchClient.prepareSearch(indexName)
                .setTypes(WORKFLOW_DOC_TYPE)
                .setQuery(query)
                .addSort("endTime", SortOrder.ASC)
                .setSize(this.archiveSearchBatchSize)
                .execute()
                .actionGet()
                .getHits();
        logger.info("Archive search totalHits - {}", hits.getTotalHits());
        return Arrays.stream(hits.getHits())
                .map(SearchHit::getId)
                .collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * Finds RUNNING workflows whose {@code updateTime} lies strictly between
     * {@code now - lastModifiedHoursAgoFrom} and {@code now - lastModifiedHoursAgoTo}, ordered by
     * ascending update time.
     *
     * @param lastModifiedHoursAgoFrom hours before now marking the lower (older) bound
     * @param lastModifiedHoursAgoTo hours before now marking the upper (newer) bound
     * @return ids of the matching workflows
     */
    public List<String> searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) {
        DateTime now = new DateTime();
        QueryBuilder query = QueryBuilders.boolQuery()
                .must(QueryBuilders.rangeQuery("updateTime").gt(now.minusHours(lastModifiedHoursAgoFrom)))
                .must(QueryBuilders.rangeQuery("updateTime").lt(now.minusHours(lastModifiedHoursAgoTo)))
                .must(QueryBuilders.termQuery("status", "RUNNING"));
        SearchResponse response = this.elasticSearchClient.prepareSearch(this.indexName)
                .setTypes(WORKFLOW_DOC_TYPE)
                .setQuery(query)
                .setSize(ElasticSearchConfiguration.ELASTIC_SEARCH_ARCHIVE_SEARCH_BATCH_SIZE_DEFAULT_VALUE)
                .addSort("updateTime", SortOrder.ASC)
                .execute()
                .actionGet();
        return StreamSupport.stream(response.getHits().spliterator(), false)
                .map(SearchHit::getId)
                .collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * Returns the messages indexed for a queue, oldest first.
     *
     * @param queue queue name
     * @return the messages found; their receipt is always null
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the search or mapping fails
     */
    public List<Message> getMessages(String queue) {
        try {
            return mapGetMessagesResponse(searchLogIndices("queue='" + queue + "'", MSG_DOC_TYPE, "created"));
        } catch (Exception e) {
            logger.error("Failed to get messages for queue: {}", queue, e);
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
        }
    }

    /** Converts every hit into a {@link Message} built from its {@code messageId} and {@code payload}. */
    private List<Message> mapGetMessagesResponse(SearchResponse searchResponse) throws IOException {
        SearchHit[] hits = searchResponse.getHits().getHits();
        MapType stringMapType =
                TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, String.class);
        List<Message> messages = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            Map<String, String> source = this.objectMapper.readValue(hit.getSourceAsString(), stringMapType);
            messages.add(new Message(source.get("messageId"), source.get("payload"), null));
        }
        return messages;
    }

    /**
     * Returns the executions indexed for an event, oldest first.
     *
     * @param event event name
     * @return the executions found
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the search or mapping fails
     */
    public List<EventExecution> getEventExecutions(String event) {
        try {
            return mapEventExecutionsResponse(searchLogIndices("event='" + event + "'", EVENT_DOC_TYPE, "created"));
        } catch (Exception e) {
            logger.error("Failed to get executions for event: {}", event, e);
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
        }
    }

    /** Deserializes the source of every hit into an {@link EventExecution}. */
    private List<EventExecution> mapEventExecutionsResponse(SearchResponse searchResponse) throws IOException {
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<EventExecution> executions = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            executions.add(this.objectMapper.readValue(hit.getSourceAsString(), EventExecution.class));
        }
        return executions;
    }
}
