package com.spotify.styx;

import com.google.common.base.CaseFormat;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicLongMap;
import com.google.common.util.concurrent.RateLimiter;
import com.spotify.futures.CompletableFutures;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.Resource;
import com.spotify.styx.model.Schedule;
import com.spotify.styx.model.StyxConfig;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowConfiguration;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateManager;
import com.spotify.styx.state.StateTransitionConflictException;
import com.spotify.styx.state.StateUtil;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.util.CounterCapacityException;
import com.spotify.styx.util.ShardedCounter;
import com.spotify.styx.util.Time;
import io.grpc.Context;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/Scheduler.class */
/**
 * Evaluates active workflow instances and dequeues the eligible ones for execution.
 *
 * <p>Each {@link #tick()} loads the configured resources, lists the active workflow instances and
 * processes every instance on the supplied executor. An instance is dequeued when it is in the
 * {@code QUEUED} state, its retry delay has elapsed, every resource it references is configured
 * and none of those resources has reached its limit according to the {@link ShardedCounter}.
 * Resource usage and demand observed during the tick are reported through {@link Stats}.
 */
public class Scheduler {
    /** Tick type label reported with the tick duration; the class name in lower_underscore form. */
    private static final String TICK_TYPE = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, Scheduler.class.getSimpleName());

    /** Id of the synthetic resource registered when a global concurrency limit is configured. */
    private static final String GLOBAL_RESOURCE_ID = "GLOBAL_STYX_CLUSTER";

    private static final Tracer tracer = Tracing.getTracer();

    private final Time time;
    private final StateManager stateManager;
    private final Storage storage;
    private final WorkflowResourceDecorator resourceDecorator;
    private final Stats stats;
    private final RateLimiter dequeueRateLimiter;
    private final ShardedCounter shardedCounter;
    private final Executor executor;
    private final Logger log;

    /**
     * Creates a scheduler that logs through a logger named after this class.
     *
     * @param time source of the current instant
     * @param stateManager used to list active instances, read their state and deliver events
     * @param storage source of resources, configuration and workflow definitions
     * @param workflowResourceDecorator computes the final set of resource ids for an instance
     * @param stats sink for resource and tick duration metrics
     * @param rateLimiter limits the rate at which dequeue events are sent
     * @param shardedCounter consulted to decide whether a resource still has spare capacity
     * @param executor runs the per-instance processing tasks
     * @throws NullPointerException if any argument is {@code null}
     */
    public Scheduler(Time time, StateManager stateManager, Storage storage, WorkflowResourceDecorator workflowResourceDecorator, Stats stats, RateLimiter rateLimiter, ShardedCounter shardedCounter, Executor executor) {
        this(time, stateManager, storage, workflowResourceDecorator, stats, rateLimiter, shardedCounter, executor, LoggerFactory.getLogger(Scheduler.class));
    }

    /**
     * Same as the public constructor, but with an explicit logger.
     *
     * @param logger logger used for all messages emitted by this scheduler
     * @throws NullPointerException if any argument is {@code null}
     */
    Scheduler(Time time, StateManager stateManager, Storage storage, WorkflowResourceDecorator workflowResourceDecorator, Stats stats, RateLimiter rateLimiter, ShardedCounter shardedCounter, Executor executor, Logger logger) {
        this.time = Objects.requireNonNull(time, "time");
        this.stateManager = Objects.requireNonNull(stateManager, "stateManager");
        this.storage = Objects.requireNonNull(storage, "storage");
        this.resourceDecorator = Objects.requireNonNull(workflowResourceDecorator, "resourceDecorator");
        this.stats = Objects.requireNonNull(stats, "stats");
        this.dequeueRateLimiter = Objects.requireNonNull(rateLimiter, "dequeueRateLimiter");
        this.shardedCounter = Objects.requireNonNull(shardedCounter, "shardedCounter");
        // Wrap the executor so that tasks run with the gRPC context that is current at submission time.
        this.executor = Context.currentContextExecutor(Objects.requireNonNull(executor, "executor"));
        this.log = Objects.requireNonNull(logger, "log");
    }

    /**
     * Runs one scheduling pass inside an always-sampled {@code Styx.Scheduler.tick} span.
     */
    public void tick() {
        try (Scope ignored = tracer.spanBuilder("Styx.Scheduler.tick")
                .setRecordEvents(true)
                .setSampler(Samplers.alwaysSample())
                .startScopedSpan()) {
            tick0();
        }
    }

    /**
     * Performs the actual work of a tick: loads resources and configuration, processes all active
     * instances and records metrics. An {@link IOException} is logged and ends the tick early.
     */
    private void tick0() {
        final Instant startedAt = time.get();
        try {
            final Map<String, Resource> resources = storage.resources().stream()
                    .collect(Collectors.toMap(Resource::id, Function.identity()));
            final StyxConfig config = storage.config();
            // A configured global concurrency limit is modelled as one more resource.
            config.globalConcurrency().ifPresent(concurrency ->
                    resources.put(GLOBAL_RESOURCE_ID, Resource.create(GLOBAL_RESOURCE_ID, concurrency)));

            final Set<WorkflowInstance> activeInstances = stateManager.listActiveInstances();
            final ConcurrentHashMap<WorkflowId, Optional<Workflow>> workflowCache = new ConcurrentHashMap<>();
            final AtomicLongMap<String> resourceUsage = AtomicLongMap.create();
            final AtomicLongMap<String> resourceDemand = AtomicLongMap.create();

            processInstances(config, resources, workflowCache, activeInstances, resourceUsage, resourceDemand);

            updateResourceStats(resources, resourceUsage);
            resourceDemand.asMap().forEach(stats::recordResourceDemanded);
            stats.recordTickDuration(TICK_TYPE, startedAt.until(time.get(), ChronoUnit.MILLIS));

            tracer.getCurrentSpan().addAnnotation("processed",
                    Map.of("instances", AttributeValue.longAttributeValue(activeInstances.size())));
        } catch (IOException e) {
            log.warn("Failed to get resource limits", e);
        }
    }

    /**
     * Reports the configured limit and the observed usage of every resource.
     *
     * @param resources configured resources keyed by id
     * @param resourceUsage usage counts gathered while processing instances, keyed by resource id
     */
    private void updateResourceStats(Map<String, Resource> resources, AtomicLongMap<String> resourceUsage) {
        resources.values().forEach(resource ->
                stats.recordResourceConfigured(resource.id(), resource.concurrency()));

        final Map<String, Long> usedById = resourceUsage.asMap();
        usedById.forEach(stats::recordResourceUsed);

        // Configured resources that nobody is using are reported explicitly as zero.
        Sets.difference(resources.keySet(), usedById.keySet())
                .forEach(unusedId -> stats.recordResourceUsed(unusedId, 0L));
    }

    /**
     * Processes all active instances concurrently on the executor and waits for completion.
     *
     * @param styxConfig current configuration
     * @param resources configured resources keyed by id
     * @param workflows per-tick cache of workflow lookups
     * @param activeInstances instances to process
     * @param resourceUsage accumulator for resources held by instances
     * @param resourceDemand accumulator for resources requested by instances eligible for dequeue
     */
    private void processInstances(StyxConfig styxConfig, Map<String, Resource> resources, ConcurrentHashMap<WorkflowId, Optional<Workflow>> workflows, Set<WorkflowInstance> activeInstances, AtomicLongMap<String> resourceUsage, AtomicLongMap<String> resourceDemand) {
        // Per-tick memo of "limit reached" answers, so each resource counter is checked at most once.
        final ConcurrentMap<String, Boolean> resourceExhaustedCache = new ConcurrentHashMap<>();

        // Process in random order. NOTE(review): presumably for fairness between instances
        // competing for the same resources - confirm.
        final List<WorkflowInstance> shuffled = new ArrayList<>(activeInstances);
        Collections.shuffle(shuffled);

        final List<CompletableFuture<Void>> tasks = shuffled.stream()
                .map(instance -> CompletableFuture.runAsync(
                        () -> tracer.spanBuilderWithExplicitParent("Styx.Scheduler.processInstance", (Span) null)
                                .startSpanAndRun(() -> processInstanceLoggingFailures(
                                        styxConfig, resources, workflows, instance,
                                        resourceExhaustedCache, resourceUsage, resourceDemand)),
                        executor))
                .collect(Collectors.toList());

        CompletableFutures.allAsList(tasks).join();
    }

    /**
     * Calls {@link #processInstance} and logs any failure instead of propagating it, so that one
     * failing instance does not fail the whole tick.
     */
    private void processInstanceLoggingFailures(StyxConfig styxConfig, Map<String, Resource> resources, ConcurrentMap<WorkflowId, Optional<Workflow>> workflows, WorkflowInstance instance, ConcurrentMap<String, Boolean> resourceExhaustedCache, AtomicLongMap<String> resourceUsage, AtomicLongMap<String> resourceDemand) {
        try {
            processInstance(styxConfig, resources, workflows, instance, resourceExhaustedCache, resourceUsage, resourceDemand);
        } catch (StateTransitionConflictException e) {
            log.debug("State transition conflict when scheduling instance: {}", instance, e);
        } catch (CounterCapacityException e) {
            log.debug("Counter capacity exhausted when scheduling instance: {}", instance, e);
        } catch (Throwable t) {
            log.warn("Caught exception when scheduling instance: {}", instance, t);
        }
    }

    /**
     * Accounts for the resources held by one instance and, if it is eligible, either dequeues it,
     * reports that a resource limit was reached, or fails it when it references unknown resources.
     *
     * @param styxConfig current configuration
     * @param resources configured resources keyed by id
     * @param workflows per-tick cache of workflow lookups
     * @param instance the instance to process
     * @param resourceExhaustedCache per-tick memo of resource limit checks
     * @param resourceUsage accumulator for resources held by instances
     * @param resourceDemand accumulator for resources requested by instances eligible for dequeue
     */
    private void processInstance(StyxConfig styxConfig, Map<String, Resource> resources, ConcurrentMap<WorkflowId, Optional<Workflow>> workflows, WorkflowInstance instance, ConcurrentMap<String, Boolean> resourceExhaustedCache, AtomicLongMap<String> resourceUsage, AtomicLongMap<String> resourceDemand) {
        log.debug("Processing instance: {}", instance);

        final Optional<RunState> activeState = stateManager.getActiveState(instance);
        if (activeState.isEmpty()) {
            // No active state was returned for the listed instance, so there is nothing to do.
            return;
        }
        final RunState state = activeState.orElseThrow();

        if (StateUtil.isConsumingResources(state.state())) {
            state.data().resourceIds().ifPresent(ids -> ids.forEach(resourceUsage::incrementAndGet));
        }

        if (!shouldExecute(state)) {
            return;
        }
        log.debug("Evaluating instance for dequeue: {}", instance);

        final Optional<Workflow> workflow = workflows.computeIfAbsent(instance.workflowId(), this::readWorkflow);
        // Fall back to a minimal configuration when the workflow is unknown; it is built only if needed.
        final WorkflowConfiguration workflowConfiguration = workflow
                .map(Workflow::configuration)
                .orElseGet(() -> WorkflowConfiguration.builder()
                        .id(instance.workflowId().id())
                        .schedule(Schedule.parse(""))
                        .build());
        final Set<String> resourceRefs = resourceDecorator.decorateResources(
                state,
                workflowConfiguration,
                StateUtil.workflowResources(styxConfig.globalConcurrency().isPresent(), workflow));

        final Set<String> unknownResources = resourceRefs.stream()
                .filter(resourceId -> !resources.containsKey(resourceId))
                .collect(Collectors.toSet());
        if (!unknownResources.isEmpty()) {
            stateManager.receiveIgnoreClosed(
                    Event.runError(instance, "Referenced resources not found: " + unknownResources),
                    state.counter());
            return;
        }

        resourceRefs.forEach(resourceDemand::incrementAndGet);

        final List<String> exhaustedResources = resourceRefs.stream()
                .filter(resourceId -> limitReached(resourceId, resourceExhaustedCache))
                .sorted()
                .collect(Collectors.toList());
        if (exhaustedResources.isEmpty()) {
            sendDequeue(instance, state, resourceRefs);
        } else {
            log.debug("Resource limit reached for instance, not dequeueing: {}: exhausted resources={}", instance, exhaustedResources);
            MessageUtil.emitResourceLimitReachedMessage(stateManager, state, exhaustedResources);
        }
    }

    /**
     * Loads a workflow from storage, converting {@link IOException} to an unchecked exception so
     * the method can be used as a {@code computeIfAbsent} mapping function.
     *
     * @param workflowId id of the workflow to load
     * @return the result of the storage lookup
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    private Optional<Workflow> readWorkflow(WorkflowId workflowId) {
        try {
            return storage.workflow(workflowId);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether a resource has no spare capacity, consulting the counter at most once per
     * cache. A failed check is logged and treated as "limit not reached".
     *
     * @param resourceId id of the resource to check
     * @param resourceExhaustedCache memo of earlier answers, keyed by resource id
     * @return {@code true} if the resource counter reports no spare capacity
     */
    private boolean limitReached(String resourceId, ConcurrentMap<String, Boolean> resourceExhaustedCache) {
        return resourceExhaustedCache.computeIfAbsent(resourceId, id -> {
            try {
                return !shardedCounter.counterHasSpareCapacity(id);
            } catch (IOException | RuntimeException e) {
                log.warn("Failed to check resource counter limit", e);
                return false;
            }
        });
    }

    /**
     * Tells whether an instance is ready to be dequeued: it must be {@code QUEUED} and its retry
     * delay, counted from the state timestamp, must not lie in the future.
     *
     * @param runState state of the instance
     * @return {@code true} if the instance should be evaluated for dequeue
     */
    private boolean shouldExecute(RunState runState) {
        if (runState.state() != RunState.State.QUEUED) {
            return false;
        }
        final Instant earliestExecution = Instant.ofEpochMilli(runState.timestamp())
                .plusMillis(runState.data().retryDelayMillis().orElse(0L));
        return !earliestExecution.isAfter(time.get());
    }

    /**
     * Sends a dequeue event for the instance after acquiring a permit from the rate limiter.
     *
     * @param workflowInstance the instance to dequeue
     * @param runState current state of the instance; supplies the retry count and event counter
     * @param resourceIds resource ids attached to the dequeue event
     */
    private void sendDequeue(WorkflowInstance workflowInstance, RunState runState, Set<String> resourceIds) {
        // acquire() may block; it returns the time spent waiting, in seconds.
        final double blockedSeconds = dequeueRateLimiter.acquire();
        if (blockedSeconds > 1.0E-4d) {
            final double blockedMillis = blockedSeconds * 1000.0d;
            final String message = "Dequeue rate limited and slept for " + blockedMillis + " ms";
            log.debug(message);
            tracer.getCurrentSpan().addAnnotation(message);
        }

        final int tries = runState.data().tries();
        if (tries == 0) {
            log.info("Executing {}", workflowInstance);
        } else {
            log.info("Executing {}, retry #{}", workflowInstance, tries);
        }
        stateManager.receiveIgnoreClosed(Event.dequeue(workflowInstance, resourceIds), runState.counter());
    }
}
