package io.annot8.implementations.pipeline;

import io.annot8.api.components.responses.SourceResponse;
import io.annot8.api.context.Context;
import io.annot8.api.data.ItemFactory;
import io.annot8.api.pipelines.Pipeline;
import io.annot8.api.pipelines.PipelineDescriptor;
import io.annot8.api.pipelines.PipelineRunner;
import io.annot8.common.components.logging.Logging;
import io.annot8.common.components.metering.Metering;
import io.annot8.common.components.metering.Metrics;
import io.annot8.common.components.metering.NoOpMetrics;
import io.annot8.implementations.pipeline.SimplePipeline;
import io.annot8.implementations.support.stores.QueueItemFactory;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/annot8/implementations/pipeline/InMemoryPipelineRunner.class */
public class InMemoryPipelineRunner implements PipelineRunner {
    private static final long DEFAULT_DELAY = 1000;
    private final Pipeline pipeline;
    private final Logger logger;
    private final Metrics metrics;
    private final QueueItemFactory itemFactory;
    private final long delay;
    private boolean running;

    public InMemoryPipelineRunner(Pipeline pipeline, ItemFactory itemFactory) {
        this(pipeline, itemFactory, DEFAULT_DELAY);
    }

    public InMemoryPipelineRunner(Pipeline pipeline, ItemFactory itemFactory, long j) {
        this.running = true;
        this.pipeline = pipeline;
        this.logger = (Logger) pipeline.getContext().getResource(Logging.class).map(logging -> {
            return logging.getLogger(InMemoryPipelineRunner.class);
        }).orElse(LoggerFactory.getLogger(InMemoryPipelineRunner.class));
        this.metrics = (Metrics) pipeline.getContext().getResource(Metering.class).map(metering -> {
            return metering.getMetrics("pipeline");
        }).orElseGet(NoOpMetrics::instance);
        this.itemFactory = new QueueItemFactory(itemFactory);
        this.itemFactory.register(item -> {
            this.logger.debug("Item {} added to queue", item.getId());
        });
        this.delay = j;
    }

    public InMemoryPipelineRunner(PipelineDescriptor pipelineDescriptor, ItemFactory itemFactory) {
        this(new SimplePipeline.Builder().m8from(pipelineDescriptor).build(), itemFactory, DEFAULT_DELAY);
    }

    public InMemoryPipelineRunner(PipelineDescriptor pipelineDescriptor, ItemFactory itemFactory, long j) {
        this(new SimplePipeline.Builder().m8from(pipelineDescriptor).build(), itemFactory, j);
    }

    public InMemoryPipelineRunner(PipelineDescriptor pipelineDescriptor, ItemFactory itemFactory, Context context) {
        this(new SimplePipeline.Builder().m8from(pipelineDescriptor).m2withContext(context).build(), itemFactory, DEFAULT_DELAY);
    }

    public InMemoryPipelineRunner(PipelineDescriptor pipelineDescriptor, ItemFactory itemFactory, Context context, long j) {
        this(new SimplePipeline.Builder().m8from(pipelineDescriptor).m2withContext(context).build(), itemFactory, j);
    }

    public void run() {
        this.logger.info("Pipeline {} started", this.pipeline.getName());
        this.running = true;
        Long l = (Long) this.metrics.gauge("runTime", Long.valueOf(System.currentTimeMillis()), l2 -> {
            return System.currentTimeMillis() - l2.longValue();
        });
        this.logger.debug("Pipeline {} started at {}", this.pipeline.getName(), l);
        while (this.running) {
            SourceResponse read = this.pipeline.read(this.itemFactory);
            while (this.running && !this.itemFactory.isEmpty()) {
                this.metrics.timer("itemProcessingTime", new String[0]).record(() -> {
                    Optional next = this.itemFactory.next();
                    Pipeline pipeline = this.pipeline;
                    Objects.requireNonNull(pipeline);
                    next.ifPresent(pipeline::process);
                });
                this.metrics.counter("itemsProcessed", new String[0]).increment();
            }
            if (read.getStatus() == SourceResponse.Status.DONE) {
                stop();
            }
            if (this.delay > 0) {
                try {
                    Thread.sleep(this.delay);
                } catch (InterruptedException e) {
                    this.logger.debug("Sleep interrupted - {}", e.getMessage());
                }
            }
        }
        this.logger.debug("Pipeline {} finished at {}", this.pipeline.getName(), l);
        this.logger.info("Pipeline {} ran for {} seconds", this.pipeline.getName(), Double.valueOf((System.currentTimeMillis() - l.longValue()) / 1000.0d));
    }

    public void stop() {
        this.logger.info("Stopping pipeline after current item/source");
        this.running = false;
    }
}
