package io.annot8.implementations.pipeline;

import io.annot8.api.components.responses.ProcessorResponse;
import io.annot8.api.components.responses.SourceResponse;
import io.annot8.api.context.Context;
import io.annot8.api.data.Item;
import io.annot8.api.data.ItemFactory;
import io.annot8.api.exceptions.IncompleteException;
import io.annot8.api.pipelines.ErrorConfiguration;
import io.annot8.api.pipelines.ItemStatus;
import io.annot8.api.pipelines.NoOpItemState;
import io.annot8.api.pipelines.Pipeline;
import io.annot8.api.pipelines.PipelineDescriptor;
import io.annot8.api.pipelines.PipelineItemState;
import io.annot8.api.pipelines.PipelineRunner;
import io.annot8.common.components.logging.Logging;
import io.annot8.common.components.metering.Metering;
import io.annot8.common.components.metering.Metrics;
import io.annot8.common.components.metering.NoOpMetrics;
import io.annot8.implementations.pipeline.SimplePipeline;
import io.annot8.implementations.support.factories.QueueItemFactory;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/annot8/implementations/pipeline/InMemoryPipelineRunner.class */
public class InMemoryPipelineRunner implements PipelineRunner {
    private static final long DEFAULT_DELAY = 0;
    private final Pipeline pipeline;
    private final Logger logger;
    private final Metrics metrics;
    private final QueueItemFactory itemFactory;
    private final long delay;
    private final PipelineItemState itemState;
    private AtomicBoolean running;
    private long pipelineFinished;

    /* loaded from: input_file:io/annot8/implementations/pipeline/InMemoryPipelineRunner$Builder.class */
    public static class Builder {
        private long delay = InMemoryPipelineRunner.DEFAULT_DELAY;
        private ErrorConfiguration errorConfiguration = new ErrorConfiguration();
        private Context context = null;
        private PipelineItemState itemState = NoOpItemState.getInstance();
        private ItemFactory itemFactory;
        private PipelineDescriptor pipelineDescriptor;

        public Builder withDelay(long j) {
            this.delay = j;
            return this;
        }

        public Builder withErrorConfiguration(ErrorConfiguration errorConfiguration) {
            this.errorConfiguration = errorConfiguration;
            return this;
        }

        public Builder withContext(Context context) {
            this.context = context;
            return this;
        }

        public Builder withItemState(PipelineItemState pipelineItemState) {
            this.itemState = pipelineItemState;
            return this;
        }

        public Builder withItemFactory(ItemFactory itemFactory) {
            this.itemFactory = itemFactory;
            return this;
        }

        public Builder withPipelineDescriptor(PipelineDescriptor pipelineDescriptor) {
            this.pipelineDescriptor = pipelineDescriptor;
            return this;
        }

        public InMemoryPipelineRunner build() {
            if (this.itemFactory == null) {
                throw new IncompleteException("ItemFactory must be provided");
            }
            if (this.pipelineDescriptor == null) {
                throw new IncompleteException("PipelineDescriptor must be provided");
            }
            return new InMemoryPipelineRunner(new SimplePipeline.Builder().m9from(this.pipelineDescriptor).m2withErrorConfiguration(this.errorConfiguration).m3withContext(this.context).build(), this.itemFactory, this.delay, this.itemState);
        }
    }

    public InMemoryPipelineRunner(Pipeline pipeline, ItemFactory itemFactory) {
        this(pipeline, itemFactory, DEFAULT_DELAY);
    }

    public InMemoryPipelineRunner(Pipeline pipeline, ItemFactory itemFactory, long j) {
        this(pipeline, itemFactory, j, NoOpItemState.getInstance());
    }

