package com.marklogic.hub.legacy.flow.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.FailedRequestException;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.impl.JobTicketImpl;
import com.marklogic.client.extensions.ResourceManager;
import com.marklogic.hub.DatabaseKind;
import com.marklogic.hub.HubConfig;
import com.marklogic.hub.legacy.collector.DiskQueue;
import com.marklogic.hub.legacy.collector.LegacyCollector;
import com.marklogic.hub.legacy.flow.CodeFormat;
import com.marklogic.hub.legacy.flow.LegacyFlow;
import com.marklogic.hub.legacy.flow.LegacyFlowFinishedListener;
import com.marklogic.hub.legacy.flow.LegacyFlowItemCompleteListener;
import com.marklogic.hub.legacy.flow.LegacyFlowItemFailureListener;
import com.marklogic.hub.legacy.flow.LegacyFlowRunner;
import com.marklogic.hub.legacy.flow.LegacyFlowStatusListener;
import com.marklogic.hub.legacy.flow.RunFlowResponse;
import com.marklogic.hub.legacy.job.Job;
import com.marklogic.hub.legacy.job.JobStatus;
import com.marklogic.hub.legacy.job.LegacyJobManager;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/marklogic/hub/legacy/flow/impl/LegacyFlowRunnerImpl.class */
public class LegacyFlowRunnerImpl implements LegacyFlowRunner {
    /** Batch size used unless {@link #withBatchSize(int)} overrides it. */
    private static final int DEFAULT_BATCH_SIZE = 100;
    /** Thread count used unless {@link #withThreadCount(int)} overrides it. */
    private static final int DEFAULT_THREAD_COUNT = 4;
    /** Once this many error messages have been collected, {@link #run()} stops adding more. */
    private static final int MAX_ERROR_MESSAGES = 10;

    /** Flow to execute; set through {@link #withFlow(LegacyFlow)}. */
    private LegacyFlow flow;
    /** Client used for the collector and for the DataMovementManager that drives the batches. */
    private DatabaseClient stagingClient;
    /** Name of the database handed to each {@code FlowResource} as its target. */
    private String destinationDatabase;
    /** Options forwarded to the collector and to every batch; {@link #run()} adds keys to it. */
    private Map<String, Object> options;
    /** Last percentage reported to status listeners, used to suppress duplicate notifications. */
    private int previousPercentComplete;
    /** Hub configuration used to create clients and passed on to the collector. */
    private HubConfig hubConfig;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private int threadCount = DEFAULT_THREAD_COUNT;
    /** When true, {@link #run()} stops the job as soon as a batch reports an error. */
    private boolean stopOnFailure = false;
    private final List<LegacyFlowItemCompleteListener> flowItemCompleteListeners = new ArrayList<>();
    private final List<LegacyFlowItemFailureListener> flowItemFailureListeners = new ArrayList<>();
    private final List<LegacyFlowStatusListener> flowStatusListeners = new ArrayList<>();
    private final List<LegacyFlowFinishedListener> flowFinishedListeners = new ArrayList<>();
    /** Background thread started by {@link #run()}; {@code null} until a run has been started. */
    private Thread runningThread = null;

    /* loaded from: input_file:com/marklogic/hub/legacy/flow/impl/LegacyFlowRunnerImpl$FlowResource.class */
    /**
     * Resource manager through which one batch of identifiers is submitted for a flow.
     *
     * <p>An instance remembers the client it was created for, the target database name and the
     * flow, and registers itself with that client in its constructor.
     */
    class FlowResource extends ResourceManager {
        private DatabaseClient srcClient;
        private String targetDatabase;
        private LegacyFlow flow;

        /**
         * Stores the arguments and initialises this manager on the given client.
         *
         * @param databaseClient client this manager is initialised on
         * @param str name of the target database
         * @param legacyFlow flow to run; its code format selects the resource name
         */
        public FlowResource(DatabaseClient databaseClient, String str, LegacyFlow legacyFlow) {
            this.flow = legacyFlow;
            this.srcClient = databaseClient;
            this.targetDatabase = str;
            // JavaScript flows use the "mlSjsFlow" resource name, every other code format "mlFlow".
            this.srcClient.init(legacyFlow.getCodeFormat().equals(CodeFormat.JAVASCRIPT) ? "mlSjsFlow" : "mlFlow", this);
        }

