package com.spotify.styx.state;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.SequenceEvent;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateManager;
import com.spotify.styx.storage.EventStorage;
import com.spotify.styx.util.Time;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/state/QueuedStateManager.class */
public class QueuedStateManager implements StateManager, StaleStateReaper, StateRetrier {
    private static final Logger LOG = LoggerFactory.getLogger(QueuedStateManager.class);
    static final String DISPATCHER_THREAD_NAME = "styx-event-dispatcher";
    static final int EVENT_QUEUE_SIZE = 1024;
    static final int POLL_TIMEOUT_MILLIS = 10;
    static final int SHUTDOWN_GRACE_PERIOD_SECONDS = 5;
    static final long NO_EVENTS_PROCESSED = -1;
    private final TimeoutConfig ttls;
    private final Time time;
    private final Executor workerPool;
    private final EventStorage storage;
    private final ConcurrentMap<WorkflowInstance, InstanceState> states = Maps.newConcurrentMap();
    private final CountDownLatch closedLatch = new CountDownLatch(1);
    private final Object signal = new Object();
    private AtomicInteger activeEvents = new AtomicInteger(0);
    private volatile boolean running = true;
    private final Thread dispatcherThread = new Thread(this::dispatch);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/styx/state/QueuedStateManager$InstanceState.class */
    public class InstanceState {
        final WorkflowInstance workflowInstance;
        final Queue<Runnable> queue = new LinkedBlockingQueue(QueuedStateManager.EVENT_QUEUE_SIZE);
        final Semaphore mutex = new Semaphore(1);
        volatile RunState runState;
        volatile long counter;

        InstanceState(WorkflowInstance workflowInstance, RunState runState, long j) {
            this.workflowInstance = workflowInstance;
            this.runState = runState;
            this.counter = j;
        }

        void enqueue(Runnable runnable) {
            if (!this.queue.offer(runnable)) {
                throw new RuntimeException("Transition queue for " + this.workflowInstance.toKey() + " is full");
            }
            QueuedStateManager.this.activeEvents.incrementAndGet();
        }

        void mutexPoll() {
            if (!this.queue.isEmpty() && this.mutex.tryAcquire()) {
                try {
                    QueuedStateManager.this.workerPool.execute(() -> {
                        try {
                            Runnable poll = this.queue.poll();
                            if (poll != null) {
                                try {
                                    invoke(poll);
                                    QueuedStateManager.this.activeEvents.decrementAndGet();
                                } catch (Throwable th) {
                                    QueuedStateManager.this.activeEvents.decrementAndGet();
                                    throw th;
                                }
                            }
                            mutexPoll();
                        } finally {
                            this.mutex.release();
                        }
                    });
                } catch (Throwable th) {
                    QueuedStateManager.LOG.error("Failed to submit event worker task", th);
                    this.mutex.release();
                }
            }
        }

        void invoke(Runnable runnable) {
            try {
                runnable.run();
            } catch (Throwable th) {
                QueuedStateManager.LOG.warn("Exception in event runnable for {}", this.workflowInstance.toKey(), th);
            }
        }
    }

    public QueuedStateManager(TimeoutConfig timeoutConfig, Time time, Executor executor, EventStorage eventStorage) {
        this.ttls = (TimeoutConfig) Objects.requireNonNull(timeoutConfig);
        this.time = (Time) Objects.requireNonNull(time);
        this.workerPool = (Executor) Objects.requireNonNull(executor);
        this.storage = (EventStorage) Objects.requireNonNull(eventStorage);
        this.dispatcherThread.setName(DISPATCHER_THREAD_NAME);
        this.dispatcherThread.start();
    }

