package com.spotify.styx.state;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.spotify.futures.CompletableFutures;
import com.spotify.styx.MessageUtil;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.SequenceEvent;
import com.spotify.styx.model.TriggerParameters;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.state.RunState;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.storage.StorageTransaction;
import com.spotify.styx.storage.TransactionException;
import com.spotify.styx.util.AlreadyInitializedException;
import com.spotify.styx.util.CounterCapacityException;
import com.spotify.styx.util.EventUtil;
import com.spotify.styx.util.IsClosedException;
import com.spotify.styx.util.MDCUtil;
import com.spotify.styx.util.ShardedCounter;
import com.spotify.styx.util.Time;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javaslang.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/state/PersistentStateManager.class */
/**
 * A {@link StateManager} that keeps the run state of active workflow instances in
 * {@link Storage}.
 *
 * <p>Every state change is applied through {@code Storage.runInTransactionWithRetries}. After a
 * change has been applied, the resulting {@link SequenceEvent} is written to storage, handed to
 * the configured event consumer, and the {@link OutputHandler} is invoked for the new state.
 *
 * <p>Once {@link #close()} has been called, {@link #trigger} and {@link #receive} throw
 * {@link IsClosedException}.
 */
public class PersistentStateManager implements StateManager {
    private static final Logger DEFAULT_LOG = LoggerFactory.getLogger(PersistentStateManager.class);

    /** Counter used for a workflow instance that has no stored counter yet. */
    private static final long NO_EVENTS_PROCESSED = -1;

    private final Logger log;
    private final Time time;
    private final ExecutorService executor;
    private final Storage storage;
    private final BiConsumer<SequenceEvent, RunState> eventConsumer;
    private final Executor eventConsumerExecutor;
    private final OutputHandler outputHandler;
    private final ShardedCounter shardedCounter;

    /** Cleared by {@link #close()}; volatile so the change is visible to all calling threads. */
    private volatile boolean running = true;

    /**
     * Creates a state manager that logs through this class's own logger.
     *
     * @param time source of the timestamps used for new run states and transitions
     * @param executor executor on which {@link #tick()} processes the active instances
     * @param storage storage holding active states, events and resource counters
     * @param eventConsumer callback receiving every applied event and the resulting run state
     * @param eventConsumerExecutor executor on which {@code eventConsumer} is invoked
     * @param outputHandler handler invoked with each run state that is ticked or transitioned into
     * @param shardedCounter counter passed to the storage transaction when updating resource usage
     * @throws NullPointerException if any argument is {@code null}
     */
    public PersistentStateManager(Time time, ExecutorService executor, Storage storage, BiConsumer<SequenceEvent, RunState> eventConsumer, Executor eventConsumerExecutor, OutputHandler outputHandler, ShardedCounter shardedCounter) {
        this(time, executor, storage, eventConsumer, eventConsumerExecutor, outputHandler, shardedCounter, DEFAULT_LOG);
    }

    /**
     * Same as the public constructor, but with an explicit logger.
     *
     * @param logger logger that receives all log output of this instance
     * @throws NullPointerException if any argument is {@code null}
     */
    PersistentStateManager(Time time, ExecutorService executor, Storage storage, BiConsumer<SequenceEvent, RunState> eventConsumer, Executor eventConsumerExecutor, OutputHandler outputHandler, ShardedCounter shardedCounter, Logger logger) {
        this.time = Objects.requireNonNull(time, "time");
        this.storage = Objects.requireNonNull(storage, "storage");
        this.eventConsumer = Objects.requireNonNull(eventConsumer, "eventConsumer");
        this.eventConsumerExecutor = Objects.requireNonNull(eventConsumerExecutor, "eventConsumerExecutor");
        this.executor = Objects.requireNonNull(executor, "executor");
        this.outputHandler = Objects.requireNonNull(outputHandler, "outputHandler");
        this.shardedCounter = Objects.requireNonNull(shardedCounter, "shardedCounter");
        this.log = Objects.requireNonNull(logger, "logger");
    }

    /**
     * Ticks every active workflow instance once.
     *
     * <p>The instances are listed from storage, shuffled, and each one is processed as a separate
     * task on {@code executor}. The call blocks until all tasks have completed. A failure to list
     * the instances is rethrown by {@code Try.get()}.
     */
    @Override // com.spotify.styx.state.StateManager
    public void tick() {
        List<WorkflowInstance> instances = new ArrayList<>(Try.of(storage::listActiveInstances).get());
        // Randomize the processing order on every tick.
        Collections.shuffle(instances);
        List<CompletableFuture<Void>> futures = instances.stream()
                .map(instance -> CompletableFuture.runAsync(() -> tickInstance(instance), executor))
                .collect(Collectors.toList());
        CompletableFutures.allAsList(futures).join();
    }

