package com.spotify.styx;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.spotify.styx.model.Backfill;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.state.StateManager;
import com.spotify.styx.state.Trigger;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.storage.StorageTransaction;
import com.spotify.styx.util.AlreadyInitializedException;
import com.spotify.styx.util.ExceptionUtil;
import com.spotify.styx.util.GuardedRunnable;
import com.spotify.styx.util.Time;
import com.spotify.styx.util.TimeUtil;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/BackfillTriggerManager.class */
class BackfillTriggerManager {
    private static final Logger LOG = LoggerFactory.getLogger(BackfillTriggerManager.class);
    private static final String TICK_TYPE = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, BackfillTriggerManager.class.getSimpleName());
    private static Consumer<List<Backfill>> DEFAULT_SHUFFLER = Collections::shuffle;
    private final TriggerListener triggerListener;
    private final Storage storage;
    private final StateManager stateManager;
    private final Stats stats;
    private final Time time;
    private final Consumer<List<Backfill>> shuffler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BackfillTriggerManager(StateManager stateManager, Storage storage, TriggerListener triggerListener, Stats stats, Time time) {
        this(stateManager, storage, triggerListener, stats, time, DEFAULT_SHUFFLER);
    }

    @VisibleForTesting
    BackfillTriggerManager(StateManager stateManager, Storage storage, TriggerListener triggerListener, Stats stats, Time time, Consumer<List<Backfill>> consumer) {
        this.stateManager = (StateManager) Objects.requireNonNull(stateManager);
        this.storage = (Storage) Objects.requireNonNull(storage);
        this.triggerListener = (TriggerListener) Objects.requireNonNull(triggerListener);
        this.stats = (Stats) Objects.requireNonNull(stats);
        this.time = (Time) Objects.requireNonNull(time);
        this.shuffler = (Consumer) Objects.requireNonNull(consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tick() {
        Instant instant = (Instant) this.time.get();
        try {
            List<Backfill> backfills = this.storage.backfills(false);
            this.shuffler.accept(backfills);
            backfills.forEach(backfill -> {
                GuardedRunnable.guard(() -> {
                    triggerAndProgress(backfill);
                }).run();
            });
            this.stats.recordTickDuration(TICK_TYPE, instant.until((Temporal) this.time.get(), ChronoUnit.MILLIS));
        } catch (IOException e) {
            LOG.warn("Failed to get backfills", e);
        }
    }

    private void triggerAndProgress(Backfill backfill) {
        try {
            Optional workflow = this.storage.workflow(backfill.workflowId());
            if (!workflow.isPresent()) {
                LOG.debug("workflow not found for backfill {}, halt it.", backfill);
                storeBackfill(backfill.builder().halted(true).build());
                return;
            }
            Workflow workflow2 = (Workflow) workflow.get();
            int concurrency = backfill.concurrency() - this.stateManager.getActiveStatesByTriggerId(backfill.id()).size();
            if (concurrency < 1) {
                LOG.debug("No capacity left for backfill {}", backfill);
                return;
            }
            Instant nextTrigger = backfill.nextTrigger();
            do {
                try {
                } catch (IOException e) {
                    LOG.debug("Failure while trying to progress backfill {}", backfill, e);
                    return;
                }
            } while (((Boolean) this.storage.runInTransaction(storageTransaction -> {
                return Boolean.valueOf(triggerNextPartitionAndProgress(storageTransaction, backfill.id(), workflow2, nextTrigger, concurrency, backfill.reverse()));
            })).booleanValue());
        } catch (IOException e2) {
            LOG.warn("Failed to read workflow {}", backfill.workflowId(), e2);
        }
    }

    @VisibleForTesting
    boolean triggerNextPartitionAndProgress(StorageTransaction storageTransaction, String str, Workflow workflow, Instant instant, int i, boolean z) {
        Backfill backfill = (Backfill) storageTransaction.backfill(str).orElseThrow(() -> {
            return new RuntimeException("Error while fetching backfill " + str);
        });
        if (backfill.halted()) {
            LOG.debug("Backfill {} halted", backfill);
            return false;
        }
        Instant nextTrigger = backfill.nextTrigger();
        if (capacityReached(backfill, instant, nextTrigger, i, z)) {
            LOG.debug("Capacity reached for backfill {}", backfill);
            return false;
        }
        if (isAllTriggered(backfill, nextTrigger)) {
            LOG.debug("Backfill {} all triggered", backfill);
            storageTransaction.store(backfill.builder().allTriggered(true).build());
            return false;
        }
        try {
            this.triggerListener.event(workflow, Trigger.backfill(backfill.id()), nextTrigger).toCompletableFuture().get();
        } catch (Exception e) {
            if (ExceptionUtil.findCause(e, AlreadyInitializedException.class) == null) {
                LOG.debug("Failed to trigger {} for backfill {}", new Object[]{nextTrigger, backfill, e});
                throw new RuntimeException(e);
            }
            LOG.debug("{} already triggered for backfill {}", new Object[]{nextTrigger, backfill, e});
        }
        Instant nextPartition = getNextPartition(backfill, nextTrigger, z);
        storageTransaction.store(backfill.builder().nextTrigger(nextPartition).build());
        if (!isAllTriggered(backfill, nextPartition)) {
            return true;
        }
        storageTransaction.store(backfill.builder().nextTrigger(backfill.reverse() ? TimeUtil.previousInstant(backfill.start(), backfill.schedule()) : backfill.end()).allTriggered(true).build());
        LOG.debug("Backfill {} all triggered", backfill);
        return false;
    }

    private boolean isAllTriggered(Backfill backfill, Instant instant) {
        return instant.equals(exclusiveEndTrigger(backfill));
    }

    private Instant exclusiveEndTrigger(Backfill backfill) {
        return backfill.reverse() ? TimeUtil.previousInstant(backfill.start(), backfill.schedule()) : backfill.end();
    }

    private void storeBackfill(Backfill backfill) {
        try {
            this.storage.storeBackfill(backfill);
        } catch (IOException e) {
            LOG.warn("Failed to store updated backfill {}", backfill.id(), e);
        }
    }

    private static Instant getNextPartition(Backfill backfill, Instant instant, boolean z) {
        return z ? TimeUtil.previousInstant(instant, backfill.schedule()) : TimeUtil.nextInstant(instant, backfill.schedule());
    }

    private static boolean capacityReached(Backfill backfill, Instant instant, Instant instant2, int i, boolean z) {
        return z ? TimeUtil.instantsInReversedRange(instant, instant2, backfill.schedule()).size() >= i : TimeUtil.instantsInRange(instant, instant2, backfill.schedule()).size() >= i;
    }
}
