package co.cask.tigon.internal.app.runtime.distributed;

import co.cask.tigon.api.flow.FlowSpecification;
import co.cask.tigon.api.flow.FlowletDefinition;
import co.cask.tigon.app.program.Program;
import co.cask.tigon.app.program.ProgramType;
import co.cask.tigon.conf.CConfiguration;
import co.cask.tigon.data.queue.QueueName;
import co.cask.tigon.data.transaction.queue.QueueAdmin;
import co.cask.tigon.internal.app.runtime.ProgramController;
import co.cask.tigon.internal.app.runtime.ProgramOptions;
import co.cask.tigon.internal.app.runtime.distributed.AbstractDistributedProgramRunner;
import co.cask.tigon.internal.app.runtime.flow.FlowUtils;
import co.cask.tigon.twill.AbortOnTimeoutEventHandler;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Multimap;
import com.google.inject.Inject;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tigon/internal/app/runtime/distributed/DistributedFlowProgramRunner.class */
/**
 * {@link AbstractDistributedProgramRunner} implementation for programs of type
 * {@link ProgramType#FLOW}. It validates the flow specification, configures the
 * flowlet queues through the injected {@link QueueAdmin}, and hands a
 * {@code FlowTwillApplication} to the supplied application launcher.
 */
public final class DistributedFlowProgramRunner extends AbstractDistributedProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedFlowProgramRunner.class);
    private final QueueAdmin queueAdmin;

    /**
     * Creates the runner; the first three arguments are forwarded unchanged to the superclass.
     *
     * @param twillRunner    passed to the superclass constructor
     * @param configuration  passed to the superclass constructor
     * @param cConfiguration passed to the superclass constructor
     * @param queueAdmin     used when configuring queues and when building the instance updater
     */
    @Inject
    DistributedFlowProgramRunner(TwillRunner twillRunner, Configuration configuration, CConfiguration cConfiguration, QueueAdmin queueAdmin) {
        super(twillRunner, configuration, cConfiguration);
        this.queueAdmin = queueAdmin;
    }

    /**
     * Validates the program, configures its queues and launches it as a Twill application.
     *
     * <p>The program type is checked before anything else. The specification check, the
     * per-flowlet instance check, queue configuration and the launch itself all run inside a
     * single {@code try}; any {@link Exception} raised there is rethrown through
     * {@link Throwables#propagate(Throwable)}.
     *
     * @param program             the program to launch; its type must be {@link ProgramType#FLOW}
     * @param programOptions      not read by this implementation
     * @param file                forwarded to the {@code FlowTwillApplication} constructor
     * @param file2               forwarded to the {@code FlowTwillApplication} constructor
     * @param applicationLauncher launcher that turns the Twill application into a {@link TwillController}
     * @return the result of {@code startListen()} on the newly created {@code FlowTwillProgramController}
     */
    @Override
    protected ProgramController launch(Program program, ProgramOptions programOptions, File file, File file2, AbstractDistributedProgramRunner.ApplicationLauncher applicationLauncher) {
        FlowSpecification flowSpec = program.getSpecification();
        ProgramType programType = program.getType();
        Preconditions.checkNotNull(programType, "Missing processor type.");
        Preconditions.checkArgument(programType == ProgramType.FLOW, "Only FLOW process type is supported.");

        try {
            Preconditions.checkNotNull(flowSpec, "Missing FlowSpecification for %s", program.getName());
            verifyInstanceLimits(flowSpec);

            LOG.info("Configuring flowlets queues");
            Multimap<String, QueueName> flowletQueues = FlowUtils.configureQueue(program, flowSpec, this.queueAdmin);

            String flowId = program.getName() + ":" + flowSpec.getName();
            LOG.info("Launching distributed flow: " + flowId);
            FlowTwillApplication twillApplication = new FlowTwillApplication(program, flowSpec, file, file2, this.eventHandler);
            TwillController twillController = applicationLauncher.launch(twillApplication);

            // Read the program name before building the updater so the evaluation order stays
            // the same as in a single nested constructor expression.
            String programName = program.getName();
            DistributedFlowletInstanceUpdater instanceUpdater =
                new DistributedFlowletInstanceUpdater(program, twillController, this.queueAdmin, flowletQueues);
            return new FlowTwillProgramController(programName, twillController, instanceUpdater).startListen();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Checks that no flowlet in the specification is configured with more instances than the
     * maximum declared by its flowlet spec.
     */
    private static void verifyInstanceLimits(FlowSpecification flowSpec) {
        for (FlowletDefinition definition : flowSpec.getFlowlets().values()) {
            int instanceLimit = definition.getFlowletSpec().getMaxInstances();
            boolean withinLimit = definition.getInstances() <= instanceLimit;
            Preconditions.checkArgument(withinLimit, "Flowlet %s can have a maximum of %s instances",
                definition.getFlowletSpec().getName(), instanceLimit);
        }
    }

    /**
     * Builds an {@link AbortOnTimeoutEventHandler} from the {@code twill.no.container.timeout}
     * setting, falling back to {@link Long#MAX_VALUE} when the setting is absent.
     *
     * @param cConfiguration configuration the timeout is read from
     * @return the event handler for this runner
     */
    @Override
    protected EventHandler createEventHandler(CConfiguration cConfiguration) {
        long noContainerTimeout = cConfiguration.getLong("twill.no.container.timeout", Long.MAX_VALUE);
        // NOTE(review): the meaning of the boolean flag is defined by AbortOnTimeoutEventHandler,
        // which is not visible here — confirm before changing it.
        return new AbortOnTimeoutEventHandler(noContainerTimeout, true);
    }
}
