package com.spotify.styx;

import com.google.common.base.CaseFormat;
import com.google.common.base.Throwables;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.state.Trigger;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.util.AlreadyInitializedException;
import com.spotify.styx.util.ExceptionUtil;
import com.spotify.styx.util.GuardedRunnable;
import com.spotify.styx.util.ParameterUtil;
import com.spotify.styx.util.Time;
import com.spotify.styx.util.TimeUtil;
import com.spotify.styx.util.TriggerInstantSpec;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/TriggerManager.class */
class TriggerManager implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class);
    private static final String TICK_TYPE = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, TriggerManager.class.getSimpleName());
    private static final int TRIGGER_CONCURRENCY = 32;
    private final TriggerListener triggerListener;
    private final Time time;
    private final Storage storage;
    private final Stats stats;
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(32);

    /* JADX INFO: Access modifiers changed from: package-private */
    public TriggerManager(TriggerListener triggerListener, Time time, Storage storage, Stats stats) {
        this.triggerListener = (TriggerListener) Objects.requireNonNull(triggerListener);
        this.time = (Time) Objects.requireNonNull(time);
        this.storage = (Storage) Objects.requireNonNull(storage);
        this.stats = (Stats) Objects.requireNonNull(stats);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tick() {
        Instant instant = (Instant) this.time.get();
        try {
            if (!this.storage.config().globalEnabled()) {
                LOG.info("Triggering has been disabled globally.");
                return;
            }
            try {
                Map workflowsWithNextNaturalTrigger = this.storage.workflowsWithNextNaturalTrigger();
                Set enabled = this.storage.enabled();
                Instant instant2 = (Instant) this.time.get();
                ((List) workflowsWithNextNaturalTrigger.entrySet().stream().filter(entry -> {
                    return instant2.isAfter(((TriggerInstantSpec) entry.getValue()).offsetInstant());
                }).map(entry2 -> {
                    return this.forkJoinPool.submit(tryTriggering((Workflow) entry2.getKey(), (TriggerInstantSpec) entry2.getValue(), enabled));
                }).collect(Collectors.toList())).forEach((v0) -> {
                    v0.join();
                });
                this.stats.recordTickDuration(TICK_TYPE, instant.until((Temporal) this.time.get(), ChronoUnit.MILLIS));
            } catch (IOException e) {
                LOG.warn("Couldn't fetch workflows to trigger, skipping this run", e);
            }
        } catch (IOException e2) {
            LOG.warn("Couldn't fetch global enabled status, skipping this run", e2);
        }
    }

    private Runnable tryTriggering(Workflow workflow, TriggerInstantSpec triggerInstantSpec, Set<WorkflowId> set) {
        return GuardedRunnable.guard(() -> {
            if (set.contains(workflow.id())) {
                try {
                    this.triggerListener.event(workflow, Trigger.natural(), triggerInstantSpec.instant()).toCompletableFuture().get();
                } catch (Exception e) {
                    WorkflowInstance create = WorkflowInstance.create(workflow.id(), ParameterUtil.toParameter(workflow.configuration().schedule(), triggerInstantSpec.instant()));
                    if (ExceptionUtil.findCause(e, AlreadyInitializedException.class) == null) {
                        LOG.debug("Failed to trigger {}", create, e);
                        return;
                    }
                    LOG.debug("{} already triggered", create, e);
                }
                this.stats.recordNaturalTrigger();
            }
            Instant nextInstant = TimeUtil.nextInstant(triggerInstantSpec.instant(), workflow.configuration().schedule());
            try {
                this.storage.updateNextNaturalTrigger(workflow.id(), TriggerInstantSpec.create(nextInstant, workflow.configuration().addOffset(nextInstant)));
            } catch (IOException e2) {
                LOG.error("Sent trigger for workflow {}, but didn't succeed storing next scheduled run {}.", workflow.id(), nextInstant);
                throw Throwables.propagate(e2);
            }
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.forkJoinPool.shutdownNow();
        try {
            this.forkJoinPool.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
