package edu.iu.dsc.tws.rsched.bootstrap;

import com.google.protobuf.InvalidProtocolBufferException;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.ControllerContext;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/bootstrap/ZKWorkerController.class */
public class ZKWorkerController implements IWorkerController {
    public static final Logger LOG = Logger.getLogger(ZKWorkerController.class.getName());
    private String workerIP;
    private int workerPort;
    private JobMasterAPI.WorkerInfo workerInfo;
    private int numberOfWorkers;
    private String jobName;
    private JobMasterAPI.NodeInfo nodeInfo;
    private JobAPI.ComputeResource computeResource;
    private CuratorFramework client;
    private String jobPath;
    private PersistentNode jobZNode;
    private PathChildrenCache childrenCache;
    private DistributedAtomicInteger daiForWorkerID;
    private DistributedAtomicInteger daiForBarrier;
    private DistributedBarrier barrier;
    private Config config;

    public ZKWorkerController(Config config, String str, String str2, int i, JobMasterAPI.NodeInfo nodeInfo, JobAPI.ComputeResource computeResource) {
        this.config = config;
        this.jobName = str;
        this.numberOfWorkers = i;
        this.nodeInfo = nodeInfo;
        this.jobPath = ZKUtil.constructJobPath(config, str);
        this.computeResource = computeResource;
        String[] split = str2.split(":");
        this.workerIP = split[0];
        this.workerPort = Integer.parseInt(split[1]);
    }

    public boolean initialize() {
        try {
            this.client = CuratorFrameworkFactory.newClient(ZKContext.zooKeeperServerAddresses(this.config), new ExponentialBackoffRetry(1000, 3));
            this.client.start();
            this.barrier = new DistributedBarrier(this.client, ZKUtil.constructBarrierPath(this.config, this.jobName));
            this.daiForWorkerID = new DistributedAtomicInteger(this.client, ZKUtil.constructDaiPathForWorkerID(this.config, this.jobName), new ExponentialBackoffRetry(1000, 3));
            this.daiForBarrier = new DistributedAtomicInteger(this.client, ZKUtil.constructDaiPathForBarrier(this.config, this.jobName), new ExponentialBackoffRetry(1000, 3));
            if (this.client.checkExists().forPath(this.jobPath) == null) {
                this.workerInfo = WorkerInfoUtils.createWorkerInfo(createWorkerID(), this.workerIP, this.workerPort, this.nodeInfo, this.computeResource);
                createWorkerZnode();
                appendWorkerInfo();
            } else {
                this.workerInfo = getIfExists(parseJobZNode());
                if (this.workerInfo != null) {
                    createWorkerZnode();
                    LOG.warning("Worker is coming from a failure. It is using the previous job znode data: " + this.workerInfo);
                } else {
                    this.workerInfo = WorkerInfoUtils.createWorkerInfo(createWorkerID(), this.workerIP, this.workerPort, this.nodeInfo, this.computeResource);
                    createWorkerZnode();
                    appendWorkerInfo();
                }
            }
            this.childrenCache = new PathChildrenCache(this.client, this.jobPath, true);
            this.childrenCache.start();
            LOG.info("This worker: " + this.workerInfo + " initialized successfully.");
            return true;
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Exception when initializing ZKWorkerController", (Throwable) e);
            return false;
        }
    }

