package com.spotify.styx;

import com.google.common.base.CaseFormat;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicLongMap;
import com.google.common.util.concurrent.RateLimiter;
import com.spotify.styx.WorkflowExecutionGate;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.Resource;
import com.spotify.styx.model.StyxConfig;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.state.InstanceState;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateManager;
import com.spotify.styx.state.StateUtil;
import com.spotify.styx.state.TimeoutConfig;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.util.ShardedCounter;
import com.spotify.styx.util.Time;
import io.grpc.Context;
import io.opencensus.common.Scope;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/Scheduler.class */
public class Scheduler {
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    private static final String TICK_TYPE = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, Scheduler.class.getSimpleName());
    private static final Tracer tracer = Tracing.getTracer();
    private final Time time;
    private final TimeoutConfig ttls;
    private final StateManager stateManager;
    private final Storage storage;
    private final WorkflowResourceDecorator resourceDecorator;
    private final Stats stats;
    private final RateLimiter dequeueRateLimiter;
    private final WorkflowExecutionGate gate;
    private final ShardedCounter shardedCounter;
    private final Executor executor;

    public Scheduler(Time time, TimeoutConfig timeoutConfig, StateManager stateManager, Storage storage, WorkflowResourceDecorator workflowResourceDecorator, Stats stats, RateLimiter rateLimiter, WorkflowExecutionGate workflowExecutionGate, ShardedCounter shardedCounter, Executor executor) {
        this.time = (Time) Objects.requireNonNull(time);
        this.ttls = (TimeoutConfig) Objects.requireNonNull(timeoutConfig);
        this.stateManager = (StateManager) Objects.requireNonNull(stateManager);
        this.storage = (Storage) Objects.requireNonNull(storage);
        this.resourceDecorator = (WorkflowResourceDecorator) Objects.requireNonNull(workflowResourceDecorator);
        this.stats = (Stats) Objects.requireNonNull(stats);
        this.dequeueRateLimiter = (RateLimiter) Objects.requireNonNull(rateLimiter, "dequeueRateLimiter");
        this.gate = (WorkflowExecutionGate) Objects.requireNonNull(workflowExecutionGate, "gate");
        this.shardedCounter = (ShardedCounter) Objects.requireNonNull(shardedCounter, "shardedCounter");
        this.executor = Context.currentContextExecutor((Executor) Objects.requireNonNull(executor, "executor"));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tick() {
        Scope startScopedSpan = tracer.spanBuilder("Styx.Scheduler.tick").setRecordEvents(true).setSampler(Samplers.alwaysSample()).startScopedSpan();
        try {
            tick0();
            if (startScopedSpan != null) {
                startScopedSpan.close();
            }
        } catch (Throwable th) {
            if (startScopedSpan != null) {
                try {
                    startScopedSpan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void tick0() {
        Instant instant = (Instant) this.time.get();
        try {
            Map<String, Resource> map = (Map) this.storage.resources().stream().collect(Collectors.toMap((v0) -> {
                return v0.id();
            }, Function.identity()));
            StyxConfig config = this.storage.config();
            Optional globalConcurrency = config.globalConcurrency();
            globalConcurrency.ifPresent(l -> {
                map.put("GLOBAL_STYX_CLUSTER", Resource.create("GLOBAL_STYX_CLUSTER", l.longValue()));
            });
            Map<WorkflowInstance, RunState> activeStates = this.stateManager.getActiveStates();
            List<InstanceState> activeInstanceStates = StateUtil.getActiveInstanceStates(activeStates);
            Map<WorkflowId, Workflow> workflows = getWorkflows(activeInstanceStates);
            Set<WorkflowInstance> timedOutInstances = StateUtil.getTimedOutInstances(workflows, activeInstanceStates, (Instant) this.time.get(), this.ttls);
            Map<WorkflowId, Set<String>> map2 = (Map) activeInstanceStates.parallelStream().map((v0) -> {
                return v0.workflowInstance();
            }).map((v0) -> {
                return v0.workflowId();
            }).distinct().collect(Collectors.toMap(workflowId -> {
                return workflowId;
            }, workflowId2 -> {
                return StateUtil.workflowResources(globalConcurrency.isPresent(), Optional.ofNullable((Workflow) workflows.get(workflowId2)));
            }));
            ConcurrentMap<String, Long> resourceUsage = StateUtil.getResourceUsage(globalConcurrency.isPresent(), activeInstanceStates, timedOutInstances, this.resourceDecorator, workflows);
            AtomicLongMap<String> create = AtomicLongMap.create();
            updateResourceStats(map, resourceUsage);
            List<InstanceState> list = (List) activeInstanceStates.parallelStream().filter(instanceState -> {
                return !timedOutInstances.contains(instanceState.workflowInstance());
            }).filter(instanceState2 -> {
                return shouldExecute(instanceState2.runState());
            }).sorted(Comparator.comparingLong(instanceState3 -> {
                return instanceState3.runState().timestamp();
            })).collect(Collectors.toList());
            String format = String.format("Instances: active=%d, eligible=%d, timedOut=%d", Integer.valueOf(activeInstanceStates.size()), Integer.valueOf(list.size()), Integer.valueOf(timedOutInstances.size()));
            LOG.info(format);
            tracer.getCurrentSpan().addAnnotation(format);
            timedOutInstances.forEach(workflowInstance -> {
                sendTimeout(workflowInstance, (RunState) activeStates.get(workflowInstance));
            });
            dequeueInstances(config, map, map2, workflows, list, create);
            Map asMap = create.asMap();
            Stats stats = this.stats;
            Objects.requireNonNull(stats);
            asMap.forEach((v1, v2) -> {
                r1.recordResourceDemanded(v1, v2);
            });
            this.stats.recordTickDuration(TICK_TYPE, instant.until((Temporal) this.time.get(), ChronoUnit.MILLIS));
        } catch (IOException e) {
            LOG.warn("Failed to get resource limits", e);
        }
    }

    private void updateResourceStats(Map<String, Resource> map, Map<String, Long> map2) {
        map.values().forEach(resource -> {
            this.stats.recordResourceConfigured(resource.id(), resource.concurrency());
        });
        Stats stats = this.stats;
        Objects.requireNonNull(stats);
        map2.forEach((v1, v2) -> {
            r1.recordResourceUsed(v1, v2);
        });
        Sets.difference(map.keySet(), map2.keySet()).forEach(str -> {
            this.stats.recordResourceUsed(str, 0L);
        });
    }

    private Map<WorkflowId, Workflow> getWorkflows(List<InstanceState> list) {
        return this.storage.workflows((Set) list.stream().map(instanceState -> {
            return instanceState.workflowInstance().workflowId();
        }).collect(Collectors.toSet()));
    }

    private void dequeueInstances(StyxConfig styxConfig, Map<String, Resource> map, Map<WorkflowId, Set<String>> map2, Map<WorkflowId, Workflow> map3, List<InstanceState> list, AtomicLongMap<String> atomicLongMap) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ((Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.workflowInstance();
        }, instanceState -> {
            return CompletableFuture.runAsync(() -> {
                dequeueInstance(styxConfig, map, map2, Optional.ofNullable((Workflow) map3.get(instanceState.workflowInstance().workflowId())), instanceState, concurrentHashMap, atomicLongMap);
            }, this.executor);
        }))).forEach((workflowInstance, completableFuture) -> {
            try {
                completableFuture.get();
            } catch (InterruptedException e) {
                LOG.warn("Interrupted", e);
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e2) {
                LOG.error("Failed to process instance for dequeue: " + workflowInstance, e2);
            }
        });
    }

    private void dequeueInstance(StyxConfig styxConfig, Map<String, Resource> map, Map<WorkflowId, Set<String>> map2, Optional<Workflow> optional, InstanceState instanceState, ConcurrentMap<String, Boolean> concurrentMap, AtomicLongMap<String> atomicLongMap) {
        tracer.spanBuilder("dequeueInstance").startSpanAndRun(() -> {
            dequeueInstance0(styxConfig, map, map2, optional, instanceState, concurrentMap, atomicLongMap);
        });
    }

    private void dequeueInstance0(StyxConfig styxConfig, Map<String, Resource> map, Map<WorkflowId, Set<String>> map2, Optional<Workflow> optional, InstanceState instanceState, ConcurrentMap<String, Boolean> concurrentMap, AtomicLongMap<String> atomicLongMap) {
        LOG.debug("Evaluating instance for dequeue: {}", instanceState.workflowInstance());
        Set<String> orDefault = map2.getOrDefault(instanceState.workflowInstance().workflowId(), Collections.emptySet());
        Set<String> set = (Set) optional.map(workflow -> {
            return this.resourceDecorator.decorateResources(instanceState.runState(), workflow.configuration(), orDefault);
        }).orElse(orDefault);
        Set set2 = (Set) set.stream().filter(str -> {
            return !map.containsKey(str);
        }).collect(Collectors.toSet());
        if (!set2.isEmpty()) {
            this.stateManager.receiveIgnoreClosed(Event.runError(instanceState.workflowInstance(), String.format("Referenced resources not found: %s", set2)), instanceState.runState().counter());
            return;
        }
        Objects.requireNonNull(atomicLongMap);
        set.forEach((v1) -> {
            r1.incrementAndGet(v1);
        });
        List list = (List) set.stream().filter(str2 -> {
            return limitReached(str2, concurrentMap);
        }).sorted().collect(Collectors.toList());
        if (!list.isEmpty()) {
            LOG.debug("Resource limit reached for instance, not dequeueing: {}: exhausted resources={}", instanceState.workflowInstance(), list);
            MessageUtil.emitResourceLimitReachedMessage(this.stateManager, instanceState.runState(), list);
            return;
        }
        if (styxConfig.executionGatingEnabled()) {
            Optional<WorkflowExecutionGate.ExecutionBlocker> empty = Optional.empty();
            try {
                empty = this.gate.executionBlocker(instanceState.workflowInstance()).toCompletableFuture().get(10L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException | TimeoutException e2) {
                LOG.warn("Failed to check execution blocker for {}, assuming there is no blocker", instanceState.workflowInstance(), e2);
            }
            if (empty.isPresent()) {
                this.stateManager.receiveIgnoreClosed(Event.retryAfter(instanceState.workflowInstance(), empty.get().delay().toMillis()), instanceState.runState().counter());
                LOG.debug("Dequeue rescheduled: {}: {}", instanceState.workflowInstance(), empty.get());
                return;
            }
        }
        double acquire = this.dequeueRateLimiter.acquire();
        if (acquire > 1.0E-4d) {
            double d = acquire * 1000.0d;
            String str3 = "Dequeue rate limited and slept for " + d + " ms";
            LOG.debug(str3, Double.valueOf(d));
            tracer.getCurrentSpan().addAnnotation(str3);
        }
        sendDequeue(instanceState, set);
    }

    private boolean limitReached(String str, ConcurrentMap<String, Boolean> concurrentMap) {
        return concurrentMap.computeIfAbsent(str, str2 -> {
            try {
                return Boolean.valueOf(!this.shardedCounter.counterHasSpareCapacity(str));
            } catch (IOException | RuntimeException e) {
                LOG.warn("Failed to check resource counter limit", e);
                return false;
            }
        }).booleanValue();
    }

    private boolean shouldExecute(RunState runState) {
        if (runState.state() != RunState.State.QUEUED) {
            return false;
        }
        return !Instant.ofEpochMilli(runState.timestamp()).plusMillis(((Long) runState.data().retryDelayMillis().orElse(0L)).longValue()).isAfter((Instant) this.time.get());
    }

    private void sendDequeue(InstanceState instanceState, Set<String> set) {
        WorkflowInstance workflowInstance = instanceState.workflowInstance();
        RunState runState = instanceState.runState();
        if (runState.data().tries() == 0) {
            LOG.info("Executing {}", workflowInstance);
        } else {
            LOG.info("Executing {}, retry #{}", workflowInstance, Integer.valueOf(runState.data().tries()));
        }
        this.stateManager.receiveIgnoreClosed(Event.dequeue(workflowInstance, set), instanceState.runState().counter());
    }

    private void sendTimeout(WorkflowInstance workflowInstance, RunState runState) {
        LOG.info("Found stale state {} since {} for workflow {}; Issuing a timeout", new Object[]{runState.state(), Instant.ofEpochMilli(runState.timestamp()), workflowInstance});
        this.stateManager.receiveIgnoreClosed(Event.timeout(workflowInstance), runState.counter());
    }
}
