package co.cask.tigon.cli;

import co.cask.tigon.app.program.Program;
import co.cask.tigon.app.program.Programs;
import co.cask.tigon.data.transaction.queue.QueueAdmin;
import co.cask.tigon.flow.DeployClient;
import co.cask.tigon.internal.app.runtime.distributed.DistributedFlowletInstanceUpdater;
import co.cask.tigon.internal.app.runtime.distributed.FlowTwillProgramController;
import co.cask.tigon.internal.app.runtime.flow.FlowUtils;
import co.cask.tigon.io.Locations;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tigon/cli/DistributedFlowOperations.class */
/**
 * {@link FlowOperations} implementation that manages flows through a {@link TwillRunnerService}.
 *
 * <p>Flow jars are copied into a {@code "flowjars"} location (one entry per flow name) so that a
 * flow's {@link Program} can be re-created later, e.g. when changing flowlet instance counts.
 * Flows are looked up in Twill under the application name {@code "flow.<flowName>"}.
 *
 * <p>All {@link FlowOperations} methods are best-effort: failures are logged and an empty result
 * (or {@code null} for {@link #getStatus(String)}) is returned instead of propagating.
 */
public class DistributedFlowOperations extends AbstractIdleService implements FlowOperations {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedFlowOperations.class);

    /**
     * Pause applied after each successful lookup before its result is used.
     * NOTE(review): presumably gives the ZooKeeper-backed Twill view time to populate - confirm.
     */
    private static final long ZK_SETTLE_MILLIS = 250L;

    private final Location location;
    private final TwillRunnerService runnerService;
    private final DeployClient deployClient;
    private final File jarUnpackDir;
    private final QueueAdmin queueAdmin;

    /**
     * Creates the operations facade, ensuring the {@code "flowjars"} location exists and
     * allocating a temporary directory used for unpacking jars.
     *
     * @throws IOException if the {@code "flowjars"} location cannot be created
     */
    @Inject
    public DistributedFlowOperations(LocationFactory locationFactory, TwillRunnerService twillRunnerService, DeployClient deployClient, QueueAdmin queueAdmin) throws IOException {
        this.location = locationFactory.create("flowjars");
        this.location.mkdirs();
        this.runnerService = twillRunnerService;
        this.deployClient = deployClient;
        this.jarUnpackDir = Files.createTempDir();
        this.queueAdmin = queueAdmin;
    }

    /** Starts the underlying Twill runner service and waits for it to be running. */
    @Override
    protected void startUp() throws Exception {
        this.runnerService.startAndWait();
    }

    /**
     * Deploys and starts the flow contained in {@code file}.
     *
     * <p>Refuses to start (logging an error) if a flow with the same name is already listed as
     * live. Otherwise the program jar is copied under the {@code "flowjars"} location, replacing
     * any previous copy, before the flow is started.
     *
     * @param file jar file containing the flow
     * @param str  class name passed to the deploy client to create the program
     * @param map  runtime arguments passed to the flow
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public void startFlow(File file, String str, Map<String, String> map) {
        try {
            Program program = this.deployClient.createProgram(file, str, this.jarUnpackDir);
            String flowName = program.getSpecification().getName();
            if (listAllFlows().contains(flowName)) {
                throw new IllegalStateException("Flow with the same name is running! Stop or Delete the Flow before starting again");
            }
            // Replace any stale jar copy left over from a previous run of this flow.
            Location jarCopy = this.location.append(flowName);
            jarCopy.delete();
            jarCopy.createNew();
            ByteStreams.copy(Locations.newInputSupplier(program.getJarLocation()), Locations.newOutputSupplier(jarCopy));
            this.deployClient.startFlow(program, map);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Returns the state of the first controller found for the given flow.
     *
     * @param str flow name
     * @return the controller state, or {@code null} if no controller is found or the lookup fails
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public Service.State getStatus(String str) {
        try {
            Iterable<TwillController> controllers = lookupFlow(str);
            // Second settle pause (on top of the one inside lookupFlow) is kept deliberately:
            // dropping it could change whether a controller is visible yet.
            sleepForZK(controllers);
            // Use a single iterator so hasNext()/next() observe the same snapshot.
            Iterator<TwillController> it = controllers.iterator();
            return it.hasNext() ? it.next().state() : null;
        } catch (Exception e) {
            logWarning(e);
            return null;
        }
    }

    /**
     * Lists the names of all live applications, with everything up to and including the first
     * {@code '.'} of the Twill application name removed (e.g. {@code "flow.foo"} becomes
     * {@code "foo"}).
     *
     * @return the flow names collected before any failure; never {@code null}
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public List<String> listAllFlows() {
        List<String> flowNames = Lists.newArrayList();
        try {
            for (TwillRunner.LiveInfo liveInfo : lookupService()) {
                String applicationName = liveInfo.getApplicationName();
                flowNames.add(applicationName.substring(applicationName.indexOf('.') + 1));
            }
        } catch (Exception e) {
            logWarning(e);
        }
        return flowNames;
    }

    /**
     * Discovers the socket addresses announced under a service name by a flow.
     *
     * @param str  flow name
     * @param str2 service name to discover
     * @return the addresses collected before any failure; never {@code null}
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public List<InetSocketAddress> discover(String str, String str2) {
        List<InetSocketAddress> addresses = Lists.newArrayList();
        try {
            for (TwillController controller : lookupFlow(str)) {
                ServiceDiscovered discovered = controller.discoverService(str2);
                sleepForZK(discovered);
                for (Discoverable discoverable : discovered) {
                    addresses.add(discoverable.getSocketAddress());
                }
            }
        } catch (Exception e) {
            logWarning(e);
        }
        return addresses;
    }

    /**
     * Stops every controller found for the given flow, waiting for each to stop.
     *
     * @param str flow name
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public void stopFlow(String str) {
        try {
            for (TwillController controller : lookupFlow(str)) {
                controller.stopAndWait();
            }
        } catch (Exception e) {
            logWarning(e);
        }
    }

    /**
     * Stops the flow, then clears its queues and deletes its stored jar copy.
     *
     * @param str flow name
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public void deleteFlow(String str) {
        stopFlow(str);
        try {
            this.queueAdmin.clearAllForFlow(str, str);
            this.location.append(str).delete();
        } catch (Exception e) {
            logWarning(e);
        }
    }

    /**
     * Collects the services listed in the resource report of each controller of the flow.
     *
     * @param str flow name
     * @return the service names collected before any failure; never {@code null}
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public List<String> getServices(String str) {
        List<String> services = Lists.newArrayList();
        try {
            for (TwillController controller : lookupFlow(str)) {
                ResourceReport report = controller.getResourceReport();
                sleepForZK(report);
                services.addAll(report.getServices());
            }
        } catch (Exception e) {
            logWarning(e);
        }
        return services;
    }

    /**
     * Requests a new instance count for a flowlet on every controller of the flow and waits for
     * each command to complete.
     *
     * @param str  flow name
     * @param str2 flowlet name
     * @param i    desired number of instances
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public void setInstances(String str, String str2, int i) {
        try {
            for (TwillController controller : lookupFlow(str)) {
                ResourceReport report = controller.getResourceReport();
                sleepForZK(report);
                Collection<?> running = report.getResources().get(str2);
                if (running == null) {
                    // Previously surfaced as a NullPointerException logged with a null message.
                    throw new IllegalArgumentException(String.format("Flowlet %s not found in flow %s", str2, str));
                }
                Program program = Programs.create(this.location.append(str));
                DistributedFlowletInstanceUpdater updater = new DistributedFlowletInstanceUpdater(
                    program, controller, this.queueAdmin,
                    FlowUtils.configureQueue(program, program.getSpecification(), this.queueAdmin));
                FlowTwillProgramController programController =
                    new FlowTwillProgramController(program.getName(), controller, updater);
                Map<String, String> instanceArgs = Maps.newHashMap();
                instanceArgs.put("flowlet", str2);
                instanceArgs.put("newInstances", String.valueOf(i));
                instanceArgs.put("oldInstances", String.valueOf(running.size()));
                programController.command("flowletInstances", instanceArgs).get();
            }
        } catch (Exception e) {
            logWarning(e);
        }
    }

    /**
     * Returns, per runnable name in the flow's resource report, the number of resource entries
     * reported for it.
     *
     * @param str flow name
     * @return the counts collected before any failure; never {@code null}
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public Map<String, Integer> getFlowInfo(String str) {
        Map<String, Integer> instanceCounts = Maps.newHashMap();
        try {
            for (TwillController controller : lookupFlow(str)) {
                ResourceReport report = controller.getResourceReport();
                sleepForZK(report);
                Map<String, ? extends Collection<?>> resources = report.getResources();
                for (Map.Entry<String, ? extends Collection<?>> entry : resources.entrySet()) {
                    instanceCounts.put(entry.getKey(), Integer.valueOf(entry.getValue().size()));
                }
            }
        } catch (Exception e) {
            logWarning(e);
        }
        return instanceCounts;
    }

    /**
     * Attaches a log handler writing to {@code printStream} (auto-flushing) to the first
     * controller found for the flow; does nothing if no controller is found.
     *
     * @param str         flow name
     * @param printStream destination for log output
     */
    @Override // co.cask.tigon.cli.FlowOperations
    public void addLogHandler(String str, PrintStream printStream) {
        try {
            // Use a single iterator so hasNext()/next() observe the same snapshot.
            Iterator<TwillController> it = lookupFlow(str).iterator();
            if (it.hasNext()) {
                it.next().addLogHandler(new PrinterLogHandler(new PrintWriter(printStream, true)));
            }
        } catch (Exception e) {
            logWarning(e);
        }
    }

    /** Looks up all live applications known to the runner service, then applies the settle pause. */
    private Iterable<TwillRunner.LiveInfo> lookupService() throws InterruptedException {
        Iterable<TwillRunner.LiveInfo> liveInfos = this.runnerService.lookupLive();
        sleepForZK(liveInfos);
        return liveInfos;
    }

    /** Looks up the controllers of application {@code "flow.<str>"}, then applies the settle pause. */
    private Iterable<TwillController> lookupFlow(String str) throws InterruptedException {
        Iterable<TwillController> controllers = this.runnerService.lookup(String.format("flow.%s", str));
        sleepForZK(controllers);
        return controllers;
    }

    /**
     * Validates a lookup result and pauses briefly before it is used.
     *
     * <p>A {@code null} result fails immediately: the reference is a parameter and cannot become
     * non-null by waiting, so there is nothing to poll for.
     *
     * @param obj the lookup result to validate
     * @throws IllegalStateException if {@code obj} is {@code null}
     * @throws InterruptedException  if the thread is interrupted during the pause
     */
    private void sleepForZK(Object obj) throws InterruptedException {
        if (obj == null) {
            throw new IllegalStateException("Didn't receive data from ZK");
        }
        TimeUnit.MILLISECONDS.sleep(ZK_SETTLE_MILLIS);
    }

    /** Re-asserts the thread's interrupt flag when {@code e} is an {@link InterruptedException}. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Logs a failed best-effort operation at WARN level, preserving interrupt status. */
    private static void logWarning(Exception e) {
        restoreInterruptIfNeeded(e);
        LOG.warn(e.getMessage(), e);
    }

    /**
     * Stops the Twill runner service and removes the temporary jar unpack directory. The
     * directory is removed even if stopping the runner service fails.
     */
    @Override
    protected void shutDown() throws Exception {
        try {
            this.runnerService.stopAndWait();
        } finally {
            FileUtils.deleteDirectory(this.jarUnpackDir);
        }
    }
}