        /**
         * Convenience overload that delegates to the three-argument {@code run} with
         * {@code null} options.
         *
         * @param str first argument forwarded unchanged (the outer runner passes the job id)
         * @param strArr second argument forwarded unchanged (the outer runner passes the batch items)
         * @return whatever the three-argument overload returns
         */
        public RunFlowResponse run(String str, String[] strArr) {
            return run(str, strArr, null);
        }

        // NOTE(review): the body of the method below was NOT recovered by the decompiler.
        // As written it unconditionally throws UnsupportedOperationException, so every batch
        // routed through this class fails. Restore the real implementation from the original
        // sources before relying on this file.
        /* JADX WARN: Removed duplicated region for block: B:12:0x00d1 A[Catch: Exception -> 0x00ef, TryCatch #0 {Exception -> 0x00ef, blocks: (B:3:0x000c, B:5:0x0054, B:6:0x0061, B:25:0x0086, B:27:0x009c, B:12:0x00d1, B:9:0x0090, B:21:0x00e2, B:23:0x00eb), top: B:2:0x000c, inners: #1 }] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public com.marklogic.hub.legacy.flow.RunFlowResponse run(java.lang.String r7, java.lang.String[] r8, java.util.Map<java.lang.String, java.lang.Object> r9) {
            /*
                Method dump skipped, instructions count: 255
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.marklogic.hub.legacy.flow.impl.LegacyFlowRunnerImpl.FlowResource.run(java.lang.String, java.lang.String[], java.util.Map):com.marklogic.hub.legacy.flow.RunFlowResponse");
        }
    }

    public LegacyFlowRunnerImpl(HubConfig hubConfig) {
        this.hubConfig = hubConfig;
        this.stagingClient = hubConfig.newStagingClient();
        this.destinationDatabase = hubConfig.getDbName(DatabaseKind.FINAL);
    }

    /**
     * No-argument constructor available to subclasses.
     *
     * <p>It assigns nothing: the hub configuration, source client and destination database all
     * stay {@code null} until set by other means.
     */
    protected LegacyFlowRunnerImpl() {
    }