    public InMemoryPipelineRunner(Pipeline pipeline, ItemFactory itemFactory, long j, PipelineItemState pipelineItemState) {
        this.running = new AtomicBoolean(true);
        this.pipelineFinished = -1L;
        this.pipeline = pipeline;
        this.logger = (Logger) pipeline.getContext().getResource(Logging.class).map(logging -> {
            return logging.getLogger(InMemoryPipelineRunner.class);
        }).orElse(LoggerFactory.getLogger(InMemoryPipelineRunner.class));
        this.metrics = (Metrics) pipeline.getContext().getResource(Metering.class).map(metering -> {
            return metering.getMetrics("pipeline");
        }).orElseGet(NoOpMetrics::instance);
        this.itemFactory = new QueueItemFactory(itemFactory);
        this.itemFactory.register(item -> {
            this.logger.debug("Item {} added to queue", item.getId());
        });
        this.itemFactory.register(item2 -> {
            this.metrics.counter("itemsCreated", new String[0]).increment();
        });
        this.metrics.gauge("queueSize", this.itemFactory, (v0) -> {
            return v0.getQueueSize();
        });
        this.delay = j;
        this.itemState = pipelineItemState;
    }

    public void run() {
        this.logger.info("Pipeline {} started", this.pipeline.getName());
        this.running.set(true);
        this.logger.debug("Pipeline {} started at {}", this.pipeline.getName(), (Long) this.metrics.gauge("runTime", Long.valueOf(System.currentTimeMillis()), l -> {
            return this.running.get() ? (System.currentTimeMillis() - l.longValue()) / 1000.0d : (this.pipelineFinished - l.longValue()) / 1000.0d;
        }));
        while (this.running.get()) {
            SourceResponse read = this.pipeline.read(this.itemFactory);
            while (this.running.get() && !this.itemFactory.isEmpty()) {
                Optional next = this.itemFactory.next();
                if (next.isPresent()) {
                    Item item = (Item) next.get();
                    this.itemState.setItemStatus(item.getId(), ItemStatus.PROCESSING);
                    ProcessorResponse processorResponse = (ProcessorResponse) this.metrics.timer("itemProcessingTime", new String[0]).record(() -> {
                        return this.pipeline.process(item);
                    });
                    this.metrics.counter("itemsProcessed", new String[0]).increment();
                    if (processorResponse.getStatus().equals(ProcessorResponse.Status.OK)) {
                        this.itemState.setItemStatus(item.getId(), ItemStatus.PROCESSED_OK);
                        this.metrics.counter("itemsProcessed.ok", new String[0]).increment();
                    } else if (processorResponse.getStatus().equals(ProcessorResponse.Status.PROCESSOR_ERROR)) {
                        this.itemState.setItemStatus(item.getId(), ItemStatus.PROCESSED_PROCESSOR_ERROR);
                        this.metrics.counter("itemsProcessed.processorError", new String[0]).increment();
                    } else if (processorResponse.getStatus().equals(ProcessorResponse.Status.ITEM_ERROR)) {
                        this.itemState.setItemStatus(item.getId(), ItemStatus.PROCESSED_ITEM_ERROR);
                        this.metrics.counter("itemsProcessed.itemError", new String[0]).increment();
                    }
                }
            }
            if (this.itemFactory.isEmpty() && read.getStatus() == SourceResponse.Status.DONE) {
                stop();
            }
            if (this.delay > DEFAULT_DELAY) {
                try {
                    Thread.sleep(this.delay);
                } catch (InterruptedException e) {
                    this.logger.debug("Sleep interrupted - {}", e.getMessage());
                }
            }
        }
        this.pipelineFinished = System.currentTimeMillis();
        this.logger.debug("Pipeline {} finished at {}", this.pipeline.getName(), Long.valueOf(this.pipelineFinished));
        this.logger.info("Pipeline {} ran for {} seconds", this.pipeline.getName(), Double.valueOf((this.pipelineFinished - r0.longValue()) / 1000.0d));
    }

    public void stop() {
        this.logger.info("Stopping pipeline after current item/source");
        this.running.set(false);
    }

    public boolean isRunning() {
        return this.running.get();
    }
}