    public boolean initialize(int i) {
        try {
            this.client = CuratorFrameworkFactory.newClient(ZKContext.zooKeeperServerAddresses(this.config), new ExponentialBackoffRetry(1000, 3));
            this.client.start();
            this.barrier = new DistributedBarrier(this.client, ZKUtil.constructBarrierPath(this.config, this.jobName));
            this.daiForWorkerID = new DistributedAtomicInteger(this.client, ZKUtil.constructDaiPathForWorkerID(this.config, this.jobName), new ExponentialBackoffRetry(1000, 3));
            this.daiForBarrier = new DistributedAtomicInteger(this.client, ZKUtil.constructDaiPathForBarrier(this.config, this.jobName), new ExponentialBackoffRetry(1000, 3));
            if (this.client.checkExists().forPath(this.jobPath) == null) {
                this.workerInfo = WorkerInfoUtils.createWorkerInfo(i, this.workerIP, this.workerPort, this.nodeInfo, this.computeResource);
                createWorkerZnode();
                appendWorkerInfo();
            } else {
                this.workerInfo = getIfExists(parseJobZNode());
                if (this.workerInfo != null) {
                    createWorkerZnode();
                    LOG.warning("Worker is coming from a failure. It is using the previous job znode data: " + this.workerInfo);
                } else {
                    this.workerInfo = WorkerInfoUtils.createWorkerInfo(i, this.workerIP, this.workerPort, this.nodeInfo, this.computeResource);
                    createWorkerZnode();
                    appendWorkerInfo();
                }
            }
            this.childrenCache = new PathChildrenCache(this.client, this.jobPath, true);
            this.childrenCache.start();
            LOG.info("This worker: " + this.workerInfo + " initialized successfully.");
            return true;
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Exception when initializing ZKWorkerController", (Throwable) e);
            return false;
        }
    }

    public JobMasterAPI.WorkerInfo getWorkerInfo() {
        return this.workerInfo;
    }

    public JobMasterAPI.WorkerInfo getWorkerInfoForID(int i) {
        for (JobMasterAPI.WorkerInfo workerInfo : getJoinedWorkers()) {
            if (workerInfo.getWorkerID() == i) {
                return workerInfo;
            }
        }
        return null;
    }

    public int getNumberOfWorkers() {
        return this.numberOfWorkers;
    }

    private int createWorkerID() {
        try {
            AtomicValue increment = this.daiForWorkerID.increment();
            if (!increment.succeeded()) {
                createWorkerID();
                return -1;
            }
            int intValue = ((Integer) increment.preValue()).intValue();
            LOG.fine("Unique WorkerID generated: " + intValue);
            return intValue;
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to generate a unique workerID. Will try again ...", (Throwable) e);
            createWorkerID();
            return -1;
        }
    }

    private void createWorkerZnode() {
        try {
            this.jobZNode = ZKUtil.createPersistentEphemeralZnode(this.client, ZKUtil.constructWorkerPath(this.jobPath, getWorkerIpAndPort(this.workerInfo)), this.workerInfo.toByteArray());
            this.jobZNode.start();
            this.jobZNode.waitForInitialCreate(10000L, TimeUnit.MILLISECONDS);
            LOG.fine("An ephemeral znode is created for this worker: " + this.jobZNode.getActualPath());
        } catch (Exception e) {
            throw new RuntimeException("Could not create znode for the worker: " + this.workerInfo, e);
        }
    }

    private void appendWorkerInfo() {
        InterProcessMutex interProcessMutex = new InterProcessMutex(this.client, ZKUtil.constructJobLockPath(this.config, this.jobName));
        try {
            interProcessMutex.acquire();
            this.client.setData().forPath(this.jobPath, ZKUtil.addTwoByteArrays((byte[]) this.client.getData().forPath(this.jobPath), ZKUtil.encodeWorkerInfo(this.workerInfo)));
            interProcessMutex.release();
            LOG.info("Added own WorkerInfo and updated job znode content.");
        } catch (Exception e) {
            throw new RuntimeException("Could not update the job znode content for the worker: " + this.workerInfo, e);
        }
    }

    public void printWorkers(List<JobMasterAPI.WorkerInfo> list) {
        StringBuffer stringBuffer = new StringBuffer();
        stringBuffer.append("Number of workers in the job: " + list.size() + "\n");
        Iterator<JobMasterAPI.WorkerInfo> it = list.iterator();
        while (it.hasNext()) {
            stringBuffer.append(it.next().toString() + "\n");
        }
        LOG.info(stringBuffer.toString());
    }

    public List<JobMasterAPI.WorkerInfo> getCurrentWorkers() {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.childrenCache.getCurrentData().iterator();
        while (it.hasNext()) {
            JobMasterAPI.WorkerInfo workerInfo = null;
            try {
                workerInfo = JobMasterAPI.WorkerInfo.newBuilder().mergeFrom(((ChildData) it.next()).getData()).build();
            } catch (InvalidProtocolBufferException e) {
                LOG.log(Level.SEVERE, "Could not decode child znode content as a WorkerInfo object", e);
            }
            arrayList.add(workerInfo);
        }
        return arrayList;
    }