    /**
     * Selects the flow that {@link #run()} will execute.
     *
     * @param legacyFlow the flow to run
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner withFlow(LegacyFlow legacyFlow) {
        flow = legacyFlow;
        return this;
    }

    /**
     * Sets how many identifiers are handed to the flow per batch.
     *
     * @param i the batch size; stored as given, no validation is performed here
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner withBatchSize(int i) {
        batchSize = i;
        return this;
    }

    /**
     * Sets the thread count passed to both the collector and the query batcher.
     *
     * @param i the thread count; stored as given, no validation is performed here
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner withThreadCount(int i) {
        threadCount = i;
        return this;
    }

    /**
     * Replaces the source client (by default a staging client created from the hub config).
     *
     * @param databaseClient client the collector and the data movement manager will use
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner withSourceClient(DatabaseClient databaseClient) {
        stagingClient = databaseClient;
        return this;
    }

    /**
     * Overrides the destination database name (by default the configured FINAL database).
     *
     * @param str name of the database passed to each flow resource as its target
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner withDestinationDatabase(String str) {
        destinationDatabase = str;
        return this;
    }

    /**
     * Chooses whether the job is stopped as soon as a batch reports at least one error.
     *
     * @param z {@code true} to stop on the first reported error, {@code false} to keep going
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner withStopOnFailure(boolean z) {
        stopOnFailure = z;
        return this;
    }

    /**
     * Supplies the options forwarded to the collector and to every batch.
     *
     * <p>The map is stored by reference, not copied; {@link #run()} later puts the
     * {@code "entity"}, {@code "flow"} and {@code "flowType"} keys into it.
     *
     * @param map options to use, or {@code null} to let {@link #run()} create an empty map
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner withOptions(Map<String, Object> map) {
        options = map;
        return this;
    }

    /**
     * Registers a listener that is called once for every item a batch reports as completed.
     *
     * @param legacyFlowItemCompleteListener listener to append to the registered listeners
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner onItemComplete(LegacyFlowItemCompleteListener legacyFlowItemCompleteListener) {
        flowItemCompleteListeners.add(legacyFlowItemCompleteListener);
        return this;
    }

    /**
     * Registers a listener that is called once for every item a batch reports as failed.
     *
     * @param legacyFlowItemFailureListener listener to append to the registered listeners
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner onItemFailed(LegacyFlowItemFailureListener legacyFlowItemFailureListener) {
        flowItemFailureListeners.add(legacyFlowItemFailureListener);
        return this;
    }

    /**
     * Registers a listener that receives job id, percent complete and a message whenever
     * {@link #run()} reports progress.
     *
     * @param legacyFlowStatusListener listener to append to the registered listeners
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner onStatusChanged(LegacyFlowStatusListener legacyFlowStatusListener) {
        flowStatusListeners.add(legacyFlowStatusListener);
        return this;
    }

    /**
     * Registers a listener that is called by the background thread once the batcher has
     * completed.
     *
     * @param legacyFlowFinishedListener listener to append to the registered listeners
     * @return this runner, for chaining
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public LegacyFlowRunner onFinished(LegacyFlowFinishedListener legacyFlowFinishedListener) {
        flowFinishedListeners.add(legacyFlowFinishedListener);
        return this;
    }

    /**
     * Waits for the background thread started by {@link #run()} without a practical time limit.
     *
     * <p>If the waiting thread is interrupted, the method returns early and restores the
     * thread's interrupt flag so callers can still observe the interruption (the interface
     * method declares no checked exception).
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public void awaitCompletion() {
        try {
            awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException interrupted) {
            // Do not swallow the interruption: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits up to the given timeout for the background thread started by {@link #run()}.
     *
     * <p>Returns immediately when no run has been started, or when the timeout is not
     * positive.
     *
     * @param j maximum time to wait, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public void awaitCompletion(long j, TimeUnit timeUnit) throws InterruptedException {
        final Thread worker = this.runningThread;
        if (worker != null) {
            // timedJoin converts from the caller's unit to the millis/nanos Thread.join expects
            // (saturating on overflow) and never turns a short timeout into join(0), which
            // would block forever.
            timeUnit.timedJoin(worker, j);
        }
    }

    /**
     * Starts the flow as an asynchronous job and returns its ticket.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Create and save a {@link Job} record under a freshly generated job id.</li>
     *   <li>Run the flow's collector to obtain the identifiers to process.</li>
     *   <li>Feed the identifiers to a {@link QueryBatcher}; each batch is executed through a
     *       {@link FlowResource}, and its counts, errors and per-item results are reported to
     *       the registered listeners.</li>
     *   <li>Start a background thread that waits for the batcher, notifies the status and
     *       finished listeners, and saves the final job status.</li>
     * </ol>
     *
     * <p>The method returns once the batcher has been started; use {@link #awaitCompletion()}
     * to wait for the background thread. The options map held by this runner is modified: the
     * {@code "entity"}, {@code "flow"} and {@code "flowType"} keys are put into it.
     *
     * @return the ticket of the started job; if the collector or the job setup throws, the job
     *         is saved as {@link JobStatus#FAILED} with the stack trace as output and a new
     *         ticket carrying only the job id is returned
     */
    @Override // com.marklogic.hub.legacy.flow.LegacyFlowRunner
    public JobTicket run() {
        final String jobId = UUID.randomUUID().toString();
        final LegacyJobManager jobManager = LegacyJobManager.create(this.hubConfig.newJobDbClient());
        final Job job = Job.withFlow(this.flow).withJobId(jobId);
        jobManager.saveJob(job);

        final LegacyCollector collector = this.flow.getCollector();
        collector.setHubConfig(this.hubConfig);
        collector.setClient(this.stagingClient);

        final AtomicLong successfulEvents = new AtomicLong(0L);
        final AtomicLong failedEvents = new AtomicLong(0L);
        final AtomicLong successfulBatches = new AtomicLong(0L);
        final AtomicLong failedBatches = new AtomicLong(0L);

        if (this.options == null) {
            this.options = new HashMap<>();
        }
        this.options.put("entity", this.flow.getEntityName());
        this.options.put("flow", this.flow.getName());
        this.options.put("flowType", this.flow.getType().toString());

        this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 0, "running collector"));
        jobManager.saveJob(job.withStatus(JobStatus.RUNNING_COLLECTOR));

