package com.spotify.styx;

import com.codahale.metrics.Gauge;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.googleapis.util.Utils;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.container.Container;
import com.google.api.services.container.ContainerScopes;
import com.google.api.services.container.model.Cluster;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.common.base.Throwables;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.spotify.apollo.AppInit;
import com.spotify.apollo.Environment;
import com.spotify.apollo.route.Route;
import com.spotify.metrics.core.SemanticMetricRegistry;
import com.spotify.styx.api.SchedulerResource;
import com.spotify.styx.docker.DockerRunner;
import com.spotify.styx.docker.WorkflowValidator;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.monitoring.MeteredDockerRunner;
import com.spotify.styx.monitoring.MeteredEventStorage;
import com.spotify.styx.monitoring.MeteredStorage;
import com.spotify.styx.monitoring.MetricsStats;
import com.spotify.styx.monitoring.MonitoringHandler;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.publisher.Publisher;
import com.spotify.styx.schedule.ScheduleSourceFactory;
import com.spotify.styx.state.OutputHandler;
import com.spotify.styx.state.QueuedStateManager;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StaleStateReaper;
import com.spotify.styx.state.StateManager;
import com.spotify.styx.state.StateRetrier;
import com.spotify.styx.state.TimeoutConfig;
import com.spotify.styx.state.handlers.DockerRunnerHandler;
import com.spotify.styx.state.handlers.ExecutionDescriptionHandler;
import com.spotify.styx.state.handlers.PublisherHandler;
import com.spotify.styx.state.handlers.StorageHandler;
import com.spotify.styx.state.handlers.TerminationHandler;
import com.spotify.styx.storage.AggregateStorage;
import com.spotify.styx.storage.EventStorage;
import com.spotify.styx.storage.InMemStorage;
import com.spotify.styx.storage.NoopEventStorage;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.util.EventStorageFactory;
import com.spotify.styx.util.ReplayEvents;
import com.spotify.styx.util.Singleton;
import com.spotify.styx.util.StorageFactory;
import com.spotify.styx.util.Time;
import com.typesafe.config.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.Closeable;
import java.io.IOException;
import java.lang.Thread;
import java.security.GeneralSecurityException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.client.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/StyxScheduler.class */
public class StyxScheduler implements AppInit {
    /** Service name; also used as the application name of the GKE Container API client. */
    public static final String SERVICE_NAME = "styx-scheduler";
    // GKE cluster settings are looked up under keys assembled as
    // GKE_CLUSTER_PREFIX + <runner id> + one of the three suffixes below (see getKubernetesClient).
    public static final String GKE_CLUSTER_PREFIX = "styx.gke.";
    public static final String GKE_CLUSTER_PROJECT_ID = ".project-id";
    public static final String GKE_CLUSTER_ZONE = ".cluster-zone";
    public static final String GKE_CLUSTER_ID = ".cluster-id";
    // Config keys read when opening the Bigtable connection (see createBigTableConnection).
    public static final String BIGTABLE_PROJECT_ID = "styx.bigtable.project-id";
    public static final String BIGTABLE_INSTANCE_ID = "styx.bigtable.instance-id";
    // Config keys read when building the Datastore client (see createDatastore).
    public static final String DATASTORE_PROJECT = "styx.datastore.project-id";
    public static final String DATASTORE_NAMESPACE = "styx.datastore.namespace";
    /** Config section handed to TimeoutConfig.createFromConfig when the state manager is built. */
    public static final String STYX_STALE_STATE_TTL_CONFIG = "styx.stale-state-ttls";
    /** Config key holding the run mode; compared against STYX_MODE_DEVELOPMENT by isDevMode. */
    public static final String STYX_MODE = "styx.mode";
    /** Value of STYX_MODE that selects in-memory storage, no-op event storage and the local docker runner. */
    public static final String STYX_MODE_DEVELOPMENT = "development";
    /** Interval, in seconds, of the stale-state reaper; startStateReaper schedules with this value. */
    public static final int STATE_REAP_INTERVAL_SECONDS = 30;
    /** Interval, in seconds, of the retry checker; startRetryChecker schedules with this value. */
    public static final int STATE_RETRY_CHECK_INTERVAL_SECONDS = 2;
    /** Value handed to TerminationHandler together with DEFAULT_RETRY_BASE_DELAY; presumably caps exponential back-off - confirm in TerminationHandler. */
    public static final int DEFAULT_RETRY_MAX_EXPONENT = 6;
    // Collaborators supplied through the Builder; all are null-checked in the constructor.
    private final Time time;
    private final StorageFactory storageFactory;
    private final EventStorageFactory eventStorageFactory;
    private final DockerRunnerFactory dockerRunnerFactory;
    private final ScheduleSources scheduleSources;
    private final StatsFactory statsFactory;
    private final ExecutorFactory executorFactory;
    private final PublisherFactory publisherFactory;
    // Assigned as the last step of create(Environment); null until then. Read by receive() and getState().
    private StateManager stateManager;
    /** Base delay passed to TerminationHandler in create(Environment). */
    public static final Duration DEFAULT_RETRY_BASE_DELAY = Duration.ofMinutes(3);
    /** Base delay passed to AggregateStorage in storage(Environment). */
    public static final Duration DEFAULT_RETRY_BASE_DELAY_BT = Duration.ofSeconds(1);
    private static final Logger LOG = LoggerFactory.getLogger(StyxScheduler.class);

