package uk.gov.dstl.baleen.services;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.annot8.api.components.Resource;
import io.annot8.api.pipelines.ErrorConfiguration;
import io.annot8.api.pipelines.PipelineDescriptor;
import io.annot8.common.components.logging.Logging;
import io.annot8.common.components.metering.Metering;
import io.annot8.implementations.pipeline.InMemoryPipelineRunner;
import io.annot8.implementations.reference.factories.DefaultItemFactory;
import io.annot8.implementations.support.context.SimpleContext;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import uk.gov.dstl.annot8.baleen.MutablePipelineDescriptor;
import uk.gov.dstl.annot8.baleen.RestApi;
import uk.gov.dstl.annot8.baleen.RestApiQueue;
import uk.gov.dstl.annot8.baleen.SubmittedData;
import uk.gov.dstl.baleen.data.PipelineHolder;
import uk.gov.dstl.baleen.data.PipelineMetadata;
import uk.gov.dstl.baleen.exceptions.AlreadyExistsException;
import uk.gov.dstl.baleen.exceptions.BadRequestException;
import uk.gov.dstl.baleen.exceptions.PipelineNotFoundException;
import uk.gov.dstl.baleen.logging.BaleenLoggerFactory;

/**
 * Creates, starts, stops, deletes and persists pipelines, and keeps them in step with the JSON
 * descriptor files found in the persistence folder.
 *
 * <p>State is held in three maps keyed by pipeline name: the registered pipelines, the REST API
 * queues of running pipelines that have a {@link RestApi} source, and the files from which
 * persisted pipelines were loaded (or to which they were saved). The names of pipelines that are
 * not running are written to the "stopped state" file whenever a pipeline is started, stopped or
 * deleted, and are consulted when a pipeline is created so that it is not started again.
 *
 * <p>The maps are plain {@link HashMap}s and no synchronisation is performed in this class.
 */
