package com.spotify.styx;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.Resource;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.state.Message;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateManager;
import com.spotify.styx.state.TimeoutConfig;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.util.Time;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/Scheduler.class */
public class Scheduler {
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

    @VisibleForTesting
    static final String GLOBAL_RESOURCE_ID = "GLOBAL_STYX_CLUSTER";
    private final Time time;
    private final TimeoutConfig ttls;
    private final StateManager stateManager;
    private final WorkflowCache workflowCache;
    private final Storage storage;
    private final WorkflowResourceDecorator resourceDecorator;
    private final Stats stats;
    private final RateLimiter dequeueRateLimiter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/spotify/styx/Scheduler$ResourceWithInstance.class */
    public static abstract class ResourceWithInstance {
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String resource();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract InstanceState instanceState();

        /* JADX INFO: Access modifiers changed from: package-private */
        public static ResourceWithInstance create(String str, InstanceState instanceState) {
            return new AutoValue_Scheduler_ResourceWithInstance(str, instanceState);
        }
    }

    public Scheduler(Time time, TimeoutConfig timeoutConfig, StateManager stateManager, WorkflowCache workflowCache, Storage storage, WorkflowResourceDecorator workflowResourceDecorator, Stats stats, RateLimiter rateLimiter) {
        this.time = (Time) Objects.requireNonNull(time);
        this.ttls = (TimeoutConfig) Objects.requireNonNull(timeoutConfig);
        this.stateManager = (StateManager) Objects.requireNonNull(stateManager);
        this.workflowCache = (WorkflowCache) Objects.requireNonNull(workflowCache);
        this.storage = (Storage) Objects.requireNonNull(storage);
        this.resourceDecorator = (WorkflowResourceDecorator) Objects.requireNonNull(workflowResourceDecorator);
        this.stats = (Stats) Objects.requireNonNull(stats);
        this.dequeueRateLimiter = (RateLimiter) Objects.requireNonNull(rateLimiter, "dequeueRateLimiter");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tick() {
        try {
            Map<String, Resource> map = (Map) this.storage.resources().stream().collect(Collectors.toMap((v0) -> {
                return v0.id();
            }, Function.identity()));
            Optional globalConcurrency = this.storage.globalConcurrency();
            globalConcurrency.ifPresent(l -> {
            });
            List list = (List) this.stateManager.activeStates().entrySet().stream().map(entry -> {
                return InstanceState.create((WorkflowInstance) entry.getKey(), (RunState) entry.getValue());
            }).collect(Collectors.toList());
            Set set = (Set) list.parallelStream().filter(instanceState -> {
                return hasTimedOut(instanceState.runState());
            }).map((v0) -> {
                return v0.workflowInstance();
            }).collect(Collectors.toSet());
            Map<WorkflowId, Set<String>> map2 = (Map) list.parallelStream().map((v0) -> {
                return v0.workflowInstance();
            }).map((v0) -> {
                return v0.workflowId();
            }).distinct().collect(Collectors.toMap(workflowId -> {
                return workflowId;
            }, workflowId2 -> {
                return workflowResources(globalConcurrency, workflowId2);
            }));
            Map<String, Long> map3 = (Map) list.parallelStream().filter(instanceState2 -> {
                return !set.contains(instanceState2.workflowInstance());
            }).filter(instanceState3 -> {
                return instanceState3.runState().state() != RunState.State.QUEUED;
            }).flatMap(instanceState4 -> {
                return pairWithResources(globalConcurrency, instanceState4);
            }).collect(Collectors.groupingByConcurrent((v0) -> {
                return v0.resource();
            }, ConcurrentHashMap::new, Collectors.counting()));
            List list2 = (List) list.parallelStream().filter(instanceState5 -> {
                return !set.contains(instanceState5.workflowInstance());
            }).filter(instanceState6 -> {
                return shouldExecute(instanceState6.runState());
            }).collect(Collectors.toCollection(Lists::newArrayList));
            Collections.shuffle(list2);
            set.forEach(this::sendTimeout);
            Iterator it = list2.iterator();
            while (it.hasNext() && limitAndDequeue(map, map2, map3, (InstanceState) it.next())) {
            }
            updateStats(map, map3);
        } catch (IOException e) {
            LOG.warn("Failed to get resource limits", e);
        }
    }

    private void updateStats(Map<String, Resource> map, Map<String, Long> map2) {
        map.values().forEach(resource -> {
            this.stats.recordResourceConfigured(resource.id(), resource.concurrency());
        });
        Stats stats = this.stats;
        stats.getClass();
        map2.forEach((v1, v2) -> {
            r1.recordResourceUsed(v1, v2);
        });
        Sets.difference(map.keySet(), map2.keySet()).forEach(str -> {
            this.stats.recordResourceUsed(str, 0L);
        });
    }

    private boolean limitAndDequeue(Map<String, Resource> map, Map<WorkflowId, Set<String>> map2, Map<String, Long> map3, InstanceState instanceState) {
        Set<String> orDefault = map2.getOrDefault(instanceState.workflowInstance().workflowId(), Collections.emptySet());
        Set set = (Set) this.workflowCache.workflow(instanceState.workflowInstance().workflowId()).map(workflow -> {
            return this.resourceDecorator.decorateResources(instanceState.runState(), workflow.configuration(), orDefault);
        }).orElse(orDefault);
        Set set2 = (Set) set.stream().filter(str -> {
            return !map.containsKey(str);
        }).collect(Collectors.toSet());
        Set set3 = (Set) set.stream().filter(str2 -> {
            if (map.containsKey(str2)) {
                return ((Long) map3.getOrDefault(str2, 0L)).longValue() >= ((Resource) map.get(str2)).concurrency();
            }
            return false;
        }).collect(Collectors.toSet());
        if (!set2.isEmpty()) {
            this.stateManager.receiveIgnoreClosed(Event.runError(instanceState.workflowInstance(), String.format("Referenced resources not found: %s", set2)));
            return true;
        }
        if (set3.isEmpty()) {
            if (!this.dequeueRateLimiter.tryAcquire()) {
                LOG.debug("Dequeue rate limited");
                return false;
            }
            set.forEach(str3 -> {
            });
            set.forEach(str4 -> {
            });
            sendDequeue(instanceState);
            return true;
        }
        Stream stream = set3.stream();
        map.getClass();
        Message info = Message.info(String.format("Resource limit reached for: %s", stream.map((v1) -> {
            return r5.get(v1);
        }).collect(Collectors.toList())));
        List messages = instanceState.runState().data().messages();
        if (messages.size() != 0 && info.equals(messages.get(messages.size() - 1))) {
            return true;
        }
        this.stateManager.receiveIgnoreClosed(Event.info(instanceState.workflowInstance(), info));
        return true;
    }

    private Stream<ResourceWithInstance> pairWithResources(Optional<Long> optional, InstanceState instanceState) {
        WorkflowId workflowId = instanceState.workflowInstance().workflowId();
        Set<String> workflowResources = workflowResources(optional, workflowId);
        return ((Set) this.workflowCache.workflow(workflowId).map(workflow -> {
            return this.resourceDecorator.decorateResources(instanceState.runState(), workflow.configuration(), workflowResources);
        }).orElse(workflowResources)).stream().map(str -> {
            return ResourceWithInstance.create(str, instanceState);
        });
    }

    private Set<String> workflowResources(Optional<Long> optional, WorkflowId workflowId) {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        optional.ifPresent(l -> {
            builder.add(GLOBAL_RESOURCE_ID);
        });
        this.workflowCache.workflow(workflowId).ifPresent(workflow -> {
            builder.addAll(workflow.configuration().resources());
        });
        return builder.build();
    }

    private boolean shouldExecute(RunState runState) {
        if (runState.state() != RunState.State.QUEUED) {
            return false;
        }
        return !Instant.ofEpochMilli(runState.timestamp()).plusMillis(((Long) runState.data().retryDelayMillis().orElse(0L)).longValue()).isAfter((Instant) this.time.get());
    }

    private void sendDequeue(InstanceState instanceState) {
        WorkflowInstance workflowInstance = instanceState.workflowInstance();
        RunState runState = instanceState.runState();
        if (runState.data().tries() == 0) {
            LOG.info("Triggering {}", workflowInstance.toKey());
        } else {
            LOG.info("{} executing retry #{}", workflowInstance.toKey(), Integer.valueOf(runState.data().tries()));
        }
        this.stateManager.receiveIgnoreClosed(Event.dequeue(workflowInstance));
    }

    private boolean hasTimedOut(RunState runState) {
        if (runState.state().isTerminal()) {
            return false;
        }
        return !Instant.ofEpochMilli(runState.timestamp()).plus((TemporalAmount) this.ttls.ttlOf(runState.state())).isAfter((Instant) this.time.get());
    }

    private void sendTimeout(WorkflowInstance workflowInstance) {
        LOG.info("Found stale state, issuing timeout for {}", workflowInstance);
        this.stateManager.receiveIgnoreClosed(Event.timeout(workflowInstance));
    }
}
