package com.spotify.styx.docker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.spotify.styx.StyxScheduler;
import com.spotify.styx.docker.DockerRunner;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.EventVisitor;
import com.spotify.styx.model.ExecutionDescription;
import com.spotify.styx.model.TriggerParameters;
import com.spotify.styx.model.WorkflowConfiguration;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.serialization.Json;
import com.spotify.styx.state.Message;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateManager;
import com.spotify.styx.state.Trigger;
import com.spotify.styx.util.CloserUtil;
import com.spotify.styx.util.Debug;
import com.spotify.styx.util.EventUtil;
import com.spotify.styx.util.GrpcContextUtil;
import com.spotify.styx.util.GuardedRunnable;
import com.spotify.styx.util.IsClosedException;
import com.spotify.styx.util.Time;
import com.spotify.styx.util.TriggerUtil;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.QuantityBuilder;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.SecretVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.norberg.automatter.AutoMatter;
import io.opencensus.common.Scope;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javaslang.Tuple2;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner.class */
public class KubernetesDockerRunner implements DockerRunner {
    static final String STYX_WORKFLOW_INSTANCE_ANNOTATION = "styx-workflow-instance";
    static final String DOCKER_TERMINATION_LOGGING_ANNOTATION = "styx-docker-termination-logging";
    static final String COMPONENT_ID = "STYX_COMPONENT_ID";
    static final String WORKFLOW_ID = "STYX_WORKFLOW_ID";
    static final String SERVICE_ACCOUNT = "STYX_SERVICE_ACCOUNT";
    static final String DOCKER_ARGS = "STYX_DOCKER_ARGS";
    static final String DOCKER_IMAGE = "STYX_DOCKER_IMAGE";
    static final String COMMIT_SHA = "STYX_COMMIT_SHA";
    static final String PARAMETER = "STYX_PARAMETER";
    static final String EXECUTION_ID = "STYX_EXECUTION_ID";
    static final String TERMINATION_LOG = "STYX_TERMINATION_LOG";
    static final String TRIGGER_ID = "STYX_TRIGGER_ID";
    static final String TRIGGER_TYPE = "STYX_TRIGGER_TYPE";
    static final String ENVIRONMENT = "STYX_ENVIRONMENT";
    static final String LOGGING = "STYX_LOGGING";
    private static final int DEFAULT_POLL_PODS_INTERVAL_SECONDS = 60;
    private static final int DEFAULT_POD_DELETION_DELAY_SECONDS = 120;
    private static final long PROCESS_POD_UPDATE_INTERVAL_SECONDS = 5;
    private static final int K8S_EVENT_PROCESSING_THREADS = 32;
    static final String STYX_WORKFLOW_SA_ENV_VARIABLE = "GOOGLE_APPLICATION_CREDENTIALS";
    static final String STYX_WORKFLOW_SA_SECRET_NAME = "styx-wf-sa-keys";
    private static final String STYX_WORKFLOW_SA_JSON_KEY = "styx-wf-sa.json";
    static final String STYX_WORKFLOW_SA_SECRET_MOUNT_PATH = "/etc/styx-wf-sa-keys/";
    static final String KEEPALIVE_CONTAINER_NAME = "keepalive";
    static final String MAIN_CONTAINER_NAME = "styx-run";
    private final Closer closer;
    private final ScheduledExecutorService executor;
    private final KubernetesClient client;
    private final StateManager stateManager;
    private final Stats stats;
    private final KubernetesGCPServiceAccountSecretManager serviceAccountSecretManager;
    private final Debug debug;
    private final String styxEnvironment;
    private final int pollPodsIntervalSeconds;
    private final int podDeletionDelaySeconds;
    private final Time time;
    private final ExecutorService eventExecutor;
    private Watch watch;
    private static final Tracer tracer = Tracing.getTracer();
    private static final Time DEFAULT_TIME = Instant::now;
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("k8s-scheduler-thread-%d").build();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.spotify.styx.docker.KubernetesDockerRunner$1, reason: invalid class name */
    /* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$spotify$styx$state$RunState$State = new int[RunState.State.values().length];

        static {
            try {
                $SwitchMap$com$spotify$styx$state$RunState$State[RunState.State.TERMINATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$spotify$styx$state$RunState$State[RunState.State.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$spotify$styx$state$RunState$State[RunState.State.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$spotify$styx$state$RunState$State[RunState.State.DONE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoMatter
    /* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner$KubernetesSecretSpec.class */
    public interface KubernetesSecretSpec {
        Optional<WorkflowConfiguration.Secret> customSecret();

