package co.cask.cdap.data2.datafabric.dataset.service;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.discovery.ResolvingDiscoverable;
import co.cask.cdap.common.http.CommonNettyHttpServiceBuilder;
import co.cask.cdap.common.metrics.MetricsReporterHook;
import co.cask.cdap.common.service.UncaughtExceptionIdleService;
import co.cask.cdap.data2.datafabric.dataset.service.executor.DatasetOpExecutor;
import co.cask.cdap.data2.metrics.DatasetMetricsReporter;
import co.cask.http.NettyHttpService;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/datafabric/dataset/service/DatasetService.class */
/**
 * Service that hosts the dataset type and dataset instance HTTP handlers.
 *
 * <p>On start-up it starts the {@link DatasetTypeService}, the {@link DatasetOpExecutor} client
 * and the HTTP service, then blocks in {@link #run()} until the {@code dataset.executor} service
 * becomes discoverable. Only after that does it announce itself as {@code dataset.service}
 * through the {@link DiscoveryService}.
 */
public class DatasetService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(DatasetService.class);

    /** Discovery name of the op-executor service this service waits for before announcing itself. */
    private static final String OP_EXECUTOR_SERVICE_NAME = "dataset.executor";

    /** Discovery name under which this service is announced. */
    private static final String DATASET_SERVICE_NAME = "dataset.service";

    private final NettyHttpService httpService;
    private final DiscoveryService discoveryService;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final DatasetOpExecutor opExecutorClient;
    private final Set<DatasetMetricsReporter> metricReporters;
    private final DatasetTypeService typeService;
    private final CConfiguration cConf;

    /**
     * Completed by the discovery watch as soon as a non-empty set of op-executor endpoints is seen.
     * Final so the discovery callback always observes a fully published future.
     */
    private final SettableFuture<ServiceDiscovered> opExecutorDiscovered = SettableFuture.create();

    /** Handle for withdrawing the discovery announcement; {@code null} until announced. */
    private Cancellable cancelDiscovery;

    /** Handle for the op-executor discovery watch; {@code null} until {@link #startUp()} ran. */
    private Cancellable opExecutorServiceWatch;

    /** Set by {@link #triggerShutdown()} so the wait loop in {@link #run()} can bail out. */
    private volatile boolean stopping = false;

    /**
     * Builds the HTTP service from configuration and wires in the collaborators.
     *
     * @param cConfiguration configuration supplying bind address, port and thread pool sizes
     * @param discoveryService used to announce this service
     * @param discoveryServiceClient used to watch for the op-executor service
     * @param metricsCollectionService sink for the HTTP handler metrics hook
     * @param datasetOpExecutor op-executor client that is started and stopped with this service
     * @param set metric reporters that are started and stopped with this service
     * @param datasetTypeService type service backing the type handler
     * @param datasetInstanceService instance service backing the instance handler
     * @throws Exception declared for compatibility with existing callers
     */
    @Inject
    public DatasetService(CConfiguration cConfiguration, DiscoveryService discoveryService, DiscoveryServiceClient discoveryServiceClient, MetricsCollectionService metricsCollectionService, DatasetOpExecutor datasetOpExecutor, Set<DatasetMetricsReporter> set, DatasetTypeService datasetTypeService, DatasetInstanceService datasetInstanceService) throws Exception {
        this.cConf = cConfiguration;
        this.typeService = datasetTypeService;
        this.discoveryService = discoveryService;
        this.discoveryServiceClient = discoveryServiceClient;
        this.opExecutorClient = datasetOpExecutor;
        this.metricReporters = set;

        DatasetTypeHandler datasetTypeHandler = new DatasetTypeHandler(datasetTypeService);
        DatasetInstanceHandler datasetInstanceHandler = new DatasetInstanceHandler(datasetInstanceService);

        CommonNettyHttpServiceBuilder builder = new CommonNettyHttpServiceBuilder(cConfiguration);
        builder.addHttpHandlers(ImmutableList.of(datasetTypeHandler, datasetInstanceHandler));
        builder.setHandlerHooks(ImmutableList.of(new MetricsReporterHook(metricsCollectionService, DATASET_SERVICE_NAME)));
        builder.setHost(cConfiguration.get("master.services.bind.address"));
        builder.setPort(cConfiguration.getInt("dataset.service.bind.port"));
        builder.setConnectionBacklog(cConfiguration.getInt("dataset.service.connection.backlog", 20000));
        builder.setExecThreadPoolSize(cConfiguration.getInt("dataset.service.exec.threads", 10));
        builder.setBossThreadPoolSize(cConfiguration.getInt("dataset.service.boss.threads", 1));
        builder.setWorkerThreadPoolSize(cConfiguration.getInt("dataset.service.worker.threads", 4));
        this.httpService = builder.build();
    }

    /**
     * Starts the type service, the op-executor client and the HTTP service, installs the watch
     * that completes {@link #opExecutorDiscovered}, and starts all metric reporters.
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting DatasetService...");
        this.typeService.startAndWait();
        this.opExecutorClient.startAndWait();
        this.httpService.startAndWait();

        ServiceDiscovered discover = this.discoveryServiceClient.discover(OP_EXECUTOR_SERVICE_NAME);
        this.opExecutorServiceWatch = discover.watchChanges(new ServiceDiscovered.ChangeListener() {
            @Override
            public void onChange(ServiceDiscovered serviceDiscovered) {
                // Empty notifications carry no endpoint yet; keep waiting.
                if (Iterables.isEmpty(serviceDiscovered)) {
                    return;
                }
                LOG.info("Discovered {} service", OP_EXECUTOR_SERVICE_NAME);
                opExecutorDiscovered.set(serviceDiscovered);
            }
        }, MoreExecutors.sameThreadExecutor());

        for (DatasetMetricsReporter reporter : this.metricReporters) {
            reporter.start();
        }
    }

    @Override
    protected String getServiceName() {
        return "DatasetService";
    }

    /**
     * Waits for the op-executor to be discoverable, announces this service, then idles until the
     * service is asked to stop. If shutdown is requested (or the thread is interrupted) before the
     * op-executor shows up, the service is never announced.
     */
    @Override
    protected void run() throws Exception {
        if (!waitForOpExecutorToStart()) {
            // Shutting down or interrupted before the dependency appeared: do not advertise an
            // endpoint that is about to go away.
            LOG.info("DatasetService is stopping before {} service was discovered, skipping announcement",
                     OP_EXECUTOR_SERVICE_NAME);
            return;
        }

        // Announce address/port default to the actual bind address unless overridden in config.
        InetSocketAddress bindAddress = this.httpService.getBindAddress();
        String announceHost = this.cConf.get("master.services.announce.address", bindAddress.getHostName());
        int announcePort = this.cConf.getInt("dataset.service.announce.port", bindAddress.getPort());
        final InetSocketAddress announceAddress = new InetSocketAddress(announceHost, announcePort);

        LOG.info("Announcing DatasetService for discovery...");
        this.cancelDiscovery = this.discoveryService.register(ResolvingDiscoverable.of(new Discoverable() {
            @Override
            public String getName() {
                return DATASET_SERVICE_NAME;
            }

            @Override
            public InetSocketAddress getSocketAddress() {
                return announceAddress;
            }
        }));
        LOG.info("DatasetService started successfully on {}", announceAddress);

        while (isRunning()) {
            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Polls {@link #opExecutorDiscovered} in one-second slices until it completes, shutdown is
     * requested, or the thread is interrupted. The discovery watch is cancelled on every exit path.
     *
     * @return {@code true} if the op-executor service was discovered; {@code false} if the wait
     *     ended because of shutdown or interruption
     * @throws ExecutionException if the discovery future completed exceptionally
     */
    private boolean waitForOpExecutorToStart() throws Exception {
        LOG.info("Waiting for {} service to be discoverable", OP_EXECUTOR_SERVICE_NAME);
        try {
            while (!this.stopping) {
                try {
                    this.opExecutorDiscovered.get(1L, TimeUnit.SECONDS);
                    return true;
                } catch (TimeoutException ignored) {
                    // The timeout is only the polling interval that lets us re-check 'stopping'.
                }
            }
            return false;
        } catch (InterruptedException e) {
            LOG.warn("Got interrupted while waiting for service {}", OP_EXECUTOR_SERVICE_NAME);
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            LOG.error("Error during discovering service {}, DatasetService start failed", OP_EXECUTOR_SERVICE_NAME, e);
            throw e;
        } finally {
            this.opExecutorServiceWatch.cancel();
        }
    }

    /** Flags the wait loop to exit before delegating to the default shutdown trigger. */
    @Override
    protected void triggerShutdown() {
        this.stopping = true;
        super.triggerShutdown();
    }

    /**
     * Stops metric reporters, cancels the discovery watch and announcement, pauses briefly, and
     * then stops the HTTP service and the op-executor client. An interrupt received during the
     * pause is restored on the thread once the remaining components have been stopped.
     */
    @Override
    protected void shutDown() throws Exception {
        LOG.info("Stopping DatasetService...");
        for (DatasetMetricsReporter reporter : this.metricReporters) {
            reporter.stop();
        }
        if (this.opExecutorServiceWatch != null) {
            this.opExecutorServiceWatch.cancel();
        }
        this.typeService.stopAndWait();
        if (this.cancelDiscovery != null) {
            this.cancelDiscovery.cancel();
        }

        // NOTE(review): presumably this grace period lets clients notice the withdrawn
        // announcement before the HTTP endpoint disappears - confirm.
        boolean interrupted = false;
        try {
            TimeUnit.SECONDS.sleep(3L);
        } catch (InterruptedException e) {
            LOG.error("Interrupted while waiting...", e);
            interrupted = true;
        }

        try {
            this.httpService.stopAndWait();
            this.opExecutorClient.stopAndWait();
        } finally {
            // Restore the flag only after the stops above so they are not cut short by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public String toString() {
        return Objects.toStringHelper(this).add("bindAddress", this.httpService.getBindAddress()).toString();
    }

    /**
     * Runs the service on a dedicated thread named after the concrete class, with an uncaught
     * exception handler obtained from {@link UncaughtExceptionIdleService#newHandler}.
     */
    @Override
    protected Executor executor() {
        final String threadName = getClass().getSimpleName();
        return new Executor() {
            @Override
            public void execute(Runnable runnable) {
                Thread thread = new Thread(runnable, threadName);
                thread.setUncaughtExceptionHandler(UncaughtExceptionIdleService.newHandler(LOG));
                thread.start();
            }
        };
    }
}