    /**
     * Reads the active state of {@code workflowInstance} and ticks it if one exists.
     *
     * <p>Any exception is logged and swallowed, so one failing instance does not fail the
     * task that {@link #tick()} waits on.
     */
    private void tickInstance(WorkflowInstance workflowInstance) {
        try {
            storage.readActiveState(workflowInstance).ifPresent(this::tickInstance);
        } catch (Exception e) {
            log.error("Error ticking instance: {}", workflowInstance, e);
        }
    }

    /**
     * Invokes the output handler for {@code runState}.
     *
     * <p>State transition conflicts, exhausted counter capacity and illegal arguments are logged
     * at debug level and otherwise ignored; other exceptions propagate to the caller.
     */
    private void tickInstance(RunState runState) {
        log.info("Ticking instance: {}: #{} {}", runState.workflowInstance(), runState.counter(), runState.state());
        try {
            outputHandler.transitionInto(runState, this);
        } catch (StateTransitionConflictException e) {
            log.debug("State transition conflict when ticking instance: {}", runState.workflowInstance(), e);
        } catch (CounterCapacityException e) {
            log.debug("Counter capacity exhausted when ticking instance: {}", runState.workflowInstance(), e);
        } catch (IllegalArgumentException e) {
            log.debug("Illegal argument when ticking instance: {}", runState.workflowInstance(), e);
        }
    }