        Optional<String> serviceAccountSecret();

        static KubernetesSecretSpecBuilder builder() {
            return new KubernetesSecretSpecBuilder();
        }
    }

    /* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner$PodWatcher.class */
    public class PodWatcher implements Watcher<Pod> {
        private static final int RECONNECT_DELAY_SECONDS = 1;
        private final ConcurrentMap<String, WorkflowInstance> podUpdates = new ConcurrentHashMap();

        public PodWatcher() {
        }

        public void eventReceived(Watcher.Action action, Pod pod) {
            if (pod == null) {
                return;
            }
            KubernetesDockerRunner.this.logEvent(action, pod, pod.getMetadata().getResourceVersion(), false);
            if (action == Watcher.Action.DELETED) {
                return;
            }
            Optional<WorkflowInstance> readPodWorkflowInstance = KubernetesDockerRunner.readPodWorkflowInstance(pod);
            if (readPodWorkflowInstance.isPresent()) {
                this.podUpdates.put(pod.getMetadata().getName(), readPodWorkflowInstance.get());
            }
        }

        void processPodUpdates() {
            ImmutableSet copyOf = ImmutableSet.copyOf(this.podUpdates.keySet());
            DockerRunner.LOG.debug("Processing pod updates: {}", Integer.valueOf(copyOf.size()));
            ((List) copyOf.stream().map(str -> {
                WorkflowInstance remove = this.podUpdates.remove(str);
                return CompletableFuture.runAsync(() -> {
                    processPodUpdate(str, remove);
                }, KubernetesDockerRunner.this.eventExecutor);
            }).collect(Collectors.toList())).forEach((v0) -> {
                v0.join();
            });
        }

        private void processPodUpdate(String str, WorkflowInstance workflowInstance) {
            DockerRunner.LOG.debug("Processing pod update: {}: {}", str, workflowInstance);
            Pod pod = (Pod) ((PodResource) KubernetesDockerRunner.this.client.pods().withName(str)).get();
            if (pod == null) {
                return;
            }
            Optional<RunState> lookupPodRunState = KubernetesDockerRunner.this.lookupPodRunState(pod, workflowInstance);
            if (lookupPodRunState.isPresent()) {
                KubernetesDockerRunner.this.emitPodEvents(pod, lookupPodRunState.get());
            }
        }

        private void reconnect() {
            DockerRunner.LOG.warn("Re-establishing pod watcher");
            try {
                KubernetesDockerRunner.this.watch = (Watch) KubernetesDockerRunner.this.client.pods().watch(this);
            } catch (Throwable th) {
                DockerRunner.LOG.warn("Retry threw", th);
                scheduleReconnect();
            }
        }

        private void scheduleReconnect() {
            KubernetesDockerRunner.this.executor.schedule(this::reconnect, 1L, TimeUnit.SECONDS);
        }

