package co.cask.cdap.common.twill;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.common.UncaughtExceptionHandler;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Injector;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.apache.twill.internal.Services;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/cdap-common-4.2.0.jar:co/cask/cdap/common/twill/AbstractMasterTwillRunnable.class */
/**
 * Base class for Twill runnables that run a chain of Guava {@link Service}s.
 *
 * <p>Lifecycle as implemented here:
 * <ul>
 *   <li>{@link #configure()} publishes the names of the CDAP and Hadoop configuration files in the
 *       runnable specification.</li>
 *   <li>{@link #initialize(TwillContext)} loads both configurations, asks the subclass for an
 *       {@link Injector} and builds the list of services to run.</li>
 *   <li>{@link #run()} starts the services in order and blocks until any one of them terminates or
 *       fails, or until {@link #stop()} is called.</li>
 *   <li>{@link #destroy()} stops the services in reverse order.</li>
 * </ul>
 */
public abstract class AbstractMasterTwillRunnable extends AbstractTwillRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractMasterTwillRunnable.class);

    /** Runnable-specification config key holding the CDAP configuration file name. */
    private static final String CCONF_KEY = "cConf";

    /** Runnable-specification config key holding the Hadoop configuration file name. */
    private static final String HCONF_KEY = "hConf";

    protected String name;
    private String cConfName;
    private String hConfName;
    private Configuration hConf;
    private CConfiguration cConf;
    private List<Service> services;

    /** Thread executing {@link #run()}; interrupted by {@link #stop()} to unblock it. */
    private volatile Thread runThread;

    /** Set by {@link #stop()} so that a stop request issued before {@link #run()} starts is not lost. */
    private volatile boolean stopRequested;

    /**
     * @param name      name of the runnable, used in the specification returned by {@link #configure()}
     * @param cConfName file name of the CDAP configuration, stored under the {@code cConf} config key
     * @param hConfName file name of the Hadoop configuration, stored under the {@code hConf} config key
     */
    public AbstractMasterTwillRunnable(String name, String cConfName, String hConfName) {
        this.name = name;
        this.cConfName = cConfName;
        this.hConfName = hConfName;
    }

    /**
     * Builds the runnable specification carrying the runnable name and the two configuration
     * file names.
     */
    @Override
    public TwillRunnableSpecification configure() {
        return TwillRunnableSpecification.Builder.with()
            .setName(this.name)
            .withConfigs(ImmutableMap.of(CCONF_KEY, this.cConfName, HCONF_KEY, this.hConfName))
            .build();
    }

    /**
     * Loads the Hadoop and CDAP configurations named in the runnable specification and assembles
     * the list of services that {@link #run()} will start.
     *
     * <p>The list always begins with the {@link ZKClientService}, {@link KafkaClientService},
     * {@link BrokerService} and {@link MetricsCollectionService} obtained from the injector
     * returned by {@link #doInit(TwillContext)}; {@link #addServices(List)} may then append more.
     *
     * <p>Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
     *
     * @param twillContext context supplying the runnable specification
     */
    @Override
    public final void initialize(TwillContext twillContext) {
        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler());
        super.initialize(twillContext);
        this.name = twillContext.getSpecification().getName();
        LOG.info("Initializing runnable {}", this.name);
        Map<String, String> configs = twillContext.getSpecification().getConfigs();
        try {
            // Start from an empty Hadoop configuration so that only the shipped file contributes.
            this.hConf = new Configuration();
            this.hConf.clear();
            this.hConf.addResource(configFile(configs, HCONF_KEY).toURI().toURL());
            UserGroupInformation.setConfiguration(this.hConf);

            this.cConf = CConfiguration.create(configFile(configs, CCONF_KEY), new File[0]);
            LOG.debug("{} cConf {}", this.name, this.cConf);
            LOG.debug("{} HBase conf {}", this.name, this.hConf);

            Injector injector = doInit(twillContext);
            this.services = Lists.newArrayList();
            this.services.add(injector.getInstance(ZKClientService.class));
            this.services.add(injector.getInstance(KafkaClientService.class));
            this.services.add(injector.getInstance(BrokerService.class));
            this.services.add(injector.getInstance(MetricsCollectionService.class));
            addServices(this.services);
            Preconditions.checkArgument(!this.services.isEmpty(), "Should have at least one service");
            LOG.info("Runnable initialized {}", this.name);
        } catch (Throwable t) {
            throw Throwables.propagate(t);
        }
    }

    /**
     * Starts all services in list order, then blocks until one of them terminates or fails, or
     * until the thread is interrupted by {@link #stop()}.
     *
     * <p>If a service fails, its failure cause is rethrown via
     * {@link Throwables#propagate(Throwable)}.
     */
    @Override
    public void run() {
        this.runThread = Thread.currentThread();
        LOG.info("Starting runnable {}", this.name);

        // Completed by the first service that terminates (value) or fails (exception).
        SettableFuture<String> completion = SettableFuture.create();
        for (Service service : this.services) {
            service.addListener(createServiceListener(service.getClass().getName(), completion),
                                Threads.SAME_THREAD_EXECUTOR);
        }

        Service first = this.services.get(0);
        Service[] rest = this.services.subList(1, this.services.size()).toArray(new Service[0]);
        Futures.getUnchecked(Services.chainStart(first, rest));
        LOG.info("Runnable started {}", this.name);

        // stop() may have been called before runThread was published above, in which case its
        // interrupt was never delivered. Self-interrupt so the wait below exits the same way.
        if (this.stopRequested) {
            Thread.currentThread().interrupt();
        }

        try {
            completion.get();
        } catch (InterruptedException e) {
            // The interrupt is the stop signal sent by stop(); the interrupt status is not
            // restored here, matching the existing behaviour of returning normally.
            LOG.debug("Waiting on latch interrupted {}", this.name);
        } catch (ExecutionException e) {
            throw Throwables.propagate(e.getCause());
        }
    }

    /**
     * Stops all services in the reverse of their start order and waits for them to stop.
     * Does nothing if no services were set up, e.g. because {@link #initialize(TwillContext)}
     * failed before the service list was created.
     */
    @Override
    public void destroy() {
        if (this.services == null || this.services.isEmpty()) {
            LOG.info("No services to stop for runnable {}", this.name);
            return;
        }
        List<Service> reversed = Lists.reverse(this.services);
        Service first = reversed.get(0);
        Service[] rest = reversed.subList(1, reversed.size()).toArray(new Service[0]);
        Futures.getUnchecked(Services.chainStop(first, rest));
        LOG.info("Runnable stopped {}", this.name);
    }

    /**
     * Creates a listener that completes {@code completion} when the observed service terminates
     * (with the service name) or fails (with the failure).
     *
     * @param serviceName name used in log messages and as the completion value
     * @param completion  future that {@link #run()} is waiting on
     */
    private Service.Listener createServiceListener(final String serviceName,
                                                   final SettableFuture<String> completion) {
        return new ServiceListenerAdapter() {
            @Override
            public void terminated(Service.State from) {
                LOG.info("Service {} terminated", serviceName);
                completion.set(serviceName);
            }

            @Override
            public void failed(Service.State from, Throwable failure) {
                LOG.error("Service {} failed", serviceName, failure);
                completion.setException(failure);
            }
        };
    }

    /**
     * Resolves a configuration file from the runnable specification configs.
     *
     * @param configs configs taken from the runnable specification
     * @param key     config key whose value is the file path
     * @return the file the config value points at
     * @throws NullPointerException if the key is absent, with a message naming the key
     */
    private static File configFile(Map<String, String> configs, String key) {
        String path = configs.get(key);
        Preconditions.checkNotNull(path, "Missing runnable config '%s'", key);
        return new File(path);
    }

    /** Returns the Hadoop configuration loaded by {@link #initialize(TwillContext)}. */
    protected final Configuration getConfiguration() {
        return this.hConf;
    }

    /** Returns the CDAP configuration loaded by {@link #initialize(TwillContext)}. */
    protected final CConfiguration getCConfiguration() {
        return this.cConf;
    }

    /**
     * Requests {@link #run()} to return by interrupting the thread executing it. If
     * {@link #run()} has not started yet, the request is recorded and honoured once it does.
     */
    @Override
    public void stop() {
        // Set the flag before reading runThread: run() publishes runThread before reading the
        // flag, so at least one side always observes the other.
        this.stopRequested = true;
        Thread runner = this.runThread;
        if (runner != null) {
            runner.interrupt();
        }
    }

    /**
     * Hook for subclasses to append their own services to the list started by {@link #run()}.
     * Called from {@link #initialize(TwillContext)} after the common services have been added.
     *
     * @param services mutable list to add services to
     */
    protected abstract void addServices(List<? super Service> services);

    /**
     * Hook for subclasses to create the injector from which the common services are obtained.
     * Called from {@link #initialize(TwillContext)} after the configurations have been loaded.
     *
     * @param twillContext context passed to {@link #initialize(TwillContext)}
     * @return injector able to provide the common services
     */
    protected abstract Injector doInit(TwillContext twillContext);
}