    /* loaded from: input_file:com/spotify/styx/StyxScheduler$Builder.class */
    /**
     * Fluent builder for {@link StyxScheduler}.
     *
     * <p>Every collaborator has a default, so {@code new Builder().build()} yields a working
     * scheduler; the setters exist to swap individual collaborators. Each setter returns this
     * builder so calls can be chained.
     */
    public static class Builder {
        // Holder shared by the default storage and event-storage factories so that both
        // resolve their AggregateStorage through the same Singleton.
        private final Singleton<AggregateStorage> storage =
                Singleton.create(environment -> StyxScheduler.storage(environment));

        private Time time = Instant::now;
        private StorageFactory storageFactory = StyxScheduler.storage(this.storage);
        private EventStorageFactory eventStorageFactory = StyxScheduler.eventStorage(this.storage);
        private DockerRunnerFactory dockerRunnerFactory = StyxScheduler::createDockerRunner;
        private ScheduleSources scheduleSources = () -> ServiceLoader.load(ScheduleSourceFactory.class);
        private StatsFactory statsFactory = StyxScheduler::stats;
        private ExecutorFactory executorFactory = Executors::newScheduledThreadPool;
        private PublisherFactory publisherFactory = environment -> Publisher.NOOP;

        /** Replaces the time source (default: {@code Instant::now}). */
        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        /** Replaces the factory that produces the workflow {@code Storage}. */
        public Builder setStorageFactory(StorageFactory storageFactory) {
            this.storageFactory = storageFactory;
            return this;
        }

        /** Replaces the factory that produces the {@code EventStorage}. */
        public Builder setEventStorageFactory(EventStorageFactory eventStorageFactory) {
            this.eventStorageFactory = eventStorageFactory;
            return this;
        }

        /** Replaces the factory that produces docker runners. */
        public Builder setDockerRunnerFactory(DockerRunnerFactory dockerRunnerFactory) {
            this.dockerRunnerFactory = dockerRunnerFactory;
            return this;
        }

        /** Replaces the supplier of schedule source factories (default: {@code ServiceLoader}). */
        public Builder setScheduleSources(ScheduleSources scheduleSources) {
            this.scheduleSources = scheduleSources;
            return this;
        }

        /** Replaces the factory that produces the {@code Stats} sink. */
        public Builder setStatsFactory(StatsFactory statsFactory) {
            this.statsFactory = statsFactory;
            return this;
        }

        /** Replaces the factory for the scheduler's {@code ScheduledExecutorService}. */
        public Builder setExecutorFactory(ExecutorFactory executorFactory) {
            this.executorFactory = executorFactory;
            return this;
        }