        public void onClose(KubernetesClientException kubernetesClientException) {
            DockerRunner.LOG.warn("Watch closed", kubernetesClientException);
            scheduleReconnect();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner$PullImageErrorMatcher.class */
    public static class PullImageErrorMatcher implements EventVisitor<Boolean> {
        private PullImageErrorMatcher() {
        }

        /* renamed from: timeTrigger, reason: merged with bridge method [inline-methods] */
        public Boolean m14timeTrigger(WorkflowInstance workflowInstance) {
            return false;
        }

        /* renamed from: triggerExecution, reason: merged with bridge method [inline-methods] */
        public Boolean m27triggerExecution(WorkflowInstance workflowInstance, Trigger trigger, TriggerParameters triggerParameters) {
            return false;
        }

        /* renamed from: info, reason: merged with bridge method [inline-methods] */
        public Boolean m26info(WorkflowInstance workflowInstance, Message message) {
            return false;
        }

        public Boolean dequeue(WorkflowInstance workflowInstance, Set<String> set) {
            return false;
        }

        /* renamed from: created, reason: merged with bridge method [inline-methods] */
        public Boolean m13created(WorkflowInstance workflowInstance, String str, String str2) {
            return false;
        }

        /* renamed from: submit, reason: merged with bridge method [inline-methods] */
        public Boolean m24submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription, String str) {
            return false;
        }

        /* renamed from: submitted, reason: merged with bridge method [inline-methods] */
        public Boolean m23submitted(WorkflowInstance workflowInstance, String str) {
            return false;
        }

        /* renamed from: started, reason: merged with bridge method [inline-methods] */
        public Boolean m22started(WorkflowInstance workflowInstance) {
            return false;
        }

        public Boolean terminate(WorkflowInstance workflowInstance, Optional<Integer> optional) {
            return false;
        }

        /* renamed from: runError, reason: merged with bridge method [inline-methods] */
        public Boolean m20runError(WorkflowInstance workflowInstance, String str) {
            return Boolean.valueOf(str.contains("failed to pull"));
        }

        /* renamed from: success, reason: merged with bridge method [inline-methods] */
        public Boolean m19success(WorkflowInstance workflowInstance) {
            return false;
        }

        /* renamed from: retryAfter, reason: merged with bridge method [inline-methods] */
        public Boolean m18retryAfter(WorkflowInstance workflowInstance, long j) {
            return false;
        }

        /* renamed from: retry, reason: merged with bridge method [inline-methods] */
        public Boolean m12retry(WorkflowInstance workflowInstance) {
            return false;
        }

        /* renamed from: stop, reason: merged with bridge method [inline-methods] */
        public Boolean m17stop(WorkflowInstance workflowInstance) {
            return false;
        }

        /* renamed from: timeout, reason: merged with bridge method [inline-methods] */
        public Boolean m16timeout(WorkflowInstance workflowInstance) {
            return false;
        }

        /* renamed from: halt, reason: merged with bridge method [inline-methods] */
        public Boolean m15halt(WorkflowInstance workflowInstance) {
            return false;
        }

        /* renamed from: terminate, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m21terminate(WorkflowInstance workflowInstance, Optional optional) {
            return terminate(workflowInstance, (Optional<Integer>) optional);
        }

        /* renamed from: dequeue, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m25dequeue(WorkflowInstance workflowInstance, Set set) {
            return dequeue(workflowInstance, (Set<String>) set);
        }
    }

    KubernetesDockerRunner(NamespacedKubernetesClient namespacedKubernetesClient, StateManager stateManager, Stats stats, KubernetesGCPServiceAccountSecretManager kubernetesGCPServiceAccountSecretManager, Debug debug, String str, int i, int i2, Time time, ScheduledExecutorService scheduledExecutorService) {
        this.closer = Closer.create();
        this.stateManager = (StateManager) Objects.requireNonNull(stateManager);
        this.client = (KubernetesClient) Objects.requireNonNull(namespacedKubernetesClient);
        this.stats = (Stats) Objects.requireNonNull(stats);
        this.serviceAccountSecretManager = (KubernetesGCPServiceAccountSecretManager) Objects.requireNonNull(kubernetesGCPServiceAccountSecretManager);
        this.debug = debug;
        this.styxEnvironment = str;
        this.pollPodsIntervalSeconds = i;
        this.podDeletionDelaySeconds = i2;
        this.time = (Time) Objects.requireNonNull(time);
        this.executor = (ScheduledExecutorService) CloserUtil.register(this.closer, (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService), "kubernetes-poll");
        this.eventExecutor = GrpcContextUtil.currentContextExecutorService(CloserUtil.register(this.closer, new ForkJoinPool(32), "kubernetes-event"));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KubernetesDockerRunner(NamespacedKubernetesClient namespacedKubernetesClient, StateManager stateManager, Stats stats, KubernetesGCPServiceAccountSecretManager kubernetesGCPServiceAccountSecretManager, Debug debug, String str) {
        this(namespacedKubernetesClient, stateManager, stats, kubernetesGCPServiceAccountSecretManager, debug, str, DEFAULT_POLL_PODS_INTERVAL_SECONDS, DEFAULT_POD_DELETION_DELAY_SECONDS, DEFAULT_TIME, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY));
    }

    @Override // com.spotify.styx.docker.DockerRunner
    public void start(WorkflowInstance workflowInstance, DockerRunner.RunSpec runSpec) throws IOException {
        KubernetesSecretSpec ensureSecrets = ensureSecrets(workflowInstance, runSpec);
        this.stats.recordSubmission(runSpec.executionId());
        try {
            this.client.pods().create(new Pod[]{createPod(workflowInstance, runSpec, ensureSecrets, this.styxEnvironment)});
        } catch (KubernetesClientException e) {
            throw new IOException("Failed to create Kubernetes pod", e);
        }
    }

    @Override // com.spotify.styx.docker.DockerRunner
    public void cleanup() throws IOException {
        this.serviceAccountSecretManager.cleanup();
    }

    private KubernetesSecretSpec ensureSecrets(WorkflowInstance workflowInstance, DockerRunner.RunSpec runSpec) {
        return KubernetesSecretSpec.builder().customSecret((Optional<? extends WorkflowConfiguration.Secret>) ensureCustomSecret(workflowInstance, runSpec)).serviceAccountSecret((Optional<? extends String>) runSpec.serviceAccount().map(str -> {
            return this.serviceAccountSecretManager.ensureServiceAccountKeySecret(workflowInstance.workflowId().toString(), str);
        })).build();
    }

    private Optional<WorkflowConfiguration.Secret> ensureCustomSecret(WorkflowInstance workflowInstance, DockerRunner.RunSpec runSpec) {
        return runSpec.secret().map(secret -> {
            if (secret.name().startsWith(STYX_WORKFLOW_SA_SECRET_NAME)) {
                LOG.warn("[AUDIT] Workflow {} refers to secret {} with managed service account key secret name prefix, denying execution", workflowInstance.workflowId(), secret.name());
                throw new InvalidExecutionException("Referenced secret '" + secret.name() + "' has the managed service account key secret name prefix");
            }
            if (STYX_WORKFLOW_SA_SECRET_MOUNT_PATH.equals(secret.mountPath())) {
                LOG.warn("[AUDIT] Workflow {} tries to mount secret {} to the reserved path", workflowInstance.workflowId(), secret.name());
                throw new InvalidExecutionException("Referenced secret '" + secret.name() + "' has the mount path /etc/styx-wf-sa-keys/ defined that is reserved");
            }
            if (((Secret) ((Resource) this.client.secrets().withName(secret.name())).get()) == null) {
                LOG.warn("[AUDIT] Workflow {} refers to a non-existent secret {}", workflowInstance.workflowId(), secret.name());
                throw new InvalidExecutionException("Referenced secret '" + secret.name() + "' was not found");
            }
            LOG.info("[AUDIT] Workflow {} refers to secret {}", workflowInstance.workflowId(), secret.name());
            return secret;
        });
    }

    @VisibleForTesting
    static Pod createPod(WorkflowInstance workflowInstance, DockerRunner.RunSpec runSpec, KubernetesSecretSpec kubernetesSecretSpec, String str) {
        String imageName = runSpec.imageName().contains(":") ? runSpec.imageName() : runSpec.imageName() + ":latest";
        PodBuilder podBuilder = (PodBuilder) new PodBuilder().withNewMetadata().withName(runSpec.executionId()).addToAnnotations(STYX_WORKFLOW_INSTANCE_ANNOTATION, workflowInstance.toKey()).addToAnnotations(DOCKER_TERMINATION_LOGGING_ANNOTATION, String.valueOf(runSpec.terminationLogging())).endMetadata();
        PodSpecBuilder withRestartPolicy = new PodSpecBuilder().withRestartPolicy("Never");
        ResourceRequirementsBuilder resourceRequirementsBuilder = new ResourceRequirementsBuilder();
        runSpec.memRequest().ifPresent(str2 -> {
            resourceRequirementsBuilder.addToRequests("memory", new Quantity(str2));
        });
        runSpec.memLimit().ifPresent(str3 -> {
            resourceRequirementsBuilder.addToLimits("memory", new Quantity(str3));
        });
        ContainerBuilder withResources = new ContainerBuilder().withName(MAIN_CONTAINER_NAME).withImage(imageName).withArgs(runSpec.args()).withEnv(buildEnv(workflowInstance, runSpec, str)).withResources(resourceRequirementsBuilder.build());
        kubernetesSecretSpec.serviceAccountSecret().ifPresent(str4 -> {
            Volume build = new VolumeBuilder().withName(STYX_WORKFLOW_SA_SECRET_NAME).withSecret(new SecretVolumeSourceBuilder().withSecretName(str4).build()).build();
            withRestartPolicy.addToVolumes(new Volume[]{build});
            VolumeMount build2 = new VolumeMountBuilder().withMountPath(STYX_WORKFLOW_SA_SECRET_MOUNT_PATH).withName(build.getName()).withReadOnly(true).build();
            withResources.addToVolumeMounts(new VolumeMount[]{build2});
            withResources.addToEnv(new EnvVar[]{envVar(STYX_WORKFLOW_SA_ENV_VARIABLE, build2.getMountPath() + "styx-wf-sa.json")});
        });
        kubernetesSecretSpec.customSecret().ifPresent(secret -> {
            Volume build = new VolumeBuilder().withName(secret.name()).withSecret(new SecretVolumeSourceBuilder().withSecretName(secret.name()).build()).build();
            withRestartPolicy.addToVolumes(new Volume[]{build});
            withResources.addToVolumeMounts(new VolumeMount[]{new VolumeMountBuilder().withMountPath(secret.mountPath()).withName(build.getName()).withReadOnly(true).build()});
        });
        withRestartPolicy.addToContainers(new Container[]{withResources.build()});
        withRestartPolicy.addToContainers(new Container[]{keepaliveContainer()});
        podBuilder.withSpec(withRestartPolicy.build());
        return podBuilder.build();
    }

    private static Container keepaliveContainer() {
        return ((ContainerBuilder) new ContainerBuilder().withName(KEEPALIVE_CONTAINER_NAME).withImage("k8s.gcr.io/pause:3.1").withNewResources().addToRequests("cpu", new QuantityBuilder().withAmount("0").build()).addToRequests("memory", new QuantityBuilder().withAmount("0").build()).endResources()).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public static EnvVar envVar(String str, String str2) {
        return new EnvVarBuilder().withName(str).withValue(str2).build();
    }

    private static List<EnvVar> buildEnv(WorkflowInstance workflowInstance, DockerRunner.RunSpec runSpec, String str) {
        HashMap hashMap = new HashMap(runSpec.env());
        hashMap.put(COMPONENT_ID, workflowInstance.workflowId().componentId());
        hashMap.put(WORKFLOW_ID, workflowInstance.workflowId().id());
        hashMap.put(PARAMETER, workflowInstance.parameter());
        hashMap.put(COMMIT_SHA, runSpec.commitSha().orElse(""));
        hashMap.put(SERVICE_ACCOUNT, runSpec.serviceAccount().orElse(""));
        hashMap.put(DOCKER_ARGS, String.join(" ", runSpec.args()));
        hashMap.put(DOCKER_IMAGE, runSpec.imageName());
        hashMap.put(EXECUTION_ID, runSpec.executionId());
        hashMap.put(TERMINATION_LOG, "/dev/termination-log");
        hashMap.put(TRIGGER_ID, (String) runSpec.trigger().map(TriggerUtil::triggerId).orElse(null));
        hashMap.put(TRIGGER_TYPE, (String) runSpec.trigger().map(TriggerUtil::triggerType).orElse(null));
        hashMap.put(ENVIRONMENT, str);
        hashMap.put(LOGGING, "structured");
        return (List) hashMap.entrySet().stream().map(entry -> {
            return envVar((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toList());
    }

    @Override // com.spotify.styx.docker.DockerRunner
    public void cleanup(WorkflowInstance workflowInstance, String str) {
    }

    @VisibleForTesting
    void cleanupWithRunState(WorkflowInstance workflowInstance, Pod pod, RunState runState) {
        Optional<ContainerStatus> mainContainerStatus = getMainContainerStatus(pod);
        if (mainContainerStatus.isPresent() && !wantsPod(runState)) {
            if (KubernetesPodEventTranslator.isTerminated(mainContainerStatus.get())) {
                deletePodIfNonDeletePeriodExpired(workflowInstance, pod);
            } else if (KubernetesPodEventTranslator.hasPullImageError(mainContainerStatus.get())) {
                deletePod(workflowInstance, pod, "Pull image error");
            }
        }
    }

    private boolean wantsPod(RunState runState) {
        switch (AnonymousClass1.$SwitchMap$com$spotify$styx$state$RunState$State[runState.state().ordinal()]) {
            case 1:
            case 2:
            case 3:
            case StyxScheduler.DEFAULT_RETRY_MAX_EXPONENT /* 4 */:
                return false;
            default:
                return true;
        }
    }

