package com.spotify.styx.state;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.spotify.styx.MessageUtil;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.SequenceEvent;
import com.spotify.styx.model.TriggerParameters;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.state.RunState;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.storage.StorageTransaction;
import com.spotify.styx.storage.TransactionException;
import com.spotify.styx.util.AlreadyInitializedException;
import com.spotify.styx.util.CounterCapacityException;
import com.spotify.styx.util.EventUtil;
import com.spotify.styx.util.IsClosedException;
import com.spotify.styx.util.MDCUtil;
import com.spotify.styx.util.ShardedCounter;
import com.spotify.styx.util.Time;
import eu.javaspecialists.tjsn.concurrency.stripedexecutor.StripedExecutorService;
import java.io.IOException;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javaslang.Tuple;
import javaslang.Tuple2;
import javaslang.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/state/QueuedStateManager.class */
public class QueuedStateManager implements StateManager {
    // Logger used when the caller does not pass one to the constructor.
    private static final Logger DEFAULT_LOG = LoggerFactory.getLogger(QueuedStateManager.class);
    // Logger used by this instance; injectable through the eight-argument constructor.
    private final Logger log;
    // Counter given to a freshly created RunState when storage holds no stored counter for the instance.
    private static final long NO_EVENTS_PROCESSED = -1;
    // Incremented when receive() accepts an event, decremented when transition() starts handling it.
    private final LongAdder queuedEvents;
    // Supplies the timestamp for new run states and is passed to RunState.transition().
    private final Time time;
    // Executor on which transitions are run, striped by the event's workflow instance.
    private final StripedExecutorService eventProcessingExecutor;
    // Storage for active states, transactions and the event log.
    private final Storage storage;
    // Callback invoked (on eventConsumerExecutor) with every sequenced event and the resulting state.
    private final BiConsumer<SequenceEvent, RunState> eventConsumer;
    // Executor on which eventConsumer is invoked.
    private final Executor eventConsumerExecutor;
    // Handler called with the resulting state after every transition.
    private final OutputHandler outputHandler;
    // Counter passed to StorageTransaction.updateCounter() when acquiring/releasing resources.
    private final ShardedCounter shardedCounter;
    // Set to false by close(); checked by ensureRunning().
    private volatile boolean running;

    /**
     * Creates a state manager that logs through the class-level default logger.
     * Every other argument is forwarded unchanged to the eight-argument constructor.
     */
    public QueuedStateManager(
            Time time,
            StripedExecutorService stripedExecutorService,
            Storage storage,
            BiConsumer<SequenceEvent, RunState> biConsumer,
            Executor executor,
            OutputHandler outputHandler,
            ShardedCounter shardedCounter) {
        this(
                time,
                stripedExecutorService,
                storage,
                biConsumer,
                executor,
                outputHandler,
                shardedCounter,
                DEFAULT_LOG);
    }

    /**
     * Creates a state manager in the running state with an empty queued-event count.
     *
     * @param time                   time source used for run state timestamps
     * @param stripedExecutorService executor on which event transitions are processed
     * @param storage                storage used for active states, transactions and events
     * @param biConsumer             callback receiving each sequenced event and the resulting state
     * @param executor               executor on which {@code biConsumer} is invoked
     * @param outputHandler          handler called with the resulting state after each transition
     * @param shardedCounter         counter used for resource accounting
     * @param logger                 logger used by this instance
     * @throws NullPointerException if any argument is {@code null}; the message names the missing dependency
     */
    public QueuedStateManager(Time time, StripedExecutorService stripedExecutorService, Storage storage, BiConsumer<SequenceEvent, RunState> biConsumer, Executor executor, OutputHandler outputHandler, ShardedCounter shardedCounter, Logger logger) {
        this.queuedEvents = new LongAdder();
        this.running = true;
        // requireNonNull is generic, so no casts are needed; in particular the raw
        // (BiConsumer) cast would have made the assignment an unchecked conversion.
        this.time = Objects.requireNonNull(time, "time");
        this.storage = Objects.requireNonNull(storage, "storage");
        this.eventConsumer = Objects.requireNonNull(biConsumer, "eventConsumer");
        this.eventConsumerExecutor = Objects.requireNonNull(executor, "eventConsumerExecutor");
        this.eventProcessingExecutor = Objects.requireNonNull(stripedExecutorService, "eventProcessingExecutor");
        this.outputHandler = Objects.requireNonNull(outputHandler, "outputHandler");
        this.shardedCounter = Objects.requireNonNull(shardedCounter, "shardedCounter");
        this.log = Objects.requireNonNull(logger, "logger");
    }