        /** Replaces the factory that produces the {@code Publisher} (default: {@code Publisher.NOOP}). */
        public Builder setPublisherFactory(PublisherFactory publisherFactory) {
            this.publisherFactory = publisherFactory;
            return this;
        }

        /**
         * Creates the scheduler from the currently configured collaborators.
         *
         * @throws NullPointerException if any collaborator was set to {@code null}
         */
        public StyxScheduler build() {
            return new StyxScheduler(
                    this.time,
                    this.storageFactory,
                    this.eventStorageFactory,
                    this.dockerRunnerFactory,
                    this.scheduleSources,
                    this.statsFactory,
                    this.executorFactory,
                    this.publisherFactory);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a {@link DockerRunner}.
     *
     * <p>The default implementation is {@code StyxScheduler.createDockerRunner}, which uses
     * {@code str} as the infix of the GKE cluster config keys. In {@code create(Environment)} the
     * factory is invoked from the function handed to {@code DockerRunner.routing}, so {@code str}
     * is presumably the docker runner id - confirm against {@code DockerRunner.routing}.
     */
    @FunctionalInterface
    /* loaded from: input_file:com/spotify/styx/StyxScheduler$DockerRunnerFactory.class */
    public interface DockerRunnerFactory {
        DockerRunner create(String str, Environment environment, StateManager stateManager, ScheduledExecutorService scheduledExecutorService, Stats stats);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the scheduler's {@link ScheduledExecutorService}.
     *
     * <p>The signature matches {@code Executors.newScheduledThreadPool(int, ThreadFactory)}, which
     * is the builder's default: {@code i} is the thread count and {@code threadFactory} creates the
     * worker threads.
     */
    @FunctionalInterface
    /* loaded from: input_file:com/spotify/styx/StyxScheduler$ExecutorFactory.class */
    public interface ExecutorFactory {
        ScheduledExecutorService create(int i, ThreadFactory threadFactory);
    }

    /* loaded from: input_file:com/spotify/styx/StyxScheduler$PublisherFactory.class */
    /** Produces the {@link Publisher} for a given Apollo {@link Environment}. */
    public interface PublisherFactory extends Function<Environment, Publisher> {
    }

    /* loaded from: input_file:com/spotify/styx/StyxScheduler$ScheduleSources.class */
    /** Supplies the {@link ScheduleSourceFactory} instances that are started by the scheduler. */
    public interface ScheduleSources extends Supplier<Iterable<ScheduleSourceFactory>> {
    }

    /* loaded from: input_file:com/spotify/styx/StyxScheduler$StateFactory.class */
    /** Produces the initial {@link RunState} for a {@link WorkflowInstance}. */
    public interface StateFactory extends Function<WorkflowInstance, RunState> {
    }

    /* loaded from: input_file:com/spotify/styx/StyxScheduler$StatsFactory.class */
    /** Produces the {@link Stats} sink for a given Apollo {@link Environment}. */
    public interface StatsFactory extends Function<Environment, Stats> {
    }

    /** Returns a new {@link Builder} pre-populated with the default collaborators. */
    public static Builder newBuilder() {
        return new Builder();
    }

    /** Builds a scheduler that uses the default value of every collaborator. */
    public static StyxScheduler createDefault() {
        final Builder defaults = new Builder();
        return defaults.build();
    }

    /**
     * Stores the collaborators used by {@link #create(Environment)}.
     *
     * <p>Instances are obtained through {@link Builder#build()}.
     *
     * @throws NullPointerException naming the offending parameter if any argument is {@code null}
     */
    private StyxScheduler(Time time, StorageFactory storageFactory, EventStorageFactory eventStorageFactory, DockerRunnerFactory dockerRunnerFactory, ScheduleSources scheduleSources, StatsFactory statsFactory, ExecutorFactory executorFactory, PublisherFactory publisherFactory) {
        // The message names the parameter so a failing build() call is diagnosable from the NPE alone.
        this.time = Objects.requireNonNull(time, "time");
        this.storageFactory = Objects.requireNonNull(storageFactory, "storageFactory");
        this.eventStorageFactory = Objects.requireNonNull(eventStorageFactory, "eventStorageFactory");
        this.dockerRunnerFactory = Objects.requireNonNull(dockerRunnerFactory, "dockerRunnerFactory");
        this.scheduleSources = Objects.requireNonNull(scheduleSources, "scheduleSources");
        this.statsFactory = Objects.requireNonNull(statsFactory, "statsFactory");
        this.executorFactory = Objects.requireNonNull(executorFactory, "executorFactory");
        this.publisherFactory = Objects.requireNonNull(publisherFactory, "publisherFactory");
    }

    /**
     * Wires up and starts the scheduler inside the given Apollo environment.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>creates the scheduler and event-worker executors and registers them with the
     *       environment's closer;</li>
     *   <li>builds the metered storage, metered event storage and the state manager;</li>
     *   <li>assembles the output handlers attached to every run state;</li>
     *   <li>restores active states from event storage <em>before</em> starting the schedule
     *       sources, the retry checker and the stale-state reaper;</li>
     *   <li>registers gauges and the HTTP routes ({@code GET /ping} plus the scheduler resource).</li>
     * </ol>
     * The state manager is published to {@link #receive(Event)} and
     * {@link #getState(WorkflowInstance)} only as the very last step.
     *
     * @param environment Apollo environment providing config, closer, routing engine and modules
     */
    @Override
    public void create(Environment environment) {
        final Config config = environment.config();
        final Closer closer = environment.closer();

        // The throwable is passed as the trailing argument without a placeholder of its own, so
        // SLF4J treats it as the exception and logs the stack trace (same form as the other
        // log statements in this class).
        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
                (thread, throwable) -> LOG.error("Thread {} threw", thread, throwable);
        final ThreadFactory schedulerThreadFactory = new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("styx-scheduler-%d")
                .setUncaughtExceptionHandler(uncaughtExceptionHandler)
                .build();
        final ThreadFactory eventThreadFactory = new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("styx-event-worker-%d")
                .setUncaughtExceptionHandler(uncaughtExceptionHandler)
                .build();

        final ScheduledExecutorService scheduler = this.executorFactory.create(3, schedulerThreadFactory);
        final ExecutorService eventWorker = Executors.newFixedThreadPool(16, eventThreadFactory);
        closer.register(executorCloser("scheduler", scheduler));
        closer.register(executorCloser("event-worker", eventWorker));

        final Stats stats = this.statsFactory.apply(environment);
        final Storage storage =
                new MeteredStorage((Storage) this.storageFactory.apply(environment), stats, this.time);
        final EventStorage eventStorage =
                new MeteredEventStorage((EventStorage) this.eventStorageFactory.apply(environment), stats, this.time);
        final QueuedStateManager queuedStateManager = closer.register(new QueuedStateManager(
                TimeoutConfig.createFromConfig(config.getConfig(STYX_STALE_STATE_TTL_CONFIG)),
                this.time,
                eventWorker,
                eventStorage));

        // Handlers attached to every fresh or restored RunState. Construction order is kept as is:
        // building the array already invokes the publisher factory.
        final OutputHandler[] outputHandlers = {
            ReplayEvents.transitionLogger(""),
            new DockerRunnerHandler(
                    new MeteredDockerRunner(
                            DockerRunner.routing(
                                    runnerId -> this.dockerRunnerFactory.create(
                                            runnerId, environment, queuedStateManager, scheduler, stats),
                                    new CachedSupplier(storage::globalDockerRunnerId, this.time)),
                            stats,
                            this.time),
                    queuedStateManager),
            new StorageHandler(storage, Clock.systemUTC()),
            new TerminationHandler(DEFAULT_RETRY_BASE_DELAY, DEFAULT_RETRY_MAX_EXPONENT, queuedStateManager),
            new MonitoringHandler(this.time, stats),
            new PublisherHandler(this.publisherFactory.apply(environment)),
            new ExecutionDescriptionHandler(storage, queuedStateManager)
        };

        final TriggerListener trigger = trigger(
                storage,
                instance -> RunState.fresh(instance, this.time, outputHandlers),
                queuedStateManager);
        final TickerManager tickerManager =
                new TickerManager(workflow -> new TickTock(workflow, trigger, this.time, scheduler));
        final WorkflowCache workflowCache = new InMemWorkflowCache();
        final Consumer<Workflow> workflowChanged =
                workflowChanged(workflowCache, storage, tickerManager, stats, queuedStateManager);
        final Consumer<Workflow> workflowRemoved = tickerManager::removeWorkflow;

        // Restore persisted states first; only then start anything that can emit new events.
        restoreState(eventStorage, outputHandlers, queuedStateManager);
        startScheduleSources(environment, scheduler, workflowChanged, workflowRemoved);
        startRetryChecker(queuedStateManager, scheduler);
        startStateReaper(queuedStateManager, scheduler);
        setupMetrics(queuedStateManager, workflowCache, storage, stats);

        environment.routingEngine()
                .registerAutoRoute(Route.sync("GET", "/ping", requestContext -> "pong"))
                .registerRoutes(new SchedulerResource(queuedStateManager, trigger, storage, this.time).routes());

        this.stateManager = queuedStateManager;
    }

    /**
     * Forwards an event to the state manager.
     *
     * <p>The state manager field is only assigned at the end of {@code create(Environment)};
     * calling this earlier dereferences {@code null}.
     *
     * @throws StateManager.IsClosed propagated from the state manager's {@code receive}
     */
    void receive(Event event) throws StateManager.IsClosed {
        this.stateManager.receive(event);
    }

    /**
     * Returns whatever the state manager's {@code get} yields for the given workflow instance.
     *
     * <p>The state manager field is only assigned at the end of {@code create(Environment)};
     * calling this earlier dereferences {@code null}.
     */
    RunState getState(WorkflowInstance workflowInstance) {
        return this.stateManager.get(workflowInstance);
    }

    /**
     * Replays the active workflow instances from event storage and restores each resulting
     * state into the state manager.
     *
     * <p>Every replayed {@link RunState} is re-bound to the given output handlers and to this
     * scheduler's time source before it is restored; the value associated with each state by
     * {@code ReplayEvents.replayActiveStates} is passed through to {@code restore} unchanged.
     *
     * @param eventStorage     source of the active instances and their events
     * @param outputHandlerArr handlers to attach to every restored state
     * @param stateManager     target that receives the restored states
     * @throws RuntimeException wrapping any {@link IOException} raised while reading event storage
     */
    private void restoreState(EventStorage eventStorage, OutputHandler[] outputHandlerArr, StateManager stateManager) {
        try {
            ReplayEvents.replayActiveStates(eventStorage.readActiveWorkflowInstances(), eventStorage, true)
                    .entrySet()
                    .stream()
                    .collect(Collectors.toMap(
                            entry -> entry.getKey().withHandlers(outputHandlerArr).withTime(this.time),
                            Map.Entry::getValue))
                    .forEach(stateManager::restore);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates and starts one schedule source per discovered factory.
     *
     * <p>A factory that fails (during creation or start) is logged and skipped so that the
     * remaining factories are still started.
     *
     * @param environment              Apollo environment handed to each factory
     * @param scheduledExecutorService executor handed to each factory
     * @param consumer                 callback for changed workflows
     * @param consumer2                callback for removed workflows
     */
    private void startScheduleSources(Environment environment, ScheduledExecutorService scheduledExecutorService, Consumer<Workflow> consumer, Consumer<Workflow> consumer2) {
        this.scheduleSources.get().forEach(factory -> {
            try {
                LOG.info("Loading auto-discovered ScheduleSource from {}", factory);
                factory.create(consumer, consumer2, environment, scheduledExecutorService).start();
            } catch (Throwable failure) {
                LOG.warn("ScheduleSourceFactory {} threw", factory, failure);
            }
        });
    }

    /**
     * Schedules {@code triggerRetries} on the given executor with a fixed delay of
     * {@link #STATE_RETRY_CHECK_INTERVAL_SECONDS} seconds (also used as the initial delay).
     * The task is wrapped in {@code guard} so a failing run does not cancel the schedule.
     */
    private static void startRetryChecker(StateRetrier stateRetrier, ScheduledExecutorService scheduledExecutorService) {
        final Runnable retryCheck = guard(stateRetrier::triggerRetries);
        scheduledExecutorService.scheduleWithFixedDelay(
                retryCheck,
                STATE_RETRY_CHECK_INTERVAL_SECONDS,
                STATE_RETRY_CHECK_INTERVAL_SECONDS,
                TimeUnit.SECONDS);
    }

    /**
     * Schedules {@code triggerTimeouts} on the given executor with a fixed delay of
     * {@link #STATE_REAP_INTERVAL_SECONDS} seconds (also used as the initial delay).
     * The task is wrapped in {@code guard} so a failing run does not cancel the schedule.
     */
    private static void startStateReaper(StaleStateReaper staleStateReaper, ScheduledExecutorService scheduledExecutorService) {
        final Runnable reap = guard(staleStateReaper::triggerTimeouts);
        scheduledExecutorService.scheduleWithFixedDelay(
                reap,
                STATE_REAP_INTERVAL_SECONDS,
                STATE_REAP_INTERVAL_SECONDS,
                TimeUnit.SECONDS);
    }

    /**
     * Wraps a task so that anything it throws is logged at WARN level instead of propagating.
     *
     * @param runnable the task to protect
     * @return a runnable that never throws
     */
    private static Runnable guard(Runnable runnable) {
        final Runnable guarded = () -> {
            try {
                runnable.run();
            } catch (Throwable failure) {
                LOG.warn("Guarded runnable threw", failure);
            }
        };
        return guarded;
    }

    /**
     * Registers the scheduler's gauges with the stats sink.
     *
     * <p>Gauges registered: queued events, active states, and three workflow counts -
     * {@code "all"} (everything in the cache), {@code "configured"} (cached workflows that have
     * a docker configuration) and {@code "enabled"} (configured workflows whose id is in the
     * storage's enabled set). The enabled gauge reports 0 and logs an error when storage
     * cannot be read.
     */
    private void setupMetrics(StateManager stateManager, WorkflowCache workflowCache, Storage storage, Stats stats) {
        final Gauge<Long> queuedEvents = stateManager::getQueuedEventsCount;
        final Gauge<Long> activeStates = stateManager::getActiveStatesCount;

        final Gauge<Long> allWorkflows = () -> workflowCache.all().stream().count();
        final Gauge<Long> configuredWorkflows = () ->
                workflowCache.all().stream()
                        .filter(WorkflowValidator::hasDockerConfiguration)
                        .count();
        final Gauge<Long> enabledWorkflows = () -> {
            final Set<?> enabledIds;
            try {
                enabledIds = storage.enabled();
            } catch (IOException e) {
                LOG.error("Failed to read enabled status from BigTable", e);
                return 0L;
            }
            return workflowCache.all().stream()
                    .filter(WorkflowValidator::hasDockerConfiguration)
                    .filter(workflow -> enabledIds.contains(WorkflowId.ofWorkflow(workflow)))
                    .count();
        };

        stats.registerQueuedEvents(queuedEvents);
        stats.registerActiveStates(activeStates);
        stats.registerWorkflowCount("all", allWorkflows);
        stats.registerWorkflowCount("configured", configuredWorkflows);
        stats.registerWorkflowCount("enabled", enabledWorkflows);
    }

    /**
     * Builds the trigger listener used by tickers and the scheduler resource.
     *
     * <p>A trigger is forwarded to a {@code StateInitializingTrigger} only when the workflow is
     * enabled and scheduling is globally enabled; otherwise it is just logged. The per-workflow
     * flag is checked first, and the global flag is only read when the workflow is enabled.
     *
     * @throws RuntimeException (from the returned listener) wrapping any {@link IOException}
     *     raised while reading the enabled flags
     */
    private TriggerListener trigger(Storage storage, StateFactory stateFactory, StateManager stateManager) {
        final StateInitializingTrigger initializer = new StateInitializingTrigger(stateFactory, stateManager, storage);
        return (workflow, str, instant) -> {
            try {
                if (!storage.enabled(workflow.id()) || !storage.globalEnabled()) {
                    LOG.info("Triggered disabled workflow {}", workflow.endpointId());
                    return;
                }
                initializer.event(workflow, str, instant);
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        };
    }

    /**
     * Builds the callback invoked when a workflow is added or changed.
     *
     * <p>For each workflow the callback registers a per-workflow active-states gauge, updates
     * the ticker, stores the workflow in the cache and finally persists it. A persistence
     * failure is logged and otherwise ignored.
     */
    private static Consumer<Workflow> workflowChanged(WorkflowCache workflowCache, Storage storage, TickerManager tickerManager, Stats stats, StateManager stateManager) {
        final Consumer<Workflow> onChange = changed -> {
            stats.registerActiveStates(
                    changed.id(),
                    () -> Long.valueOf(stateManager.getActiveStatesCount(changed.id())));
            tickerManager.updateWorkflow(changed);
            workflowCache.store(changed);
            try {
                storage.store(changed);
            } catch (IOException e) {
                LOG.warn("Failed to store workflow " + changed, e);
            }
        };
        return onChange;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Creates a {@link MetricsStats} backed by the environment's {@link SemanticMetricRegistry}. */
    public static Stats stats(Environment environment) {
        final SemanticMetricRegistry registry = environment.resolve(SemanticMetricRegistry.class);
        return new MetricsStats(registry);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns a storage factory that yields an {@link InMemStorage} in development mode and
     * otherwise resolves the storage through the given singleton.
     */
    public static StorageFactory storage(Singleton<AggregateStorage> singleton) {
        return environment -> {
            if (isDevMode(environment.config())) {
                LOG.info("Running Styx in development mode, will use InMemStorage");
                return new InMemStorage();
            }
            return (Storage) singleton.apply(environment);
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns an event-storage factory that yields a {@link NoopEventStorage} in development
     * mode and otherwise resolves the storage through the given singleton.
     */
    public static EventStorageFactory eventStorage(Singleton<AggregateStorage> singleton) {
        return environment -> {
            if (isDevMode(environment.config())) {
                LOG.info("Running Styx in development mode, will use NoopEventStorage");
                return new NoopEventStorage();
            }
            return (EventStorage) singleton.apply(environment);
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the production {@link AggregateStorage} from a Bigtable connection and a Datastore
     * client, both configured from the environment. The Bigtable connection is registered with
     * the environment's closer before the Datastore client is built.
     */
    public static AggregateStorage storage(Environment environment) {
        final Config config = environment.config();
        final Closer closer = environment.closer();
        final Connection bigtable = closer.register(createBigTableConnection(config));
        final Datastore datastore = createDatastore(config);
        return new AggregateStorage(bigtable, datastore, DEFAULT_RETRY_BASE_DELAY_BT);
    }

    /**
     * Opens a Bigtable connection for the project and instance named in the config
     * ({@link #BIGTABLE_PROJECT_ID}, {@link #BIGTABLE_INSTANCE_ID}).
     */
    private static Connection createBigTableConnection(Config config) {
        final String projectId = config.getString(BIGTABLE_PROJECT_ID);
        final String instanceId = config.getString(BIGTABLE_INSTANCE_ID);
        LOG.info("Creating Bigtable connection for project:{}, instance:{}", projectId, instanceId);
        return BigtableConfiguration.connect(projectId, instanceId);
    }

    /**
     * Builds a Datastore client for the namespace and project named in the config
     * ({@link #DATASTORE_NAMESPACE}, {@link #DATASTORE_PROJECT}).
     */
    static Datastore createDatastore(Config config) {
        // The namespace is read before the project id, matching the builder call order.
        final String namespace = config.getString(DATASTORE_NAMESPACE);
        final String projectId = config.getString(DATASTORE_PROJECT);
        return DatastoreOptions.builder()
                .namespace(namespace)
                .projectId(projectId)
                .build()
                .service();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the docker runner for the given id and registers it with the environment's closer.
     *
     * <p>In development mode a local runner driven by the given executor is used. Otherwise a
     * Kubernetes runner is created on top of a client for the GKE cluster configured under
     * {@code str}; that client is registered with the closer as well.
     */
    public static DockerRunner createDockerRunner(String str, Environment environment, StateManager stateManager, ScheduledExecutorService scheduledExecutorService, Stats stats) {
        final Config config = environment.config();
        final Closer closer = environment.closer();
        if (isDevMode(config)) {
            LOG.info("Creating LocalDockerRunner");
            return closer.register(DockerRunner.local(scheduledExecutorService, stateManager));
        }
        final KubernetesClient kubernetes = closer.register(getKubernetesClient(config, str));
        return closer.register(DockerRunner.kubernetes(kubernetes, stateManager, stats));
    }

    /**
     * Creates a Kubernetes client for the GKE cluster configured under
     * {@code GKE_CLUSTER_PREFIX + str}.
     *
     * <p>The cluster's endpoint and master credentials are fetched through the GKE Container API
     * using application-default Google credentials, and then used to configure the client.
     *
     * @throws RuntimeException wrapping any {@link IOException} or
     *     {@link GeneralSecurityException} raised while talking to the Container API
     */
    private static KubernetesClient getKubernetesClient(Config config, String str) {
        final String keyPrefix = GKE_CLUSTER_PREFIX + str;
        try {
            final NetHttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
            final JsonFactory jsonFactory = Utils.getDefaultJsonFactory();
            final GoogleCredential credential = GoogleCredential
                    .getApplicationDefault(transport, jsonFactory)
                    .createScoped(ContainerScopes.all());
            final Container gke = new Container.Builder(transport, jsonFactory, credential)
                    .setApplicationName(SERVICE_NAME)
                    .build();
            final Cluster cluster = gke.projects().zones().clusters()
                    .get(
                            config.getString(keyPrefix + GKE_CLUSTER_PROJECT_ID),
                            config.getString(keyPrefix + GKE_CLUSTER_ZONE),
                            config.getString(keyPrefix + GKE_CLUSTER_ID))
                    .execute();
            return new DefaultKubernetesClient(new ConfigBuilder()
                    .withMasterUrl("https://" + cluster.getEndpoint())
                    .withCaCertData(cluster.getMasterAuth().getClusterCaCertificate())
                    .withClientCertData(cluster.getMasterAuth().getClientCertificate())
                    .withClientKeyData(cluster.getMasterAuth().getClientKey())
                    .build());
        } catch (IOException | GeneralSecurityException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Returns a {@link Closeable} that shuts the given executor down.
     *
     * <p>On close the executor is asked to shut down gracefully and given one second to finish;
     * whatever has not started by then is cancelled via {@code shutdownNow()} and the number of
     * dropped tasks is logged.
     *
     * @param str             name of the executor, used in log messages only
     * @param executorService the executor to shut down
     */
    private static Closeable executorCloser(String str, ExecutorService executorService) {
        return () -> {
            LOG.info("Shutting down executor: {}", str);
            executorService.shutdown();
            try {
                executorService.awaitTermination(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Carry on with the forced shutdown, but restore the interrupt flag so the
                // thread running the closer can still observe that it was interrupted.
                Thread.currentThread().interrupt();
            }
            final List<Runnable> unexecuted = executorService.shutdownNow();
            if (!unexecuted.isEmpty()) {
                LOG.warn("{} task(s) in {} did not execute", unexecuted.size(), str);
            }
        };
    }

    /** Returns whether the {@link #STYX_MODE} config value equals {@link #STYX_MODE_DEVELOPMENT}. */
    private static boolean isDevMode(Config config) {
        final String mode = config.getString(STYX_MODE);
        return STYX_MODE_DEVELOPMENT.equals(mode);
    }
}