    /**
     * Creates the active state for {@code workflowInstance} and applies the trigger event to it.
     *
     * <p>A {@code NEW} run state is created with the latest stored counter of the instance (or
     * {@code -1} if there is none), transitioned with a {@code triggerExecution} event, and written
     * as the active state in a storage transaction. On success the usual post-transition steps run.
     *
     * @throws IsClosedException if this manager has been closed
     * @throws AlreadyInitializedException if the transaction reports that the state already exists
     * @throws RuntimeException for any other failure; checked exceptions are wrapped, unchecked
     *     ones are rethrown as is. The in-transaction "Workflow not found"
     *     {@link IllegalArgumentException} presumably surfaces this way as well; how the storage
     *     layer propagates it is not visible here.
     */
    @Override // com.spotify.styx.state.StateManager
    public void trigger(WorkflowInstance workflowInstance, Trigger trigger, TriggerParameters triggerParameters) throws IsClosedException {
        ensureRunning();
        log.debug("Trigger {}", workflowInstance);
        RunState initialState = RunState.create(workflowInstance, RunState.State.NEW, time.get(), latestEventCounter(workflowInstance));
        Event triggerEvent = Event.triggerExecution(workflowInstance, trigger, triggerParameters);
        RunState triggeredState = initialState.transition(triggerEvent, time);
        try {
            storage.runInTransactionWithRetries(tx -> {
                if (tx.workflow(workflowInstance.workflowId()).isEmpty()) {
                    throw new IllegalArgumentException("Workflow not found: " + workflowInstance.workflowId().toKey());
                }
                return tx.writeActiveState(workflowInstance, triggeredState);
            });
            postTransition(triggerEvent, triggeredState);
        } catch (TransactionException e) {
            if (e.isAlreadyExists()) {
                throw new AlreadyInitializedException("Workflow instance is already triggered: " + workflowInstance);
            }
            if (e.isConflict()) {
                log.debug("Transaction conflict when triggering workflow instance. Aborted: {}", workflowInstance);
                throw new RuntimeException(e);
            }
            log.debug("Transaction failure when triggering workflow instance: {}: {}", workflowInstance, e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            log.debug("Failure when triggering workflow instance: {}: {}", workflowInstance, e.getMessage(), e);
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the latest counter stored for {@code workflowInstance}, or
     * {@link #NO_EVENTS_PROCESSED} if storage has none.
     *
     * @throws RuntimeException wrapping the {@link IOException} if storage cannot be read
     */
    private long latestEventCounter(WorkflowInstance workflowInstance) {
        try {
            return storage.getLatestStoredCounter(workflowInstance).orElse(NO_EVENTS_PROCESSED);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies {@code event} without verifying the run state counter.
     *
     * @throws IsClosedException if this manager has been closed
     */
    public void receive(Event event) throws IsClosedException {
        // Long.MAX_VALUE is the sentinel that makes verifyCounter skip the check.
        receive(event, Long.MAX_VALUE);
    }

    /**
     * Applies {@code event} to the active state of its workflow instance.
     *
     * @param event the event to apply
     * @param expectedCounter counter the caller last observed for the instance, or
     *     {@code Long.MAX_VALUE} to skip counter verification
     * @throws IsClosedException if this manager has been closed
     * @throws RuntimeException if the transition fails; see {@link #transition(Event, long)}
     */
    public void receive(Event event, long expectedCounter) throws IsClosedException {
        ensureRunning();
        log.info("Received event {}", event);
        postTransition(event, transition(event, expectedCounter));
    }

    /**
     * Runs {@link #transition0} in a storage transaction and returns the resulting run state.
     *
     * <p>Any failure is logged at debug level. Unchecked throwables are rethrown unchanged and
     * checked ones are wrapped in a {@link RuntimeException}.
     */
    private RunState transition(Event event, long expectedCounter) {
        try {
            return storage.runInTransactionWithRetries(tx -> transition0(tx, event, expectedCounter));
        } catch (Throwable t) {
            log.debug("Failed workflow instance transition: {}, counter={}", event, expectedCounter, t);
            Throwables.throwIfUnchecked(t);
            throw new RuntimeException(t);
        }
    }

    /**
     * Transactional part of a transition: reads the current state, verifies the counter, computes
     * the next state, updates resource counters and persists the outcome.
     *
     * <p>A terminal next state deletes the active state; any other next state replaces it.
     *
     * @throws IllegalArgumentException if the workflow instance has no active state
     */
    private RunState transition0(StorageTransaction tx, Event event, long expectedCounter) throws IOException {
        Optional<RunState> currentRunState = tx.readActiveState(event.workflowInstance());
        if (currentRunState.isEmpty()) {
            String message = "Received event for unknown workflow instance: " + event;
            log.warn(message);
            throw new IllegalArgumentException(message);
        }
        RunState runState = currentRunState.orElseThrow();
        verifyCounter(event, expectedCounter, runState);
        log.info("Received event (verified) {}", event);
        RunState nextRunState = nextRunState(event, runState);
        updateResourceCounters(tx, event, runState, nextRunState);
        if (nextRunState.state().isTerminal()) {
            tx.deleteActiveState(event.workflowInstance());
        } else {
            tx.updateActiveState(event.workflowInstance(), nextRunState);
        }
        return nextRunState;
    }

    /**
     * Applies {@code event} to {@code runState}; an {@link IllegalStateException} from the
     * transition is logged and rethrown.
     */
    private RunState nextRunState(Event event, RunState runState) {
        try {
            return runState.transition(event, time);
        } catch (IllegalStateException e) {
            log.warn("Illegal state transition", e);
            throw e;
        }
    }

    /**
     * Adjusts resource counters for the transition from {@code runState} to {@code nextRunState}.
     *
     * <p>A dequeue event increments the counter of every resource id of the next state. Leaving a
     * resource-consuming state for a non-consuming one decrements them again.
     */
    private void updateResourceCounters(StorageTransaction tx, Event event, RunState runState, RunState nextRunState) throws IOException {
        if (isDequeue(event) && nextRunState.data().resourceIds().isPresent()) {
            tryUpdatingCounter(runState, tx, nextRunState.data().resourceIds().get());
        }

        boolean releasesResources = StateUtil.isConsumingResources(runState.state())
                && !StateUtil.isConsumingResources(nextRunState.state());
        if (!releasesResources) {
            return;
        }
        if (!nextRunState.data().resourceIds().isPresent()) {
            log.error("Resource ids are missing for {} when transitioning from {} to {}.", nextRunState.workflowInstance(), runState, nextRunState);
            return;
        }
        for (String resourceId : nextRunState.data().resourceIds().get()) {
            tx.updateCounter(shardedCounter, resourceId, -1);
        }
    }

    /**
     * Increments the counter of every resource in {@code resourceIds} by one.
     *
     * <p>All resources are attempted even if some fail, so that the resulting exception can name
     * every resource that had no capacity.
     *
     * @throws CounterCapacityException if at least one counter could not be incremented
     */
    private void tryUpdatingCounter(RunState runState, StorageTransaction tx, Set<String> resourceIds) throws IOException {
        List<String> depletedResourceIds = new ArrayList<>();
        for (String resourceId : resourceIds) {
            try {
                tx.updateCounter(shardedCounter, resourceId, 1);
            } catch (CounterCapacityException e) {
                // Deliberately not rethrown here: collected and reported together below.
                depletedResourceIds.add(resourceId);
            }
        }
        if (depletedResourceIds.isEmpty()) {
            return;
        }
        throw new CounterCapacityException("Failed to update resource counter for workflow instance: " + runState.workflowInstance() + ": " + MessageUtil.emitResourceLimitReachedMessage(this, runState, depletedResourceIds).line());
    }

    /** Returns whether {@code event} is a dequeue event, judged by its {@link EventUtil} name. */
    private boolean isDequeue(Event event) {
        // Constant-first comparison so that a null name yields false instead of throwing.
        return "dequeue".equals(EventUtil.name(event));
    }

    /**
     * Checks that the caller's view of the run state counter matches the stored state.
     *
     * @param event the event being applied; only used in the stale-event message
     * @param expectedCounter counter the caller observed, or {@code Long.MAX_VALUE} to skip the check
     * @param runState the current run state read from storage
     * @throws StateTransitionConflictException if the stored counter is ahead of
     *     {@code expectedCounter}, i.e. the event is stale
     * @throws RuntimeException if the stored counter is behind {@code expectedCounter}
     */
    private void verifyCounter(Event event, long expectedCounter, RunState runState) {
        if (expectedCounter == Long.MAX_VALUE) {
            return;
        }
        long currentCounter = runState.counter();
        if (currentCounter > expectedCounter) {
            String message = "Stale event encountered. Expected counter is " + expectedCounter
                    + " but current counter is " + currentCounter + ". Discarding event " + event;
            log.debug(message);
            throw new StateTransitionConflictException(message);
        }
        if (currentCounter < expectedCounter) {
            String message = "Unexpected current counter is less than last observed one for " + runState;
            log.error(message);
            throw new RuntimeException(message);
        }
    }

    /**
     * Runs the follow-up steps after {@code event} has produced {@code runState}.
     *
     * <p>The steps are: write the sequence event to storage, submit it to the event consumer,
     * and invoke the output handler. A failed event write or consumer submission is logged as a
     * warning and does not stop the later steps. A state transition conflict from the output
     * handler is logged at debug level; other output handler exceptions propagate.
     */
    private void postTransition(Event event, RunState runState) {
        SequenceEvent sequenceEvent = SequenceEvent.create(event, runState.counter(), runState.timestamp());
        try {
            storage.writeEvent(sequenceEvent);
        } catch (IOException e) {
            log.warn("Error writing event {}", sequenceEvent, e);
        }
        try {
            // NOTE(review): withMDC presumably carries the logging context over to the consumer thread - confirm.
            eventConsumerExecutor.execute(MDCUtil.withMDC(() -> eventConsumer.accept(sequenceEvent, runState)));
        } catch (Exception e) {
            log.warn("Error while consuming event {}", sequenceEvent, e);
        }
        try {
            outputHandler.transitionInto(runState, this);
        } catch (StateTransitionConflictException e) {
            log.debug("State transition conflict when invoking output handler: {}", runState.workflowInstance(), e);
        }
    }

    /**
     * Lists the active workflow instances from storage; a storage failure is rethrown by
     * {@code Try.get()}.
     */
    @Override // com.spotify.styx.state.StateManager
    public Set<WorkflowInstance> listActiveInstances() {
        return Try.of(storage::listActiveInstances).get();
    }

    /**
     * Reads all active states from storage.
     *
     * @throws RuntimeException wrapping the {@link IOException} if storage cannot be read
     */
    @Override // com.spotify.styx.state.StateManager
    public Map<WorkflowInstance, RunState> getActiveStates() {
        try {
            return storage.readActiveStates();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the active states that storage associates with trigger id {@code str}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if storage cannot be read
     */
    @Override // com.spotify.styx.state.StateManager
    public Map<WorkflowInstance, RunState> getActiveStatesByTriggerId(String str) {
        try {
            return storage.readActiveStatesByTriggerId(str);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the active state of {@code workflowInstance}, if any.
     *
     * @throws RuntimeException wrapping the {@link IOException} if storage cannot be read
     */
    @Override // com.spotify.styx.state.StateManager
    public Optional<RunState> getActiveState(WorkflowInstance workflowInstance) {
        try {
            return storage.readActiveState(workflowInstance);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Marks this manager as closed. Calling it more than once has no further effect. The
     * executors passed to the constructor are not shut down here.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        running = false;
    }

    /**
     * Guards operations that must not run after {@link #close()}.
     *
     * @throws IsClosedException if this manager has been closed
     */
    @VisibleForTesting
    void ensureRunning() throws IsClosedException {
        if (!running) {
            throw new IsClosedException();
        }
    }
}
