package edu.iu.dsc.tws.rsched.uploaders.k8s;

import com.google.gson.reflect.TypeToken;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.scheduler.IUploader;
import edu.iu.dsc.tws.api.scheduler.UploaderException;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.client.JobEndListener;
import edu.iu.dsc.tws.rsched.schedulers.k8s.client.JobEndWatcher;
import edu.iu.dsc.tws.rsched.utils.FileUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import io.kubernetes.client.openapi.ApiCallback;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Watch;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/uploaders/k8s/DirectUploader.class */
public class DirectUploader extends Thread implements IUploader, JobEndListener {
    private static final Logger LOG = Logger.getLogger(DirectUploader.class.getName());
    private CoreV1Api coreApi;
    private ApiClient apiClient;
    private Config config;
    private String namespace;
    private String jobID;
    private String tempJobDir;
    private String localJobPackageFile;
    private Watch<V1Pod> watcher;
    private JobEndWatcher jobEndWatcher;
    private ArrayList<UploaderToPod> uploaders = new ArrayList<>();
    private boolean stopUploader = false;

    public void initialize(Config config, String str) {
        this.config = config;
        this.namespace = KubernetesContext.namespace(this.config);
        this.jobID = str;
    }

    public URI uploadPackage(String str) throws UploaderException {
        this.tempJobDir = str;
        this.localJobPackageFile = str + File.separator + SchedulerContext.jobPackageFileName(this.config);
        KubernetesController.init(this.namespace);
        this.apiClient = KubernetesController.getApiClient();
        this.coreApi = KubernetesController.createCoreV1Api();
        start();
        this.jobEndWatcher = JobEndWatcher.init(this.namespace, this.jobID);
        this.jobEndWatcher.addJobEndListener(this::jobEnded);
        return null;
    }

    private void printLog() {
        LOG.info(((((System.lineSeparator() + System.lineSeparator()) + "Twister2 Client will upload the job package directly to the job pods.\n") + "Twister2 Client needs to run until the job completes. \n") + "###########   Please do not kill the Twister2 Client   ###########\n") + System.lineSeparator());
    }

    /* JADX WARN: Type inference failed for: r3v3, types: [edu.iu.dsc.tws.rsched.uploaders.k8s.DirectUploader$1] */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        printLog();
        String jobLabelSelector = KubernetesUtils.jobLabelSelector(this.jobID);
        String str = "/twister2-memory-dir/" + JobUtils.createJobPackageFileName(this.jobID);
        try {
            this.watcher = Watch.createWatch(this.apiClient, this.coreApi.listNamespacedPodCall(this.namespace, (String) null, (Boolean) null, (String) null, (String) null, jobLabelSelector, (Integer) null, (String) null, Integer.MAX_VALUE, Boolean.TRUE, (ApiCallback) null), new TypeToken<Watch.Response<V1Pod>>() { // from class: edu.iu.dsc.tws.rsched.uploaders.k8s.DirectUploader.1
            }.getType());
            try {
                Iterator it = this.watcher.iterator();
                while (it.hasNext()) {
                    Watch.Response response = (Watch.Response) it.next();
                    if (this.stopUploader) {
                        break;
                    }
                    if (response.object != null && ((V1Pod) response.object).getMetadata().getName().startsWith(this.jobID) && KubernetesUtils.isPodRunning((V1Pod) response.object)) {
                        UploaderToPod uploaderToPod = new UploaderToPod(this.namespace, ((V1Pod) response.object).getMetadata().getName(), this.localJobPackageFile, str);
                        uploaderToPod.start();
                        this.uploaders.add(uploaderToPod);
                    }
                }
                closeWatcher();
            } catch (RuntimeException e) {
                if (!this.stopUploader) {
                    throw e;
                }
                LOG.fine("Uploader is stopped.");
            }
        } catch (ApiException e2) {
            LOG.log(Level.SEVERE, "Exception when watching the pods to get the IPs: \nexCode: " + e2.getCode() + "\nresponseBody: " + e2.getResponseBody(), e2);
            throw new RuntimeException(e2);
        }
    }

    public boolean complete() {
        return true;
    }

    private void closeWatcher() {
        if (this.watcher == null) {
            return;
        }
        try {
            this.watcher.close();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Exception closing watcher.", (Throwable) e);
        }
        this.watcher = null;
    }

    public void stopUploader() {
        this.stopUploader = true;
        closeWatcher();
        Iterator<UploaderToPod> it = this.uploaders.iterator();
        while (it.hasNext()) {
            it.next().cancelTransfer();
        }
    }

    @Override // edu.iu.dsc.tws.rsched.schedulers.k8s.client.JobEndListener
    public void jobEnded() {
        stopUploader();
        if (FileUtils.deleteDir(this.tempJobDir)) {
            LOG.log(Level.INFO, "CLEANED TEMPORARY DIRECTORY......:" + this.tempJobDir);
        }
    }

    public boolean undo() {
        stopUploader();
        if (this.jobEndWatcher == null) {
            return true;
        }
        this.jobEndWatcher.stopWatcher();
        return true;
    }

    public void close() {
    }
}