    /**
     * Triggers a workflow instance: asynchronously creates its initial active state and then
     * submits a {@code triggerExecution} event for it.
     *
     * <p>If submitting the event fails because the manager has been closed, the freshly written
     * active state is deleted on a best-effort basis and the returned stage fails with a
     * {@link RuntimeException} wrapping the {@link IsClosedException}.
     *
     * @throws IsClosedException if the manager is already closed when this method is called
     */
    @Override // com.spotify.styx.state.StateManager
    public CompletionStage<Void> trigger(WorkflowInstance workflowInstance, Trigger trigger, TriggerParameters triggerParameters) throws IsClosedException {
        ensureRunning();
        this.log.debug("Trigger {}", workflowInstance);

        final CompletableFuture<Void> initialized =
                CompletableFuture.runAsync(() -> initialize(workflowInstance), MDCUtil.withMDC());

        return initialized.thenCompose(ignored -> {
            try {
                return receive(Event.triggerExecution(workflowInstance, trigger, triggerParameters));
            } catch (IsClosedException closed) {
                this.log.warn("Failed to send 'triggerExecution' event", closed);
                // Best effort: do not leave the just-initialized state behind.
                try {
                    this.storage.deleteActiveState(workflowInstance);
                } catch (IOException deleteFailure) {
                    this.log.warn("Failed to remove dangling NEW state for: {}", workflowInstance, deleteFailure);
                }
                throw new RuntimeException(closed);
            }
        });
    }

    /**
     * Submits an event without counter verification.
     *
     * <p>Delegates to {@link #receive(Event, long)} with {@code Long.MAX_VALUE}, the value for
     * which {@code verifyCounter} skips its check.
     *
     * @throws IsClosedException if the manager has been closed
     */
    @Override // com.spotify.styx.state.StateManager
    public CompletableFuture<Void> receive(Event event) throws IsClosedException {
        final long skipCounterCheck = Long.MAX_VALUE;
        return receive(event, skipCounterCheck);
    }

    /**
     * Submits an event for processing on the striped executor, keyed by the event's workflow
     * instance, and runs the post-transition steps once the transition has completed.
     *
     * @param event the event to apply
     * @param j     the counter the caller expects the current state to have, or
     *              {@code Long.MAX_VALUE} to skip the check
     * @return a future completing after the post-transition steps have run
     * @throws IsClosedException if the manager has been closed
     */
    @Override // com.spotify.styx.state.StateManager
    public CompletableFuture<Void> receive(Event event, long j) throws IsClosedException {
        ensureRunning();
        this.log.info("Received event {}", event);
        this.queuedEvents.increment();

        final CompletableFuture<Tuple2<SequenceEvent, RunState>> transitioned =
                Striping.supplyAsyncStriped(
                        () -> transition(event, j),
                        event.workflowInstance(),
                        this.eventProcessingExecutor);

        return transitioned.thenAccept(outcome -> postTransition(outcome._1, outcome._2));
    }