    @VisibleForTesting
    void cleanupWithoutRunState(WorkflowInstance workflowInstance, Pod pod) {
        if (KubernetesPodEventTranslator.isTerminated(pod)) {
            deletePodIfNonDeletePeriodExpired(workflowInstance, pod);
        } else {
            refreshAndDeleteNonTerminatedPodWithoutRunState(workflowInstance, pod.getMetadata().getName());
        }
    }

    private void refreshAndDeleteNonTerminatedPodWithoutRunState(WorkflowInstance workflowInstance, String str) {
        Pod pod = (Pod) ((PodResource) this.client.pods().withName(str)).get();
        if (pod == null || KubernetesPodEventTranslator.isTerminated(pod)) {
            return;
        }
        deletePod(workflowInstance, pod, "No RunState, not terminated");
    }

    private void deletePodIfNonDeletePeriodExpired(WorkflowInstance workflowInstance, Pod pod) {
        if (((Boolean) getMainContainerStatus(pod).map(this::isNonDeletePeriodExpired).orElse(false)).booleanValue()) {
            deletePod(workflowInstance, pod, "Terminated and expired");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Optional<ContainerStatus> getMainContainerStatus(Pod pod) {
        return readPodWorkflowInstance(pod).flatMap(workflowInstance -> {
            return pod.getStatus().getContainerStatuses().stream().filter(containerStatus -> {
                return isMainContainer(containerStatus.getName(), pod);
            }).findAny();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public static boolean isMainContainer(String str, Pod pod) {
        return str.equals(MAIN_CONTAINER_NAME);
    }

    private boolean isNonDeletePeriodExpired(ContainerStatus containerStatus) {
        ContainerStateTerminated terminated = containerStatus.getState().getTerminated();
        if (terminated.getFinishedAt() == null) {
            return true;
        }
        try {
            return Instant.parse(terminated.getFinishedAt()).isBefore(((Instant) this.time.get()).minus((TemporalAmount) Duration.ofSeconds(this.podDeletionDelaySeconds)));
        } catch (DateTimeParseException e) {
            LOG.warn("Failed to parse container state terminated finishedAt: '{}'", terminated.getFinishedAt(), e);
            return true;
        }
    }

    @VisibleForTesting
    void deletePod(WorkflowInstance workflowInstance, Pod pod, String str) {
        String name = pod.getMetadata().getName();
        if (((Boolean) this.debug.get()).booleanValue()) {
            LOG.info("Keeping {} pod: {}, reason: '{}'", new Object[]{workflowInstance, name, str});
        } else {
            ((PodResource) this.client.pods().withName(name)).delete();
            LOG.info("Deleted {} pod: {}, reason: '{}'", new Object[]{workflowInstance, name, str});
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.watch != null) {
            this.watch.close();
        }
        this.closer.close();
    }

    @Override // com.spotify.styx.docker.DockerRunner
    public void restore() {
        try {
            RetryerBuilder.newBuilder().retryIfException().withWaitStrategy(WaitStrategies.exponentialWait(10L, TimeUnit.SECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt(10)).withRetryListener(this::onRestorePollPodsAttempt).build().call(Executors.callable(this::tryPollPods));
        } catch (ExecutionException | RetryException e) {
            throw new RuntimeException(e);
        }
    }

    private <V> void onRestorePollPodsAttempt(Attempt<V> attempt) {
        if (attempt.hasException()) {
            LOG.warn("restore: failed polling pods, attempt = {}", Long.valueOf(attempt.getAttemptNumber()), attempt.getExceptionCause());
        }
    }

    public void init() {
        this.executor.scheduleWithFixedDelay(this::pollPods, this.pollPodsIntervalSeconds, this.pollPodsIntervalSeconds, TimeUnit.SECONDS);
        PodWatcher podWatcher = new PodWatcher();
        ScheduledExecutorService scheduledExecutorService = this.executor;
        Objects.requireNonNull(podWatcher);
        scheduledExecutorService.scheduleWithFixedDelay(GuardedRunnable.guard(podWatcher::processPodUpdates), PROCESS_POD_UPDATE_INTERVAL_SECONDS, PROCESS_POD_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
        this.watch = (Watch) this.client.pods().watch(podWatcher);
    }

    private void examineRunningWFISandAssociatedPods(Map<WorkflowInstance, RunState> map, PodList podList) {
        Map filterValues = Maps.filterValues(map, runState -> {
            return runState.state().equals(RunState.State.RUNNING) && runState.data().executionId().isPresent();
        });
        Set set = (Set) podList.getItems().stream().map(pod -> {
            return pod.getMetadata().getAnnotations();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(map2 -> {
            return (String) map2.get(STYX_WORKFLOW_INSTANCE_ANNOTATION);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(WorkflowInstance::parseKey).collect(Collectors.toSet());
        filterValues.forEach((workflowInstance, runState2) -> {
            if (set.contains(workflowInstance)) {
                return;
            }
            if (((Pod) ((PodResource) this.client.pods().withName((String) runState2.data().executionId().get())).get()) != null) {
                return;
            }
            this.stateManager.receiveIgnoreClosed(Event.runError(workflowInstance, "No pod associated with this instance"), runState2.counter());
        });
    }

    private void pollPods() {
        try {
            Scope startScopedSpan = tracer.spanBuilder("Styx.KubernetesDockerRunner.pollPods").setRecordEvents(true).setSampler(Samplers.alwaysSample()).startScopedSpan();
            try {
                tryPollPods();
                if (startScopedSpan != null) {
                    startScopedSpan.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            LOG.warn("Error while polling pods", th);
        }
    }

    @VisibleForTesting
    synchronized void tryPollPods() {
        PodList podList = (PodList) this.client.pods().list();
        Tuple2<Predicate<WorkflowInstance>, Map<WorkflowInstance, RunState>> activeStatesPartial = this.stateManager.getActiveStatesPartial();
        Predicate predicate = (Predicate) activeStatesPartial._1;
        examineRunningWFISandAssociatedPods((Map) activeStatesPartial._2, podList);
        for (Pod pod : podList.getItems()) {
            logEvent(Watcher.Action.MODIFIED, pod, podList.getMetadata().getResourceVersion(), true);
            Optional<WorkflowInstance> readPodWorkflowInstance = readPodWorkflowInstance(pod);
            if (readPodWorkflowInstance.isPresent()) {
                if (predicate.test(readPodWorkflowInstance.get())) {
                    LOG.debug("Ignoring unavailable workflow instance: {}", readPodWorkflowInstance.get());
                } else {
                    RunState runState = (RunState) ((Map) activeStatesPartial._2).get(readPodWorkflowInstance.get());
                    if (runState == null || !isPodRunState(pod, runState)) {
                        cleanupWithoutRunState(readPodWorkflowInstance.get(), pod);
                    } else {
                        emitPodEvents(pod, runState);
                        cleanupWithRunState(readPodWorkflowInstance.get(), pod, runState);
                    }
                }
            }
        }
    }

    private static Optional<WorkflowInstance> readPodWorkflowInstance(Pod pod) {
        Map annotations = pod.getMetadata().getAnnotations();
        String name = pod.getMetadata().getName();
        if (annotations != null && annotations.containsKey(STYX_WORKFLOW_INSTANCE_ANNOTATION)) {
            return Optional.of(WorkflowInstance.parseKey((String) annotations.get(STYX_WORKFLOW_INSTANCE_ANNOTATION)));
        }
        LOG.warn("[AUDIT] Got pod without workflow instance annotation {}", name);
        return Optional.empty();
    }

    private Optional<RunState> lookupPodRunState(Pod pod, WorkflowInstance workflowInstance) {
        Optional<RunState> activeState = this.stateManager.getActiveState(workflowInstance);
        if (activeState.isPresent()) {
            return activeState.filter(runState -> {
                return isPodRunState(pod, runState);
            });
        }
        LOG.debug("Pod event for unknown or inactive workflow instance {}", workflowInstance);
        return Optional.empty();
    }

    private boolean isPodRunState(Pod pod, RunState runState) {
        String name = pod.getMetadata().getName();
        Optional executionId = runState.data().executionId();
        if (!executionId.isPresent()) {
            LOG.debug("Pod event for state with no current executionId: {}", name);
            return false;
        }
        String str = (String) executionId.get();
        if (name.equals(str)) {
            return true;
        }
        LOG.debug("Pod event not matching current exec id, current:{} != pod:{}", str, name);
        return false;
    }

    private void emitPodEvents(Pod pod, RunState runState) {
        List<Event> translate = KubernetesPodEventTranslator.translate(runState.workflowInstance(), runState, pod, this.stats);
        for (int i = 0; i < translate.size(); i++) {
            Event event = translate.get(i);
            if (((Boolean) event.accept(new PullImageErrorMatcher())).booleanValue()) {
                this.stats.recordPullImageError();
            }
            if (EventUtil.name(event).equals("started")) {
                Optional executionId = runState.data().executionId();
                Stats stats = this.stats;
                Objects.requireNonNull(stats);
                executionId.ifPresent(stats::recordRunning);
            }
            try {
                this.stateManager.receive(event, runState.counter() + i);
            } catch (IsClosedException e) {
                LOG.warn("Could not receive kubernetes event", e);
                throw new RuntimeException((Throwable) e);
            }
        }
    }

    private void logEvent(Watcher.Action action, Pod pod, String str, boolean z) {
        String name = pod.getMetadata().getName();
        String str2 = (String) Optional.ofNullable(pod.getMetadata().getAnnotations()).map(map -> {
            return (String) map.get(STYX_WORKFLOW_INSTANCE_ANNOTATION);
        }).orElse("N/A");
        String readStatus = readStatus(pod);
        Logger logger = LOG;
        Object[] objArr = new Object[7];
        objArr[0] = z ? "Polled: " : "";
        objArr[1] = name;
        objArr[2] = pod.getMetadata().getUid();
        objArr[3] = str;
        objArr[4] = action;
        objArr[5] = str2;
        objArr[6] = readStatus;
        logger.info("{}Pod event for {} ({}) at resource version {}, action: {}, workflow instance: {}, status: {}", objArr);
    }

    private String readStatus(Pod pod) {
        try {
            return Json.OBJECT_MAPPER.writeValueAsString(pod.getStatus());
        } catch (JsonProcessingException e) {
            return pod.getStatus().toString();
        }
    }
}
