package co.cask.tigon.internal.app.runtime.flow;

import co.cask.tigon.api.flow.FlowSpecification;
import co.cask.tigon.api.flow.FlowletDefinition;
import co.cask.tigon.api.flow.flowlet.FlowletSpecification;
import co.cask.tigon.app.program.Program;
import co.cask.tigon.app.program.ProgramType;
import co.cask.tigon.data.queue.QueueName;
import co.cask.tigon.data.transaction.queue.QueueAdmin;
import co.cask.tigon.internal.app.runtime.AbstractProgramController;
import co.cask.tigon.internal.app.runtime.BasicArguments;
import co.cask.tigon.internal.app.runtime.ProgramController;
import co.cask.tigon.internal.app.runtime.ProgramOptionConstants;
import co.cask.tigon.internal.app.runtime.ProgramOptions;
import co.cask.tigon.internal.app.runtime.ProgramRunner;
import co.cask.tigon.internal.app.runtime.ProgramRunnerFactory;
import co.cask.tigon.internal.app.runtime.SimpleProgramOptions;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.internal.RunIds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tigon/internal/app/runtime/flow/FlowProgramRunner.class */
public final class FlowProgramRunner implements ProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(FlowProgramRunner.class);
    private final ProgramRunnerFactory programRunnerFactory;
    private final Map<RunId, ProgramOptions> programOptions = Maps.newHashMap();
    private final QueueAdmin queueAdmin;
    private final DiscoveryServiceClient discoveryServiceClient;

    /* loaded from: input_file:co/cask/tigon/internal/app/runtime/flow/FlowProgramRunner$FlowProgramController.class */
    private final class FlowProgramController extends AbstractProgramController {
        private final Table<String, Integer, ProgramController> flowlets;
        private final Program program;
        private final FlowSpecification flowSpec;
        private final Lock lock;
        private final Multimap<String, QueueName> consumerQueues;
        private final DiscoveryServiceClient discoveryServiceClient;

        FlowProgramController(Table<String, Integer, ProgramController> table, RunId runId, Program program, FlowSpecification flowSpecification, Multimap<String, QueueName> multimap, DiscoveryServiceClient discoveryServiceClient) {
            super(program.getName(), runId);
            this.lock = new ReentrantLock();
            this.flowlets = table;
            this.program = program;
            this.flowSpec = flowSpecification;
            this.consumerQueues = multimap;
            this.discoveryServiceClient = discoveryServiceClient;
            started();
        }

        @Override // co.cask.tigon.internal.app.runtime.AbstractProgramController
        protected void doSuspend() throws Exception {
            FlowProgramRunner.LOG.info("Suspending flow: " + this.flowSpec.getName());
            this.lock.lock();
            try {
                Futures.successfulAsList(Iterables.transform(this.flowlets.values(), new Function<ProgramController, ListenableFuture<ProgramController>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.1
                    public ListenableFuture<ProgramController> apply(ProgramController programController) {
                        return programController.suspend();
                    }
                })).get();
                this.lock.unlock();
                FlowProgramRunner.LOG.info("Flow suspended: " + this.flowSpec.getName());
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        }

        @Override // co.cask.tigon.internal.app.runtime.AbstractProgramController
        protected void doResume() throws Exception {
            FlowProgramRunner.LOG.info("Resuming flow: " + this.flowSpec.getName());
            this.lock.lock();
            try {
                Futures.successfulAsList(Iterables.transform(this.flowlets.values(), new Function<ProgramController, ListenableFuture<ProgramController>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.2
                    public ListenableFuture<ProgramController> apply(ProgramController programController) {
                        return programController.resume();
                    }
                })).get();
                this.lock.unlock();
                FlowProgramRunner.LOG.info("Flow resumed: " + this.flowSpec.getName());
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        }

        @Override // co.cask.tigon.internal.app.runtime.AbstractProgramController
        protected void doStop() throws Exception {
            FlowProgramRunner.LOG.info("Stopping flow: " + this.flowSpec.getName());
            this.lock.lock();
            try {
                Futures.successfulAsList(Iterables.transform(this.flowlets.values(), new Function<ProgramController, ListenableFuture<ProgramController>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.3
                    public ListenableFuture<ProgramController> apply(ProgramController programController) {
                        return programController.stop();
                    }
                })).get();
                this.lock.unlock();
                FlowProgramRunner.LOG.info("Flow stopped: " + this.flowSpec.getName());
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        }

        @Override // co.cask.tigon.internal.app.runtime.AbstractProgramController
        protected void doCommand(String str, Object obj) throws Exception {
            if (ProgramOptionConstants.FLOWLET_INSTANCES.equals(str) && (obj instanceof Map)) {
                Map map = (Map) obj;
                this.lock.lock();
                try {
                    try {
                        changeInstances((String) map.get("flowlet"), Integer.valueOf((String) map.get("newInstances")).intValue());
                        this.lock.unlock();
                    } catch (Throwable th) {
                        FlowProgramRunner.LOG.error(String.format("Fail to change instances: %s", map), th);
                        this.lock.unlock();
                    }
                } catch (Throwable th2) {
                    this.lock.unlock();
                    throw th2;
                }
            }
        }

        private synchronized void changeInstances(String str, int i) throws Exception {
            Map<Integer, ProgramController> row = this.flowlets.row(str);
            int size = row.size();
            if (size == i) {
                return;
            }
            if (size < i) {
                increaseInstances(str, i, row, size);
            } else {
                decreaseInstances(str, i, row, size);
            }
        }

        private synchronized void increaseInstances(String str, final int i, Map<Integer, ProgramController> map, int i2) throws Exception {
            FlowletSpecification specification = ((FlowletProgramController) Iterables.getFirst(map.values(), (Object) null)).getFlowletContext().getSpecification();
            int maxInstances = specification.getMaxInstances();
            Preconditions.checkArgument(i <= maxInstances, "Flowlet %s can have a maximum of %s instances", new Object[]{specification.getName(), Integer.valueOf(maxInstances)});
            Futures.successfulAsList(Iterables.transform(map.values(), new Function<ProgramController, ListenableFuture<?>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.4
                public ListenableFuture<?> apply(ProgramController programController) {
                    return programController.suspend();
                }
            })).get();
            FlowUtils.reconfigure(this.consumerQueues.get(str), FlowUtils.generateConsumerGroupId(this.program, str), i, FlowProgramRunner.this.queueAdmin);
            Futures.successfulAsList(Iterables.transform(map.values(), new Function<ProgramController, ListenableFuture<?>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.5
                public ListenableFuture<?> apply(ProgramController programController) {
                    return programController.command(ProgramOptionConstants.INSTANCES, Integer.valueOf(i));
                }
            })).get();
            Futures.successfulAsList(Iterables.transform(map.values(), new Function<ProgramController, ListenableFuture<?>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.6
                public ListenableFuture<?> apply(ProgramController programController) {
                    return programController.resume();
                }
            })).get();
            for (int i3 = i2; i3 < i; i3++) {
                this.flowlets.put(str, Integer.valueOf(i3), FlowProgramRunner.this.startFlowlet(this.program, FlowProgramRunner.this.createFlowletOptions(str, i3, i, getRunId())));
            }
        }

        private synchronized void decreaseInstances(String str, final int i, Map<Integer, ProgramController> map, int i2) throws Exception {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i2 - i);
            for (int i3 = i2 - 1; i3 >= i; i3--) {
                newArrayListWithCapacity.add(((ProgramController) this.flowlets.remove(str, Integer.valueOf(i3))).stop());
            }
            Futures.successfulAsList(newArrayListWithCapacity).get();
            Futures.successfulAsList(Iterables.transform(map.values(), new Function<ProgramController, ListenableFuture<?>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.7
                public ListenableFuture<?> apply(ProgramController programController) {
                    return programController.suspend();
                }
            })).get();
            FlowUtils.reconfigure(this.consumerQueues.get(str), FlowUtils.generateConsumerGroupId(this.program, str), i, FlowProgramRunner.this.queueAdmin);
            Futures.successfulAsList(Iterables.transform(map.values(), new Function<ProgramController, ListenableFuture<?>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.8
                public ListenableFuture<?> apply(ProgramController programController) {
                    return programController.command(ProgramOptionConstants.INSTANCES, Integer.valueOf(i));
                }
            })).get();
            Futures.successfulAsList(Iterables.transform(map.values(), new Function<ProgramController, ListenableFuture<?>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.FlowProgramController.9
                public ListenableFuture<?> apply(ProgramController programController) {
                    return programController.resume();
                }
            })).get();
        }

        @Override // co.cask.tigon.internal.app.runtime.ProgramController
        public ServiceDiscovered discover(String str) {
            return this.discoveryServiceClient.discover(str);
        }
    }

    @Inject
    public FlowProgramRunner(ProgramRunnerFactory programRunnerFactory, QueueAdmin queueAdmin, DiscoveryServiceClient discoveryServiceClient) {
        this.programRunnerFactory = programRunnerFactory;
        this.queueAdmin = queueAdmin;
        this.discoveryServiceClient = discoveryServiceClient;
    }

    @Override // co.cask.tigon.internal.app.runtime.ProgramRunner
    public ProgramController run(Program program, ProgramOptions programOptions) {
        FlowSpecification specification = program.getSpecification();
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type.");
        Preconditions.checkArgument(type == ProgramType.FLOW, "Only FLOW process type is supported.");
        Preconditions.checkNotNull(specification, "Missing FlowSpecification for %s", new Object[]{program.getName()});
        for (FlowletDefinition flowletDefinition : specification.getFlowlets().values()) {
            int maxInstances = flowletDefinition.getFlowletSpec().getMaxInstances();
            Preconditions.checkArgument(flowletDefinition.getInstances() <= maxInstances, "Flowlet %s can have a maximum of %s instances", new Object[]{flowletDefinition.getFlowletSpec().getName(), Integer.valueOf(maxInstances)});
        }
        try {
            RunId generate = RunIds.generate();
            this.programOptions.put(generate, programOptions);
            return new FlowProgramController(createFlowlets(program, generate, specification), generate, program, specification, FlowUtils.configureQueue(program, specification, this.queueAdmin), this.discoveryServiceClient);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    private Table<String, Integer, ProgramController> createFlowlets(Program program, RunId runId, FlowSpecification flowSpecification) {
        HashBasedTable create = HashBasedTable.create();
        try {
            for (Map.Entry entry : flowSpecification.getFlowlets().entrySet()) {
                int instances = ((FlowletDefinition) entry.getValue()).getInstances();
                for (int i = 0; i < instances; i++) {
                    create.put(entry.getKey(), Integer.valueOf(i), startFlowlet(program, createFlowletOptions((String) entry.getKey(), i, instances, runId)));
                }
            }
            return create;
        } catch (Throwable th) {
            try {
                Futures.successfulAsList(Iterables.transform(create.values(), new Function<ProgramController, ListenableFuture<?>>() { // from class: co.cask.tigon.internal.app.runtime.flow.FlowProgramRunner.1
                    public ListenableFuture<?> apply(ProgramController programController) {
                        return programController.stop();
                    }
                })).get();
            } catch (Exception e) {
                LOG.error("Fail to stop all flowlets on failure.");
            }
            throw Throwables.propagate(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ProgramController startFlowlet(Program program, ProgramOptions programOptions) {
        return this.programRunnerFactory.create(ProgramRunnerFactory.Type.FLOWLET).run(program, programOptions);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v9, types: [co.cask.tigon.internal.app.runtime.Arguments] */
    public ProgramOptions createFlowletOptions(String str, int i, int i2, RunId runId) {
        BasicArguments basicArguments = new BasicArguments();
        if (this.programOptions.containsKey(runId)) {
            basicArguments = this.programOptions.get(runId).getUserArguments();
        }
        return new SimpleProgramOptions(str, new BasicArguments(ImmutableMap.of(ProgramOptionConstants.INSTANCE_ID, Integer.toString(i), ProgramOptionConstants.INSTANCES, Integer.toString(i2), ProgramOptionConstants.RUN_ID, runId.getId())), basicArguments);
    }
}