@Service
public class PipelineService {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipelineService.class);

    /** Lower-case extension identifying pipeline descriptor files in the persistence folder. */
    private static final String JSON_EXTENSION = ".json";

    @Value("${baleen.persistence}")
    private File persistenceFolder;

    @Value("${baleen.persistence}/${baleen.stopped}")
    private File stoppedState;

    @Value("${baleen.logging.max}")
    private Integer loggingMax;

    @Value("${baleen.pipeline.delay}")
    private Long pipelineDelay;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private Annot8ComponentService annot8ComponentService;

    /** Watches the persistence folder; {@code null} when a watch service could not be created. */
    private final WatchService watchService;

    /** Registered pipelines (running or stopped), keyed by pipeline name. */
    private final Map<String, PipelineHolder> pipelines = new HashMap<>();

    /** REST API queues of started pipelines that declare a {@link RestApi} source. */
    private final Map<String, RestApiQueue> queues = new HashMap<>();

    /** Descriptor file backing each persisted pipeline, keyed by pipeline name. */
    private final Map<String, File> persistedPipelines = new HashMap<>();

    /**
     * Creates the service and tries to open a {@link WatchService} on the default file system.
     * Failure is logged and leaves change detection disabled.
     */
    public PipelineService() {
        this.watchService = openWatchService();
    }

    /** Opens a watch service on the default file system, or returns {@code null} on failure. */
    private static WatchService openWatchService() {
        try {
            return FileSystems.getDefault().newWatchService();
        } catch (IOException e) {
            LOGGER.warn("Unable to create WatchService - files added/removed to the persistence folder will not be detected", e);
            return null;
        }
    }

    /** Returns whether the given file name ends with {@code .json}, ignoring case. */
    private static boolean isJsonFile(String fileName) {
        return fileName.toLowerCase().endsWith(JSON_EXTENSION);
    }

    /**
     * Prepares the persistence folder and recreates a pipeline from every JSON file in it.
     *
     * <p>The folder is created if missing and registered with the watch service (when available)
     * for modify and delete events. The method returns early, with a warning, if the folder cannot
     * be created, is not a directory, cannot be read, or cannot be listed. A read-only folder only
     * produces a warning.
     */
    @PostConstruct
    public void startPersistedPipelines() {
        if (!this.persistenceFolder.exists()) {
            LOGGER.info("Creating persistence folder {}", this.persistenceFolder);
            if (!this.persistenceFolder.mkdirs()) {
                LOGGER.warn("Unable to create persistence folder {}", this.persistenceFolder);
                return;
            }
        }
        if (!this.persistenceFolder.isDirectory()) {
            LOGGER.warn("Persistence folder {} is not a directory", this.persistenceFolder);
            return;
        }
        if (!this.persistenceFolder.canRead()) {
            LOGGER.warn("Can not read from persistence folder {}", this.persistenceFolder);
            return;
        }
        if (this.watchService != null) {
            try {
                this.persistenceFolder.toPath().register(this.watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE);
            } catch (IOException e) {
                LOGGER.warn("Unable to create WatchKey - files added/removed to the persistence folder will not be detected", e);
            }
        }
        if (!this.persistenceFolder.canWrite()) {
            LOGGER.warn("Can not write to persistence folder {} - existing pipelines will be created but new pipelines will not be persisted", this.persistenceFolder);
        }

        File[] jsonFiles = this.persistenceFolder.listFiles((dir, name) -> isJsonFile(name));
        if (jsonFiles == null) {
            LOGGER.warn("Can not retrieve JSON files from persistence folder {}", this.persistenceFolder);
            return;
        }
        for (File jsonFile : jsonFiles) {
            LOGGER.info("Recreating persisted pipeline from file {}", jsonFile);
            createPipelineFromFile(jsonFile);
        }
    }

    /**
     * Parses a pipeline descriptor from {@code file}, creates the pipeline and records the file as
     * its persisted form.
     *
     * <p>The file is handed to the object mapper directly, so the mapper opens and closes the
     * underlying stream itself and nothing is left open when parsing fails.
     *
     * @param file JSON file holding a serialised {@link MutablePipelineDescriptor}
     * @return {@code true} if the pipeline was created and recorded; {@code false} if the file
     *     could not be parsed or the pipeline could not be created (both are logged)
     */
    private boolean createPipelineFromFile(File file) {
        PipelineDescriptor descriptor;
        try {
            descriptor = this.objectMapper.readValue(file, MutablePipelineDescriptor.class);
        } catch (IOException | RuntimeException e) {
            LOGGER.error("Unable to parse file {}", file, e);
            return false;
        }
        if (descriptor == null) {
            // A JSON "null" document deserialises to null; treat it as unparseable.
            LOGGER.error("Unable to parse file {}", file);
            return false;
        }

        try {
            createPipeline(descriptor);
            this.persistedPipelines.put(descriptor.getName(), file.getCanonicalFile());
            return true;
        } catch (IOException | RuntimeException e) {
            LOGGER.error("Unable to create pipeline {} from file {}", descriptor.getName(), file, e);
            return false;
        }
    }

    /**
     * Drains pending watch events for the persistence folder and applies them; scheduled with a
     * fixed delay of 5000 ms. Does nothing when no watch service is available.
     *
     * <p>Each watch key is reset even if handling one of its events throws, so that later changes
     * to the folder continue to be reported.
     */
    @Scheduled(fixedDelay = 5000)
    public void detectChanges() {
        if (this.watchService == null) {
            return;
        }
        LOGGER.debug("Checking persistence folder for changes");

        WatchKey key;
        while ((key = this.watchService.poll()) != null) {
            try {
                for (WatchEvent<?> event : key.pollEvents()) {
                    handleWatchEvent(event);
                }
            } finally {
                key.reset();
            }
        }
    }

    /** Dispatches one watch event; events without a path context or on non-JSON files are ignored. */
    private void handleWatchEvent(WatchEvent<?> event) {
        Object context = event.context();
        if (!(context instanceof Path)) {
            // OVERFLOW events carry no path context.
            return;
        }
        Path path = (Path) context;
        if (!isJsonFile(path.toString())) {
            return;
        }

        LOGGER.info("{} event detected on path {}", event.kind(), path);
        File file = new File(this.persistenceFolder, path.getFileName().toString());
        if (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())) {
            handleModifiedFile(file);
        } else if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())) {
            handleDeletedFile(file);
        }
    }

    /** Stops the pipeline currently backed by {@code file}, if any, then creates a pipeline from the file. */
    private void handleModifiedFile(File file) {
        String pipelineName = getPipelineName(file);
        if (pipelineName != null && this.pipelines.containsKey(pipelineName)) {
            LOGGER.info("Stopping existing pipeline {} for modified file {}", pipelineName, file.getName());
            stopPipeline(pipelineName);
        }
        // NOTE(review): stopPipeline leaves the entry in `pipelines`, so createPipeline will reject a
        // descriptor that keeps the same name as already existing - confirm the intended behaviour.
        LOGGER.info("Creating pipeline for file {}", file.getName());
        createPipelineFromFile(file);
    }

    /** Forgets the persisted file and stops the pipeline that was backed by it, if one is known. */
    private void handleDeletedFile(File file) {
        String pipelineName = getPipelineName(file);
        if (pipelineName == null) {
            LOGGER.info("Could not determine pipeline name for file {}", file.getName());
            return;
        }
        LOGGER.info("Stopping pipeline {} as persisted file {} was deleted", pipelineName, file.getName());
        this.persistedPipelines.remove(pipelineName);
        // A file may have been saved for a pipeline that was never registered; nothing to stop then.
        if (this.pipelines.containsKey(pipelineName)) {
            stopPipeline(pipelineName);
        }
    }

    /**
     * Finds the pipeline whose persisted file has the same file name as {@code file}.
     *
     * @return the pipeline name, or {@code null} if no persisted pipeline uses that file name
     */
    private String getPipelineName(File file) {
        String fileName = file.getName();
        for (Map.Entry<String, File> entry : this.persistedPipelines.entrySet()) {
            if (entry.getValue().getName().equals(fileName)) {
                return entry.getKey();
            }
        }
        return null;
    }

    /**
     * Registers a new pipeline and starts it, unless its name is listed in the stopped-state file.
     *
     * @param pipelineDescriptor descriptor of the pipeline to create
     * @throws AlreadyExistsException if a pipeline with the same name is already registered
     */
    public void createPipeline(PipelineDescriptor pipelineDescriptor) {
        String name = pipelineDescriptor.getName();
        if (this.pipelines.containsKey(name)) {
            throw new AlreadyExistsException("Pipeline '" + name + "' already exists");
        }
        LOGGER.info("Creating pipeline {}", name);
        this.pipelines.put(name, new PipelineHolder(pipelineDescriptor, this.loggingMax));
        if (getPipelineState().contains(name)) {
            LOGGER.info("Pipeline {} was previously in stopped state, and will not be started", name);
        } else {
            startPipeline(name);
        }
    }

    /** Returns a sorted copy of the names of all registered pipelines. */
    public Set<String> getPipelineNames() {
        return new TreeSet<>(this.pipelines.keySet());
    }

    /** Returns metadata for every registered pipeline, ordered by pipeline name. */
    public List<PipelineMetadata> getPipelinesMetadata() {
        return this.pipelines.values().stream()
            .map(PipelineMetadata::new)
            .sorted(Comparator.comparing(PipelineMetadata::getName))
            .collect(Collectors.toList());
    }

    /**
     * Looks up a registered pipeline.
     *
     * @param str name of the pipeline
     * @return the holder for that pipeline
     * @throws PipelineNotFoundException if no pipeline with that name is registered
     */
    public PipelineHolder getPipeline(String str) {
        PipelineHolder holder = this.pipelines.get(str);
        if (holder == null) {
            throw new PipelineNotFoundException();
        }
        return holder;
    }

    /**
     * Starts a registered pipeline on a new thread and records the updated stopped state. Does
     * nothing if the pipeline is already running.
     *
     * @param str name of the pipeline
     * @throws PipelineNotFoundException if no pipeline with that name is registered
     */
    public void startPipeline(String str) {
        PipelineHolder holder = getPipeline(str);
        if (holder.isRunning()) {
            return;
        }
        LOGGER.info("Starting pipeline {}", str);
        PipelineDescriptor descriptor = holder.getDescriptor();
        String name = descriptor.getName();

        LOGGER.debug("Creating resources and context for pipeline {}", name);
        DefaultItemFactory itemFactory = new DefaultItemFactory(this.annot8ComponentService.getContentBuilderFactoryRegistry());
        SimpleContext context = createContext(holder, descriptor);

        LOGGER.debug("Determining error configuration for pipeline {}", name);
        ErrorConfiguration errorConfiguration = descriptor.getErrorConfiguration();
        if (errorConfiguration == null) {
            errorConfiguration = new ErrorConfiguration();
        }
        LOGGER.info("Error configuration for pipeline {} on source errors is {}", name, errorConfiguration.getOnSourceError());
        LOGGER.info("Error configuration for pipeline {} on processing errors is {}", name, errorConfiguration.getOnProcessorError());
        LOGGER.info("Error configuration for pipeline {} on item errors is {}", name, errorConfiguration.getOnItemError());

        LOGGER.debug("Creating runner for pipeline {}", name);
        InMemoryPipelineRunner runner = new InMemoryPipelineRunner.Builder()
            .withPipelineDescriptor(descriptor)
            .withItemFactory(itemFactory)
            .withContext(context)
            .withDelay(this.pipelineDelay)
            .withErrorConfiguration(errorConfiguration)
            .build();
        holder.setPipelineRunner(runner);

        Thread thread = new Thread(runner);
        thread.start();
        LOGGER.info("Pipeline {} started on thread {}", name, thread.getName());
        updatePipelineState();
    }

    /**
     * Builds the context for a pipeline run from the holder's logging and metering resources. If
     * any source is a {@link RestApi}, a new {@link RestApiQueue} is added to the context and
     * registered under the pipeline name.
     */
    private SimpleContext createContext(PipelineHolder holder, PipelineDescriptor descriptor) {
        Resource logging = Logging.useILoggerFactory(new BaleenLoggerFactory(holder.getLogEntries()));
        Resource metering = Metering.useMeterRegistry(holder.getMeterRegistry(), (String) null);

        boolean hasRestApiSource = descriptor.getSources().stream().anyMatch(source -> source instanceof RestApi);
        if (!hasRestApiSource) {
            return new SimpleContext(new Resource[] {logging, metering});
        }
        RestApiQueue queue = new RestApiQueue();
        this.queues.put(descriptor.getName(), queue);
        return new SimpleContext(new Resource[] {logging, metering, queue});
    }

    /**
     * Stops a running pipeline, discards its REST queue, log entries and meters, and records the
     * updated stopped state. Does nothing if the pipeline is not running.
     *
     * @param str name of the pipeline
     * @throws PipelineNotFoundException if no pipeline with that name is registered
     */
    public void stopPipeline(String str) {
        PipelineHolder holder = getPipeline(str);
        if (!holder.isRunning()) {
            return;
        }
        LOGGER.info("Stopping pipeline {}", str);
        holder.getPipelineRunner().stop();
        holder.setPipelineRunner(null);
        this.queues.remove(str);
        holder.getLogEntries().clear();
        holder.getMeterRegistry().clear();
        LOGGER.info("Pipeline {} stopped", str);
        updatePipelineState();
    }

    /**
     * Stops and unregisters a pipeline, deletes its persisted file if one is recorded, and records
     * the updated stopped state.
     *
     * @param str name of the pipeline
     * @throws PipelineNotFoundException if no pipeline with that name is registered
     */
    public void deletePipeline(String str) {
        LOGGER.info("Deleting pipeline {}", str);
        stopPipeline(str);
        this.pipelines.remove(str);

        File persistedFile = this.persistedPipelines.remove(str);
        if (persistedFile != null && !persistedFile.delete()) {
            LOGGER.warn("Failed to delete persisted file for {} - file may have already been deleted", str);
        }
        LOGGER.info("Pipeline {} deleted", str);
        updatePipelineState();
    }

    /** Returns whether a pipeline with the given name is registered. */
    public boolean pipelineExists(String str) {
        return this.pipelines.containsKey(str);
    }

    /**
     * Serialises a descriptor to a new, randomly named JSON file in the persistence folder and
     * records that file as the pipeline's persisted form.
     *
     * @param pipelineDescriptor descriptor to persist
     * @return {@code true} if the file was written; {@code false} if the folder could not be
     *     created or written to, or serialisation or writing failed (each case is logged)
     */
    public boolean save(PipelineDescriptor pipelineDescriptor) {
        String name = pipelineDescriptor.getName();
        if (!this.persistenceFolder.exists()) {
            LOGGER.info("Creating persistence folder {}", this.persistenceFolder);
            if (!this.persistenceFolder.mkdirs()) {
                LOGGER.error("Unable to create persistence folder {} - pipeline {} will not be persisted", this.persistenceFolder, name);
                return false;
            }
        }
        if (!this.persistenceFolder.canWrite()) {
            LOGGER.error("Can not write to persistence folder {} - pipeline {} will not be persisted", this.persistenceFolder, name);
            return false;
        }

        String json;
        try {
            json = this.objectMapper.writeValueAsString(pipelineDescriptor);
        } catch (JsonProcessingException e) {
            LOGGER.error("Could not serialize pipeline", e);
            return false;
        }

        LOGGER.info("Persisting pipeline {}", name);
        File target = new File(this.persistenceFolder, UUID.randomUUID() + JSON_EXTENSION);
        try {
            Files.writeString(target.toPath(), json, StandardCharsets.UTF_8);
            LOGGER.info("Pipeline {} persisted to {}", name, target.getPath());
            this.persistedPipelines.put(name, target.getCanonicalFile());
            return true;
        } catch (IOException e) {
            LOGGER.error("Can not persist pipeline {}", name, e);
            return false;
        }
    }

    /**
     * Adds data to the REST API queue of a pipeline.
     *
     * @param str name of the pipeline
     * @param submittedData data to enqueue
     * @throws PipelineNotFoundException if no pipeline with that name is registered
     * @throws BadRequestException if the pipeline has no REST API queue registered
     */
    public void submitData(String str, SubmittedData submittedData) {
        if (!this.pipelines.containsKey(str)) {
            throw new PipelineNotFoundException();
        }
        RestApiQueue queue = this.queues.get(str);
        if (queue == null) {
            throw new BadRequestException("Pipeline is not configured to support REST API");
        }
        LOGGER.debug("Data received via REST API for pipeline {}", str);
        queue.addToQueue(submittedData);
    }

    /**
     * Reads the names of stopped pipelines from the stopped-state file.
     *
     * @return one name per line of the file; an empty list if the file does not exist or cannot
     *     be read (the latter is logged)
     */
    private List<String> getPipelineState() {
        try {
            return Files.readAllLines(this.stoppedState.toPath());
        } catch (NoSuchFileException | FileNotFoundException e) {
            // No state has been written yet; java.nio reports this as NoSuchFileException.
            return Collections.emptyList();
        } catch (IOException e) {
            LOGGER.error("Could not read pipeline state from disk", e);
            return Collections.emptyList();
        }
    }

    /** Overwrites the stopped-state file with the names of all registered pipelines that are not running. */
    private void updatePipelineState() {
        List<String> stoppedNames = this.pipelines.entrySet().stream()
            .filter(entry -> !entry.getValue().isRunning())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        try {
            Files.write(this.stoppedState.toPath(), stoppedNames, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
        } catch (IOException e) {
            LOGGER.error("Unable to persist pipeline state to disk", e);
        }
    }
}