        try {
            final DiskQueue<String> uris = collector.run(jobId, this.flow.getEntityName(), this.flow.getName(), this.threadCount, this.options);
            this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 0, "starting harmonization"));

            // Vector because batches append to it from several batcher threads.
            final Vector<String> errorMessages = new Vector<>();
            final DataMovementManager dataMovementManager = this.stagingClient.newDataMovementManager();
            final int uriCount = uris.size();
            // Divide as doubles: integer division would truncate before ceil() and under-count
            // the batches whenever the last batch is only partially filled.
            final double batchCount = Math.ceil((double) uriCount / this.batchSize);
            // The ticket only exists after startJob(), but batch callbacks need it to stop the
            // job; it is published through a thread-safe holder because the callbacks run on
            // batcher threads.
            final Map<String, JobTicket> ticketHolder = new ConcurrentHashMap<>();
            final ConcurrentHashMap<DatabaseClient, FlowResource> resourcesByClient = new ConcurrentHashMap<>();

            final QueryBatcher batcher = dataMovementManager.newQueryBatcher(uris.iterator())
                .withBatchSize(this.batchSize)
                .withThreadCount(this.threadCount)
                .withJobId(jobId)
                .onUrisReady(queryBatch -> {
                    try {
                        // One FlowResource per client, created at most once.
                        final FlowResource flowResource = resourcesByClient.computeIfAbsent(
                            queryBatch.getClient(),
                            client -> new FlowResource(client, this.destinationDatabase, this.flow));
                        final RunFlowResponse response = flowResource.run(jobId, queryBatch.getItems(), this.options);

                        failedEvents.addAndGet(response.errorCount);
                        successfulEvents.addAndGet(response.totalCount - response.errorCount);
                        if (response.errors != null && errorMessages.size() < MAX_ERROR_MESSAGES) {
                            errorMessages.addAll(response.errors.stream().map(this::jsonToString).collect(Collectors.toList()));
                        }

                        // A batch counts as failed only when every item in it failed.
                        if (response.errorCount < response.totalCount) {
                            successfulBatches.addAndGet(1L);
                        } else {
                            failedBatches.addAndGet(1L);
                        }

                        // Progress is reported in steps of 5%, and only when the value changed.
                        final int percentComplete = (int) ((successfulBatches.get() / batchCount) * 100.0d);
                        if (percentComplete != this.previousPercentComplete && percentComplete % 5 == 0) {
                            this.previousPercentComplete = percentComplete;
                            this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, percentComplete, ""));
                        }

                        if (!this.flowItemCompleteListeners.isEmpty()) {
                            response.completedItems.forEach(item ->
                                this.flowItemCompleteListeners.forEach(listener -> listener.processCompletion(jobId, item)));
                        }
                        if (!this.flowItemFailureListeners.isEmpty()) {
                            response.failedItems.forEach(item ->
                                this.flowItemFailureListeners.forEach(listener -> listener.processFailure(jobId, item)));
                        }

                        if (this.stopOnFailure && response.errorCount > 0) {
                            final JobTicket runningTicket = ticketHolder.get("jobTicket");
                            if (runningTicket != null) {
                                dataMovementManager.stopJob(runningTicket);
                            }
                        }
                    } catch (Exception e) {
                        // Only the message is recorded; the counters are left untouched, so the
                        // items of this batch are missing from the totals checked at the end.
                        if (errorMessages.size() < MAX_ERROR_MESSAGES) {
                            errorMessages.add(e.toString());
                        }
                    }
                })
                .onQueryFailure(failure -> {
                    // The whole batch is counted as failed, using the configured batch size.
                    failedBatches.addAndGet(1L);
                    failedEvents.addAndGet(this.batchSize);
                });

            final JobTicket ticket = dataMovementManager.startJob(batcher);
            ticketHolder.put("jobTicket", ticket);
            jobManager.saveJob(job.withStatus(JobStatus.RUNNING_HARMONIZE));

            this.runningThread = new Thread(() -> {
                batcher.awaitCompletion();
                this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 100, ""));
                this.flowFinishedListeners.forEach(LegacyFlowFinishedListener::onFlowFinished);
                dataMovementManager.stopJob(batcher);

                final long succeeded = successfulEvents.get();
                final long failed = failedEvents.get();
                job.setCounts(succeeded, failed, successfulBatches.get(), failedBatches.get())
                    .withStatus(resolveFinalStatus(succeeded, failed, uriCount))
                    .withEndTime(new Date());
                if (!errorMessages.isEmpty()) {
                    job.withJobOutput(errorMessages);
                }
                jobManager.saveJob(job);
            });
            this.runningThread.start();
            return ticket;
        } catch (Exception e) {
            job.setCounts(0L, 0L, 0L, 0L).withStatus(JobStatus.FAILED).withEndTime(new Date());
            // Capture the stack trace as text so it can be stored as the job output.
            final StringWriter stackTrace = new StringWriter();
            e.printStackTrace(new PrintWriter(stackTrace));
            job.withJobOutput(stackTrace.toString());
            jobManager.saveJob(job);
            return new JobTicketImpl(jobId, JobTicket.JobType.QUERY_BATCHER);
        }
    }

    /**
     * Derives the final job status from the event counters, checked in this order: stopped on
     * error, canceled (not every collected item was accounted for), finished with errors,
     * failed, finished.
     *
     * @param succeeded number of successfully processed items
     * @param failed number of failed items
     * @param total number of items the collector returned
     * @return the status to save on the job
     */
    private JobStatus resolveFinalStatus(long succeeded, long failed, int total) {
        if (failed > 0 && this.stopOnFailure) {
            return JobStatus.STOP_ON_ERROR;
        }
        if (failed + succeeded != total) {
            return JobStatus.CANCELED;
        }
        if (failed > 0 && succeeded > 0) {
            return JobStatus.FINISHED_WITH_ERRORS;
        }
        if ((failed != 0 || succeeded <= 0) && total != 0) {
            return JobStatus.FAILED;
        }
        return JobStatus.FINISHED;
    }

    /**
     * Serialises a JSON node to its string form.
     *
     * @param jsonNode node to serialise
     * @return the JSON text produced by a default {@link ObjectMapper}
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialisation fails
     */
    private String jsonToString(JsonNode jsonNode) {
        final ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(jsonNode);
        } catch (JsonProcessingException serializationFailure) {
            throw new RuntimeException(serializationFailure);
        }
    }

    /**
     * Turns a failed request whose status contains "Plugin error" (case-insensitive) into the
     * {@link RunFlowResponse} carried in its message; any other exception is rethrown.
     *
     * @param exc the exception raised while running a flow
     * @return the response parsed from the failed request's message
     * @throws RuntimeException wrapping {@code exc} when it is not a plugin-error
     *         {@link FailedRequestException}, or wrapping the parse failure (with {@code exc}
     *         attached as a suppressed exception) when the message cannot be read as a
     *         {@link RunFlowResponse}
     */
    protected RunFlowResponse handleFlowRunnerException(Exception exc) {
        if (!(exc instanceof FailedRequestException)) {
            throw new RuntimeException(exc);
        }
        final FailedRequestException failure = (FailedRequestException) exc;
        if (!StringUtils.containsIgnoreCase(failure.getFailedRequest().getStatus(), "Plugin error")) {
            throw new RuntimeException(exc);
        }
        try {
            return new ObjectMapper().readValue(failure.getFailedRequest().getMessage(), RunFlowResponse.class);
        } catch (IOException e) {
            // Keep both stack traces: the parse failure as the cause, the original flow
            // exception as a suppressed exception.
            final RuntimeException wrapped = new RuntimeException(
                "Unexpected IO error while parsing exception from running flow; original exception: " + exc.getMessage(), e);
            wrapped.addSuppressed(exc);
            throw wrapped;
        }
    }
}
