package com.spotify.styx.docker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.spotify.styx.docker.DockerRunner;
import com.spotify.styx.model.DataEndpoint;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.EventVisitor;
import com.spotify.styx.model.ExecutionDescription;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.monitoring.Stats;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateManager;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodFluent;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodSpecFluent;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.ClientPodResource;
import io.fabric8.kubernetes.client.dsl.Watchable;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner.class */
public class KubernetesDockerRunner implements DockerRunner {
    /** Kubernetes namespace name; the constructor scopes its client with this same literal value. */
    static final String NAMESPACE = "default";
    /** Name given to the single container of every pod built by {@code createPod}. */
    static final String STYX_RUN = "styx-run";
    /** Annotation key under which the workflow instance key is stored on each pod. */
    static final String STYX_WORKFLOW_INSTANCE_ANNOTATION = "styx-workflow-instance";
    /** Pod polling period in seconds; {@code init()} schedules the poll with this same value. */
    static final int POLL_PODS_INTERVAL_SECONDS = 60;
    /** Shared single-threaded daemon scheduler used for the periodic pod poll and for watch reconnects. */
    private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("k8s-scheduler-thread-%d").build());
    /** Client used for all pod operations; scoped to a namespace in the constructor. */
    private final KubernetesClient client;
    /** Source of run states and sink for events translated from pod changes. */
    private final StateManager stateManager;
    /** Metrics sink; notified when an image pull error event is seen. */
    private final Stats stats;
    /** Pod watch handle; assigned in {@code init()} and closed in {@code close()}, null until then. */
    private Watch watch;

    /* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner$PodWatcher.class */
    /**
     * Watcher that feeds pod events into {@link KubernetesDockerRunner#inspectPod} and
     * re-establishes the watch whenever it is closed.
     *
     * <p>The watcher remembers the resource version of the last pod event it saw so that a
     * reconnect can resume from that point.
     *
     * <p>NOTE(review): resource versions are parsed as {@code int} and incremented here. The
     * Kubernetes API conventions describe resource versions as opaque strings, so this relies on
     * server implementation details and on the value fitting in an {@code int} — confirm.
     */
    public class PodWatcher implements Watcher<Pod> {
        /** Delay, in seconds, before a scheduled reconnect attempt. */
        private static final int RECONNECT_DELAY_SECONDS = 1;

        /** Resource version of the most recently received pod event (or the initial version). */
        private int lastResourceVersion;

        /**
         * @param i resource version from which the watch was initially opened
         */
        PodWatcher(int i) {
            this.lastResourceVersion = i;
        }

        /**
         * Handles one pod event: logs it, hands it to {@code inspectPod}, and records the pod's
         * resource version. Events with a {@code null} pod are ignored.
         *
         * @param action the kind of change reported by the watch
         * @param pod    the affected pod, may be {@code null}
         */
        public void eventReceived(Watcher.Action action, Pod pod) {
            if (pod == null) {
                return;
            }
            DockerRunner.LOG.info("Pod event for {} at resource version {}", pod.getMetadata().getName(), this.lastResourceVersion);
            DockerRunner.LOG.info("Action: {}", action);
            try {
                KubernetesDockerRunner.this.inspectPod(action, pod);
            } finally {
                // Always advance, even when inspection fails, so a reconnect does not replay this event.
                this.lastResourceVersion = Integer.parseInt(pod.getMetadata().getResourceVersion());
            }
        }

        /**
         * Opens a new watch starting at {@code lastResourceVersion}. On any failure the error is
         * logged and another attempt is scheduled.
         */
        private void reconnect() {
            DockerRunner.LOG.warn("Re-establishing watching from {}", this.lastResourceVersion);
            try {
                // Keep the runner's handle pointing at the live watch so that close() actually
                // closes it; otherwise the replacement watch would leak.
                KubernetesDockerRunner.this.watch = (Watch) ((Watchable) KubernetesDockerRunner.this.client.pods().withResourceVersion(Integer.toString(this.lastResourceVersion))).watch(this);
            } catch (Throwable th) {
                DockerRunner.LOG.warn("Retry threw", th);
                scheduleReconnect();
            }
        }

        /** Schedules {@link #reconnect()} on the shared executor after the reconnect delay. */
        private void scheduleReconnect() {
            KubernetesDockerRunner.EXECUTOR.schedule(this::reconnect, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
        }

        /**
         * Called when the watch is closed; always triggers a reconnect.
         *
         * <p>If the close was caused by a {@link ProtocolException}, the remembered resource
         * version is advanced by one and the reconnect happens immediately; otherwise it is
         * scheduled after the reconnect delay.
         * NOTE(review): the ProtocolException case presumably means the requested resource
         * version is no longer available on the server — confirm.
         *
         * @param kubernetesClientException the cause of the close, may be {@code null}
         */
        public void onClose(KubernetesClientException kubernetesClientException) {
            DockerRunner.LOG.warn("Watch closed", kubernetesClientException);
            if (kubernetesClientException != null && kubernetesClientException.getCause() instanceof ProtocolException) {
                this.lastResourceVersion++;
                reconnect();
            } else {
                scheduleReconnect();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/styx/docker/KubernetesDockerRunner$PullImageErrorMatcher.class */
    /**
     * Event visitor that answers whether an event is a run error caused by a failed image pull.
     *
     * <p>Only {@code runError} can return {@code true}; every other event kind yields
     * {@code false}. The method names are the {@link EventVisitor} method names, so that
     * {@code event.accept(...)} dispatches to them.
     */
    public static class PullImageErrorMatcher implements EventVisitor<Boolean> {
        private PullImageErrorMatcher() {
        }

        public Boolean timeTrigger(WorkflowInstance workflowInstance) {
            return false;
        }

        public Boolean triggerExecution(WorkflowInstance workflowInstance, String str) {
            return false;
        }

        public Boolean created(WorkflowInstance workflowInstance, String str, String str2) {
            return false;
        }

        public Boolean submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription) {
            return false;
        }

        public Boolean submitted(WorkflowInstance workflowInstance, String str) {
            return false;
        }

        public Boolean started(WorkflowInstance workflowInstance) {
            return false;
        }

        public Boolean terminate(WorkflowInstance workflowInstance, int i) {
            return false;
        }

        /**
         * @param workflowInstance the instance the error belongs to (unused)
         * @param str              the error message, may be {@code null}
         * @return {@code true} if the message contains {@code "failed to pull"}
         */
        public Boolean runError(WorkflowInstance workflowInstance, String str) {
            // A missing message cannot be a pull error; avoid a NullPointerException here.
            return str != null && str.contains("failed to pull");
        }

        public Boolean success(WorkflowInstance workflowInstance) {
            return false;
        }

        public Boolean retryAfter(WorkflowInstance workflowInstance, long j) {
            return false;
        }

        public Boolean retry(WorkflowInstance workflowInstance) {
            return false;
        }

        public Boolean stop(WorkflowInstance workflowInstance) {
            return false;
        }

        public Boolean timeout(WorkflowInstance workflowInstance) {
            return false;
        }

        public Boolean halt(WorkflowInstance workflowInstance) {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KubernetesDockerRunner(KubernetesClient kubernetesClient, StateManager stateManager, Stats stats) {
        this.stateManager = (StateManager) Objects.requireNonNull(stateManager);
        this.client = (KubernetesClient) ((KubernetesClient) Objects.requireNonNull(kubernetesClient)).inNamespace("default");
        this.stats = (Stats) Objects.requireNonNull(stats);
    }

    /**
     * Creates a pod for the given workflow instance and run spec.
     *
     * @param workflowInstance the instance the pod is created for
     * @param runSpec          image, arguments and optional secret for the pod
     * @return the name of the created pod
     * @throws IOException if the Kubernetes client fails while creating the pod
     */
    @Override // com.spotify.styx.docker.DockerRunner
    public String start(WorkflowInstance workflowInstance, DockerRunner.RunSpec runSpec) throws IOException {
        try {
            final Pod createdPod = (Pod) this.client.pods().create(new Pod[] {createPod(workflowInstance, runSpec)});
            return createdPod.getMetadata().getName();
        } catch (KubernetesClientException cause) {
            throw new IOException("Failed to create Kubernetes pod", cause);
        }
    }

    /**
     * Builds the pod definition for one execution of a workflow instance.
     *
     * <p>The pod gets a generated name prefixed with {@code "styx-run-"}, carries the workflow
     * instance key as an annotation, never restarts, and runs a single container with the
     * image and arguments from {@code runSpec}. An image name without any {@code ':'} gets
     * {@code ":latest"} appended. If the run spec has a secret, it is mounted read-only into the
     * container at the secret's mount path through a volume named after the secret.
     *
     * @param workflowInstance the instance whose key is stored in the pod annotation
     * @param runSpec          image name, arguments and optional secret
     * @return the built pod definition
     */
    @VisibleForTesting
    static Pod createPod(WorkflowInstance workflowInstance, DockerRunner.RunSpec runSpec) {
        final String imageName = runSpec.imageName();
        final String imageWithTag = imageName.contains(":") ? imageName : imageName + ":latest";

        // Typed nested builders keep the fluent self-types intact; raw types would erase them
        // and the end*() calls further down the chain would no longer resolve.
        final PodFluent.SpecNested<PodBuilder> spec = new PodBuilder()
            .withNewMetadata()
                .withGenerateName(STYX_RUN + "-")
                .addToAnnotations(STYX_WORKFLOW_INSTANCE_ANNOTATION, workflowInstance.toKey())
            .endMetadata()
            .withNewSpec()
                .withRestartPolicy("Never");

        final PodSpecFluent.ContainersNested<PodFluent.SpecNested<PodBuilder>> container = spec
            .addNewContainer()
                .withName(STYX_RUN)
                .withImage(imageWithTag)
                .withArgs(runSpec.args());

        final PodFluent.SpecNested<PodBuilder> completedSpec;
        if (runSpec.secret().isPresent()) {
            final DataEndpoint.Secret secret = runSpec.secret().get();
            completedSpec = container
                    .addNewVolumeMount()
                        .withName(secret.name())
                        .withMountPath(secret.mountPath())
                        .withReadOnly(true)
                    .endVolumeMount()
                .endContainer()
                .addNewVolume()
                    .withName(secret.name())
                    .withNewSecret()
                        .withSecretName(secret.name())
                    .endSecret()
                .endVolume();
        } else {
            completedSpec = container.endContainer();
        }
        return completedSpec.endSpec().build();
    }

    /**
     * Deletes the pod with the given name.
     *
     * @param str name of the pod to delete
     */
    @Override // com.spotify.styx.docker.DockerRunner
    public void cleanup(String str) {
        final ClientPodResource namedPod = (ClientPodResource) this.client.pods().withName(str);
        namedPod.delete();
    }

    /**
     * Closes the pod watch if one has been opened; does nothing otherwise.
     *
     * @throws IOException declared by the interface
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.watch == null) {
            // No watch has been assigned yet, nothing to release.
            return;
        }
        this.watch.close();
    }

    /**
     * Starts background processing: schedules the periodic pod poll on the shared executor and
     * opens a pod watch beginning at the resource version of the current pod list.
     */
    public void init() {
        // 60 seconds for both initial delay and period; same value as POLL_PODS_INTERVAL_SECONDS.
        EXECUTOR.scheduleWithFixedDelay(this::pollPods, 60L, 60L, TimeUnit.SECONDS);

        final PodList currentPods = (PodList) this.client.pods().list();
        final String startVersion = currentPods.getMetadata().getResourceVersion();

        final Watchable fromStartVersion = (Watchable) this.client.pods().withResourceVersion(startVersion);
        this.watch = (Watch) fromStartVersion.watch(new PodWatcher(Integer.parseInt(startVersion)));
    }

    /**
     * Lists all pods and inspects each one as if a MODIFIED event had been received for it.
     * Any failure is logged and swallowed so that the exception does not escape the scheduled task.
     */
    private void pollPods() {
        try {
            final PodList listing = (PodList) this.client.pods().list();
            for (Pod pod : listing.getItems()) {
                inspectPod(Watcher.Action.MODIFIED, pod);
            }
        } catch (Throwable failure) {
            LOG.warn("Error while polling pods", failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Translates the state of a pod into events for its workflow instance and feeds them to the
     * state manager.
     *
     * <p>Pods without the workflow instance annotation (including pods with no annotations at
     * all) and pods whose workflow instance has no run state are logged and skipped. For each
     * translated event that is a pull image error, the stats sink is notified before the event
     * is passed on.
     *
     * @param action the kind of change that was observed for the pod
     * @param pod    the pod to inspect
     * @throws RuntimeException wrapping {@code StateManager.IsClosed} if the state manager
     *                          rejects an event because it is closed
     */
    public void inspectPod(Watcher.Action action, Pod pod) {
        final Map<String, String> annotations = pod.getMetadata().getAnnotations();
        // The annotation map may be absent for pods that carry no annotations; treat that the
        // same as a missing key instead of failing the whole poll or watch callback.
        if (annotations == null || !annotations.containsKey(STYX_WORKFLOW_INSTANCE_ANNOTATION)) {
            LOG.warn("Got pod without workflow instance annotation {}", pod.getMetadata().getName());
            return;
        }

        final WorkflowInstance workflowInstance = WorkflowInstance.parseKey(annotations.get(STYX_WORKFLOW_INSTANCE_ANNOTATION));
        final RunState runState = this.stateManager.get(workflowInstance);
        if (runState == null) {
            LOG.warn("Pod event for unknown or inactive workflow instance {}", workflowInstance);
            return;
        }

        for (Event event : KubernetesPodEventTranslator.translate(workflowInstance, runState, action, pod)) {
            if (((Boolean) event.accept(new PullImageErrorMatcher())).booleanValue()) {
                this.stats.pullImageError();
            }
            try {
                this.stateManager.receive(event);
            } catch (StateManager.IsClosed e) {
                LOG.warn("Could not receive kubernetes event", e);
                throw Throwables.propagate(e);
            }
        }
    }
}