    /**
     * Writes the initial {@code NEW} active state for a workflow instance.
     *
     * <p>The state is created with the current time and the latest stored counter for the
     * instance (or {@code NO_EVENTS_PROCESSED} if there is none), and written inside a storage
     * transaction that first checks that the workflow exists.
     *
     * @throws AlreadyInitializedException if the transaction reports that the state already exists
     * @throws IllegalArgumentException    if the workflow is not found
     * @throws RuntimeException            wrapping any other storage or transaction failure
     */
    private void initialize(WorkflowInstance workflowInstance) {
        try {
            final Instant now = this.time.get();
            final long latestCounter =
                    this.storage.getLatestStoredCounter(workflowInstance).orElse(NO_EVENTS_PROCESSED);
            final RunState initialState =
                    RunState.create(workflowInstance, RunState.State.NEW, now, latestCounter);
            try {
                this.storage.runInTransaction(tx -> {
                    if (!tx.workflow(workflowInstance.workflowId()).isPresent()) {
                        throw new IllegalArgumentException("Workflow not found: " + workflowInstance.workflowId().toKey());
                    }
                    return tx.writeActiveState(workflowInstance, initialState);
                });
            } catch (TransactionException txFailure) {
                if (txFailure.isAlreadyExists()) {
                    throw new AlreadyInitializedException("Workflow instance is already triggered: " + workflowInstance);
                }
                if (txFailure.isConflict()) {
                    this.log.debug("Transaction conflict when triggering workflow instance. Aborted: {}", workflowInstance);
                } else {
                    this.log.debug("Transaction failure when triggering workflow instance: {}: {}", workflowInstance, txFailure.getMessage(), txFailure);
                }
                throw new RuntimeException(txFailure);
            } catch (Exception failure) {
                this.log.debug("Failure when triggering workflow instance: {}: {}", workflowInstance, failure.getMessage(), failure);
                // Unchecked exceptions propagate as they are; checked ones are wrapped.
                Throwables.throwIfUnchecked(failure);
                throw new RuntimeException(failure);
            }
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Applies an event to the stored active state of its workflow instance inside a storage
     * transaction.
     *
     * <p>Within the transaction the current state is read, the expected counter is verified,
     * the state is transitioned, resource counters are updated, and the active state is either
     * deleted (when the new state is terminal) or updated.
     *
     * @param event           the event to apply
     * @param expectedCounter counter the caller expects the current state to have, or
     *                        {@code Long.MAX_VALUE} to skip verification
     * @return the sequenced event (carrying the new state's counter and timestamp) paired with
     *         the new state
     * @throws IllegalArgumentException if there is no active state for the workflow instance
     * @throws IllegalStateException    if one is raised while applying the transition
     * @throws RuntimeException         wrapping transaction failures and other checked exceptions
     */
    private Tuple2<SequenceEvent, RunState> transition(Event event, long expectedCounter) {
        // The event leaves the queue as soon as processing starts, whatever the outcome.
        this.queuedEvents.decrement();
        try {
            return this.storage.runInTransaction(tx -> {
                final Optional<RunState> stored = tx.readActiveState(event.workflowInstance());
                if (!stored.isPresent()) {
                    final String message = "Received event for unknown workflow instance: " + event;
                    this.log.warn(message);
                    throw new IllegalArgumentException(message);
                }
                final RunState currentState = stored.get();

                verifyCounter(event, expectedCounter, currentState);
                this.log.info("Received event (verified) {}", event);

                try {
                    final RunState nextState = currentState.transition(event, this.time);
                    updateResourceCounters(tx, event, currentState, nextState);
                    if (nextState.state().isTerminal()) {
                        tx.deleteActiveState(event.workflowInstance());
                    } else {
                        tx.updateActiveState(event.workflowInstance(), nextState);
                    }
                    final SequenceEvent sequenced =
                            SequenceEvent.create(event, nextState.counter(), nextState.timestamp());
                    return Tuple.of(sequenced, nextState);
                } catch (IllegalStateException illegal) {
                    this.log.warn("Illegal state transition", illegal);
                    throw illegal;
                }
            });
        } catch (TransactionException txFailure) {
            if (txFailure.isConflict()) {
                this.log.debug("Transaction conflict during workflow instance transition. Aborted: {}, counter={}", event, expectedCounter);
            } else {
                this.log.debug("Transaction failure during workflow instance transition: {}, counter={}", event, expectedCounter, txFailure);
            }
            throw new RuntimeException(txFailure);
        } catch (Exception failure) {
            this.log.debug("Failure during workflow instance transition: {}, counter={}", event, expectedCounter, failure);
            // Unchecked exceptions propagate as they are; checked ones are wrapped.
            Throwables.throwIfUnchecked(failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Adjusts resource counters for a state transition within the given transaction.
     *
     * <p>On a {@code dequeue} event whose resulting state carries resource ids, each of those
     * counters is incremented via {@code tryUpdatingCounter}. When the transition goes from a
     * resource-consuming state to a non-consuming one, each counter named by the resulting
     * state's resource ids is decremented; if the ids are missing, an error is logged and
     * nothing is decremented.
     *
     * @param storageTransaction transaction in which counter updates are made
     * @param event              the event being applied
     * @param currentState       state before the transition
     * @param nextState          state after the transition
     * @throws IOException if a counter update fails while releasing resources
     */
    private void updateResourceCounters(StorageTransaction storageTransaction, Event event, RunState currentState, RunState nextState) throws IOException {
        if (isDequeue(event) && nextState.data().resourceIds().isPresent()) {
            tryUpdatingCounter(currentState, storageTransaction, nextState.data().resourceIds().get());
        }

        final boolean releasesResources = StateUtil.isConsumingResources(currentState.state())
                && !StateUtil.isConsumingResources(nextState.state());
        if (!releasesResources) {
            return;
        }

        final Optional<Set<String>> resourceIds = nextState.data().resourceIds();
        if (!resourceIds.isPresent()) {
            this.log.error("Resource ids are missing for {} when transitioning from {} to {}.", nextState.workflowInstance(), currentState, nextState);
            return;
        }
        for (final String resourceId : resourceIds.get()) {
            storageTransaction.updateCounter(this.shardedCounter, resourceId, -1);
        }
    }

    /**
     * Increments the counter of every given resource by one and fails if any increment fails.
     *
     * <p>All resources are attempted. If any of the failures was caused by a
     * {@link CounterCapacityException}, a resource-limit-reached message is emitted for those
     * resources (sorted by id). If there was any failure at all, a {@link RuntimeException}
     * naming the failed resources (sorted by id) is thrown with every individual cause attached
     * as a suppressed exception.
     *
     * @param runState           state the message and the error text refer to
     * @param storageTransaction transaction in which the counters are updated
     * @param set                ids of the resources to acquire
     * @throws RuntimeException if at least one counter could not be updated
     */
    private void tryUpdatingCounter(RunState runState, StorageTransaction storageTransaction, Set<String> set) {
        // Pair each resource id with the outcome of its update and keep only the failures.
        final Set<Tuple2<String, Try<Void>>> failures = set.stream()
                .map(resourceId -> Tuple.of(
                        resourceId,
                        Try.run(() -> storageTransaction.updateCounter(this.shardedCounter, resourceId, 1))))
                .filter(attempt -> attempt._2.isFailure())
                .collect(Collectors.toSet());

        final List<String> depletedResources = failures.stream()
                .filter(failed -> failed._2.getCause() instanceof CounterCapacityException)
                .map(failed -> failed._1)
                .sorted()
                .collect(Collectors.toList());
        if (!depletedResources.isEmpty()) {
            MessageUtil.emitResourceLimitReachedMessage(this, runState, depletedResources);
        }

        if (failures.isEmpty()) {
            return;
        }

        final List<String> failedResources = failures.stream()
                .map(failed -> failed._1)
                .sorted()
                .collect(Collectors.toList());
        final RuntimeException failure = new RuntimeException("Failed to update resource counter for workflow instance: " + runState.workflowInstance() + ": " + failedResources);
        failures.stream()
                .map(failed -> failed._2.getCause())
                .forEach(failure::addSuppressed);
        throw failure;
    }

    /** Returns whether the name reported by {@code EventUtil.name} for the event is {@code "dequeue"}. */
    private boolean isDequeue(Event event) {
        final String eventName = EventUtil.name(event);
        return eventName.equals("dequeue");
    }

    /**
     * Checks the caller's expected counter against the counter of the current state.
     *
     * <p>No check is made when the expected counter is {@code Long.MAX_VALUE}.
     *
     * @param event           the event being processed (used in the stale-event message)
     * @param expectedCounter the counter the caller last observed
     * @param runState        the current state
     * @throws StaleEventException if the current counter is greater than the expected one
     * @throws RuntimeException    if the current counter is less than the expected one
     */
    private void verifyCounter(Event event, long expectedCounter, RunState runState) {
        if (expectedCounter == Long.MAX_VALUE) {
            return;
        }

        final long currentCounter = runState.counter();
        if (currentCounter == expectedCounter) {
            return;
        }

        if (currentCounter > expectedCounter) {
            final String staleMessage = "Stale event encountered. Expected counter is " + expectedCounter + " but current counter is " + currentCounter + ". Discarding event " + event;
            this.log.debug(staleMessage);
            throw new StaleEventException(staleMessage);
        } else {
            final String behindMessage = "Unexpected current counter is less than last observed one for " + runState;
            this.log.error(behindMessage);
            throw new RuntimeException(behindMessage);
        }
    }

    /**
     * Runs the follow-up steps of a completed transition, in order: write the sequenced event
     * to storage, hand the event and state to the event consumer on its executor, and pass the
     * state to the output handler.
     *
     * <p>A failure to write the event, or an exception while submitting the consumer task, is
     * logged as a warning and does not stop the remaining steps.
     */
    private void postTransition(SequenceEvent sequenceEvent, RunState runState) {
        try {
            this.storage.writeEvent(sequenceEvent);
        } catch (IOException writeFailure) {
            this.log.warn("Error writing event {}", sequenceEvent, writeFailure);
        }

        try {
            this.eventConsumerExecutor.execute(
                    MDCUtil.withMDC(() -> this.eventConsumer.accept(sequenceEvent, runState)));
        } catch (Exception consumeFailure) {
            this.log.warn("Error while consuming event {}", sequenceEvent, consumeFailure);
        }

        this.outputHandler.transitionInto(runState);
    }

    /**
     * Reads all active states from storage.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the read fails
     */
    @Override // com.spotify.styx.state.StateManager
    public Map<WorkflowInstance, RunState> getActiveStates() {
        final Map<WorkflowInstance, RunState> activeStates;
        try {
            activeStates = this.storage.readActiveStates();
        } catch (IOException readFailure) {
            throw new RuntimeException(readFailure);
        }
        return activeStates;
    }

    /**
     * Reads the active states for the given trigger id from storage.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the read fails
     */
    @Override // com.spotify.styx.state.StateManager
    public Map<WorkflowInstance, RunState> getActiveStatesByTriggerId(String str) {
        final Map<WorkflowInstance, RunState> activeStates;
        try {
            activeStates = this.storage.readActiveStatesByTriggerId(str);
        } catch (IOException readFailure) {
            throw new RuntimeException(readFailure);
        }
        return activeStates;
    }

    /**
     * Reads the active state of a single workflow instance from storage.
     *
     * @return the result of the storage lookup, which may be empty
     * @throws RuntimeException wrapping the {@link IOException} if the read fails
     */
    @Override // com.spotify.styx.state.StateManager
    public Optional<RunState> getActiveState(WorkflowInstance workflowInstance) {
        final Optional<RunState> activeState;
        try {
            activeState = this.storage.readActiveState(workflowInstance);
        } catch (IOException readFailure) {
            throw new RuntimeException(readFailure);
        }
        return activeState;
    }

    /**
     * Marks the manager as no longer running, after which {@code ensureRunning()} throws.
     * Calling this more than once has no further effect.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        // Writing false when the flag is already false leaves it unchanged.
        this.running = false;
    }

    /**
     * Guards operations that require the manager to be running.
     *
     * @throws IsClosedException if {@code close()} has been called
     */
    @VisibleForTesting
    void ensureRunning() throws IsClosedException {
        if (this.running) {
            return;
        }
        throw new IsClosedException();
    }

    /**
     * Returns the current sum of the queued-event counter, i.e. events accepted by
     * {@code receive} whose transition has not yet started.
     */
    public Long queuedEvents() {
        final long pending = this.queuedEvents.sum();
        return pending;
    }
}