    @Override // com.spotify.styx.state.StateManager
    public void initialize(RunState runState) throws StateManager.IsClosed {
        ensureRunning();
        WorkflowInstance workflowInstance = runState.workflowInstance();
        if (this.states.containsKey(workflowInstance)) {
            throw new RuntimeException("RunState initialization called on active instance " + workflowInstance.toKey());
        }
        try {
            long longValue = ((Long) this.storage.getLatestStoredCounter(workflowInstance).orElse(Long.valueOf(NO_EVENTS_PROCESSED))).longValue();
            storeActivation(workflowInstance, longValue);
            this.states.computeIfAbsent(workflowInstance, workflowInstance2 -> {
                return new InstanceState(workflowInstance2, runState, longValue + 1);
            });
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // com.spotify.styx.state.StateManager
    public void restore(RunState runState, long j) {
        WorkflowInstance workflowInstance = runState.workflowInstance();
        if (this.states.containsKey(workflowInstance)) {
            throw new RuntimeException("RunState initialization called on active instance " + workflowInstance.toKey());
        }
        this.states.computeIfAbsent(workflowInstance, workflowInstance2 -> {
            return new InstanceState(workflowInstance2, runState, j + 1);
        });
    }

    @Override // com.spotify.styx.state.StateManager
    public void receive(Event event) throws StateManager.IsClosed {
        ensureRunning();
        InstanceState instanceState = this.states.get(event.workflowInstance());
        if (instanceState == null) {
            LOG.warn("Received event for unknown workflow instance: {}", event);
        } else {
            instanceState.enqueue(() -> {
                transition(instanceState, event);
            });
            signalDispatcher();
        }
    }

    @Override // com.spotify.styx.state.StateManager
    public long getActiveStatesCount() {
        return this.states.size();
    }

    @Override // com.spotify.styx.state.StateManager
    public long getQueuedEventsCount() {
        return this.states.values().stream().mapToInt(instanceState -> {
            return instanceState.queue.size();
        }).sum();
    }

    @Override // com.spotify.styx.state.StateManager
    public long getActiveStatesCount(WorkflowId workflowId) {
        return this.states.keySet().stream().filter(workflowInstance -> {
            return workflowInstance.workflowId().equals(workflowId);
        }).count();
    }

    @Override // com.spotify.styx.state.StateManager
    public boolean isActiveWorkflowInstance(WorkflowInstance workflowInstance) {
        return this.states.containsKey(workflowInstance);
    }

    @Override // com.spotify.styx.state.StateManager
    public RunState get(WorkflowInstance workflowInstance) {
        InstanceState instanceState = this.states.get(workflowInstance);
        if (instanceState != null) {
            return instanceState.runState;
        }
        return null;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.running) {
            this.running = false;
            LOG.info("Shutting down, waiting for queued events to process");
            try {
                if (this.closedLatch.await(5L, TimeUnit.SECONDS)) {
                    LOG.info("Shutdown was clean, {} events left in queue", Long.valueOf(getQueuedEventsCount()));
                } else {
                    this.dispatcherThread.interrupt();
                    throw new IOException("Graceful shutdown failed, event loop did not finish within grace period");
                }
            } catch (InterruptedException e) {
                this.dispatcherThread.interrupt();
                throw new IOException(e);
            }
        }
    }

    @Override // com.spotify.styx.state.StaleStateReaper
    public void triggerTimeouts() {
        if (this.running) {
            this.states.entrySet().stream().filter(entry -> {
                return hasTimedOut(((InstanceState) entry.getValue()).runState);
            }).forEach(entry2 -> {
                ((InstanceState) entry2.getValue()).enqueue(() -> {
                    RunState runState = this.states.get(entry2.getKey()).runState;
                    if (hasTimedOut(runState)) {
                        LOG.info("Found stale state, triggering timeout for {}", runState);
                        try {
                            receive(Event.timeout((WorkflowInstance) entry2.getKey()));
                        } catch (StateManager.IsClosed e) {
                        }
                    }
                });
            });
        }
    }

    @Override // com.spotify.styx.state.StateRetrier
    public void triggerRetries() {
        if (this.running) {
            this.states.entrySet().stream().filter(entry -> {
                return shouldRetry(((InstanceState) entry.getValue()).runState);
            }).forEach(entry2 -> {
                ((InstanceState) entry2.getValue()).enqueue(() -> {
                    WorkflowInstance workflowInstance = (WorkflowInstance) entry2.getKey();
                    RunState runState = this.states.get(workflowInstance).runState;
                    if (shouldRetry(runState)) {
                        LOG.info("{} triggering retry #{}", workflowInstance.toKey(), Integer.valueOf(runState.tries()));
                        try {
                            receive(Event.retry(workflowInstance));
                        } catch (StateManager.IsClosed e) {
                        }
                    }
                });
            });
        }
    }

    private void dispatch() {
        while (true) {
            if (!this.running && getQueuedEventsCount() <= 0) {
                this.closedLatch.countDown();
                return;
            } else {
                this.states.values().forEach((v0) -> {
                    v0.mutexPoll();
                });
                waitForSignal();
            }
        }
    }

    private void signalDispatcher() {
        synchronized (this.signal) {
            this.signal.notifyAll();
        }
    }

    private void waitForSignal() {
        synchronized (this.signal) {
            try {
                this.signal.wait(10L);
            } catch (InterruptedException e) {
            }
        }
    }

    private void ensureRunning() throws StateManager.IsClosed {
        if (!this.running) {
            throw new StateManager.IsClosed();
        }
    }

    private void transition(InstanceState instanceState, Event event) {
        LOG.debug("Event {} -> {}", event, instanceState);
        try {
            RunState transition = instanceState.runState.transition(event);
            WorkflowInstance workflowInstance = instanceState.workflowInstance;
            long j = instanceState.counter;
            storeEvent(SequenceEvent.create(event, j, ((Instant) this.time.get()).toEpochMilli()));
            if (transition.state().isTerminal()) {
                this.states.remove(workflowInstance);
                storeDeactivation(workflowInstance);
            } else {
                instanceState.runState = transition;
                instanceState.counter++;
                storeActivation(workflowInstance, j);
            }
            this.activeEvents.incrementAndGet();
            this.workerPool.execute(() -> {
                try {
                    transition.outputHandler().transitionInto(transition);
                } catch (Throwable th) {
                    LOG.warn("Output handler threw", th);
                } finally {
                    this.activeEvents.decrementAndGet();
                }
            });
        } catch (IOException e) {
            LOG.error("Failed to read/write from/to Storage", e);
        } catch (IllegalStateException e2) {
            LOG.warn("Illegal state transition", e2);
        }
    }

    private void storeEvent(SequenceEvent sequenceEvent) throws IOException {
        this.storage.writeEvent(sequenceEvent);
    }

    private void storeActivation(WorkflowInstance workflowInstance, long j) throws IOException {
        this.storage.writeActiveState(workflowInstance, j);
    }

    private void storeDeactivation(WorkflowInstance workflowInstance) throws IOException {
        this.storage.deleteActiveState(workflowInstance);
    }

    private boolean hasTimedOut(RunState runState) {
        return !runState.state().isTerminal() && ((Instant) this.time.get()).toEpochMilli() - runState.timestamp() >= this.ttls.ttlOf(runState.state()).toMillis();
    }

    private boolean shouldRetry(RunState runState) {
        return runState.state() == RunState.State.AWAITING_RETRY && ((Instant) this.time.get()).toEpochMilli() - runState.timestamp() >= runState.retryDelayMillis();
    }

    @VisibleForTesting
    boolean awaitIdle(long j) {
        long epochMilli = ((Instant) this.time.get()).toEpochMilli();
        while (this.activeEvents.get() > 0 && ((Instant) this.time.get()).toEpochMilli() - epochMilli < j) {
            Thread.yield();
        }
        return ((Instant) this.time.get()).toEpochMilli() - epochMilli < j;
    }
}