    public int getNumberOfCurrentWorkers() {
        return this.childrenCache.getCurrentData().size();
    }

    public List<JobMasterAPI.WorkerInfo> getJoinedWorkers() {
        return parseJobZNode();
    }

    private List<JobMasterAPI.WorkerInfo> parseJobZNode() {
        try {
            return ZKUtil.decodeWorkerInfos((byte[]) this.client.getData().forPath(this.jobPath));
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Could not get the job node data", (Throwable) e);
            return null;
        }
    }

    private JobMasterAPI.WorkerInfo getIfExists(List<JobMasterAPI.WorkerInfo> list) {
        String str = this.workerIP + ":" + this.workerPort;
        for (JobMasterAPI.WorkerInfo workerInfo : list) {
            if (str.equalsIgnoreCase(getWorkerIpAndPort(workerInfo))) {
                return workerInfo;
            }
        }
        return null;
    }

    private String getWorkerIpAndPort(JobMasterAPI.WorkerInfo workerInfo) {
        return workerInfo.getWorkerIP() + ":" + workerInfo.getPort();
    }

    private int countNumberOfJoinedWorkers() {
        try {
            byte[] bArr = (byte[]) this.client.getData().forPath(this.jobPath);
            int i = 0;
            int i2 = 0;
            while (i < bArr.length) {
                i += 4 + ZKUtil.intFromBytes(bArr, i);
                i2++;
            }
            return i2;
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Could not get the job node data", (Throwable) e);
            return -1;
        }
    }

    public List<JobMasterAPI.WorkerInfo> getAllWorkers() throws TimeoutException {
        long maxWaitTimeForAllToJoin = ControllerContext.maxWaitTimeForAllToJoin(this.config);
        long j = 0;
        while (j < maxWaitTimeForAllToJoin) {
            if (countNumberOfJoinedWorkers() >= this.numberOfWorkers) {
                return getJoinedWorkers();
            }
            try {
                Thread.sleep(50L);
                j += 50;
            } catch (InterruptedException e) {
                LOG.fine("Thread sleep interrupted. Will try again ...");
            }
        }
        throw new TimeoutException("All workers have not joined the job on the specified time limit: " + maxWaitTimeForAllToJoin + "ms.");
    }

    private String getZnodeName(String str) {
        if (str == null || str.length() < 40) {
            return null;
        }
        return str.substring(40);
    }

    private boolean incrementBarrierDAI(int i, long j) {
        if (i == 100) {
            return false;
        }
        try {
            AtomicValue increment = this.daiForBarrier.increment();
            if (!increment.succeeded()) {
                return incrementBarrierDAI(i + 1, j);
            }
            LOG.fine("DistributedAtomicInteger for Barrier increased to: " + increment.postValue());
            if (((Integer) increment.postValue()).intValue() % this.numberOfWorkers == 0) {
                this.barrier.removeBarrier();
                return true;
            }
            this.barrier.setBarrier();
            return this.barrier.waitOnBarrier(j, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to increment the DistributedAtomicInteger for Barrier. Will try again ...", (Throwable) e);
            return incrementBarrierDAI(i + 1, j);
        }
    }

    public void waitOnBarrier() throws TimeoutException {
        if (!incrementBarrierDAI(0, ControllerContext.maxWaitTimeOnBarrier(this.config))) {
            throw new TimeoutException("All workers have not arrived at the barrier on the time limit: " + ControllerContext.maxWaitTimeOnBarrier(this.config) + "ms.");
        }
    }

    public void close() {
        if (this.client != null) {
            try {
                int size = this.childrenCache.getCurrentData().size();
                this.jobZNode.close();
                CloseableUtils.closeQuietly(this.childrenCache);
                if (size == 1) {
                    LOG.log(Level.INFO, "This is the last worker to finish. Deleting the job znodes.");
                    ZKUtil.deleteJobZNodes(this.config, this.client, this.jobName);
                }
                CloseableUtils.closeQuietly(this.client);
            } catch (Exception e) {
                LOG.log(Level.SEVERE, "Exception when closing", (Throwable) e);
            }
        }
    }
}
