package co.cask.cdap.filetailer;

import co.cask.cdap.client.StreamClient;
import co.cask.cdap.client.StreamWriter;
import co.cask.cdap.filetailer.config.ConfigurationLoaderImpl;
import co.cask.cdap.filetailer.config.PipeConfiguration;
import co.cask.cdap.filetailer.config.exception.ConfigurationLoadingException;
import co.cask.cdap.filetailer.metrics.FileTailerMetricsProcessor;
import co.cask.cdap.filetailer.queue.FileTailerQueue;
import co.cask.cdap.filetailer.sink.FileTailerSink;
import co.cask.cdap.filetailer.sink.SinkStrategy;
import co.cask.cdap.filetailer.state.FileTailerStateProcessorImpl;
import co.cask.cdap.filetailer.tailer.LogTailer;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ServiceManager;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/filetailer/PipeManager.class */
public class PipeManager extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(PipeManager.class);
    private final File confFile;
    private final List<Pipe> pipeList = new ArrayList();
    private final ServiceManager serviceManager = createManager();

    public PipeManager(File file) {
        this.confFile = file;
    }

    private ServiceManager setupPipes() throws IOException {
        StreamClient streamClient = null;
        StreamWriter streamWriter = null;
        try {
            try {
                for (PipeConfiguration pipeConfiguration : getPipeConfigs()) {
                    FileTailerQueue fileTailerQueue = new FileTailerQueue(pipeConfiguration.getQueueSize());
                    StreamWriter streamWriterForPipe = getStreamWriterForPipe(pipeConfiguration.getSinkConfiguration().getStreamClient(), pipeConfiguration.getSinkConfiguration().getStreamName());
                    FileTailerStateProcessorImpl fileTailerStateProcessorImpl = new FileTailerStateProcessorImpl(pipeConfiguration.getDaemonDir(), pipeConfiguration.getStateFile());
                    FileTailerMetricsProcessor fileTailerMetricsProcessor = new FileTailerMetricsProcessor(pipeConfiguration.getDaemonDir(), pipeConfiguration.getStatisticsFile(), pipeConfiguration.getStatisticsSleepInterval(), pipeConfiguration.getPipeName(), pipeConfiguration.getSourceConfiguration().getFileName());
                    this.pipeList.add(new Pipe(new LogTailer(pipeConfiguration, fileTailerQueue, fileTailerStateProcessorImpl, fileTailerMetricsProcessor, null), new FileTailerSink(fileTailerQueue, streamWriterForPipe, SinkStrategy.LOADBALANCE, fileTailerStateProcessorImpl, fileTailerMetricsProcessor, null, pipeConfiguration.getSinkConfiguration().getPackSize()), fileTailerMetricsProcessor));
                    streamClient = null;
                    streamWriter = null;
                }
                ServiceManager serviceManager = new ServiceManager(this.pipeList);
                if (streamClient != null) {
                    streamClient.close();
                }
                if (streamWriter != null) {
                    streamWriter.close();
                }
                return serviceManager;
            } catch (ConfigurationLoadingException e) {
                throw new ConfigurationLoadingException("Error during loading configuration from file: " + this.confFile.getAbsolutePath() + e.getMessage());
            }
        } catch (Throwable th) {
            if (streamClient != null) {
                streamClient.close();
            }
            if (streamWriter != null) {
                streamWriter.close();
            }
            throw th;
        }
    }

    private ServiceManager createManager() {
        try {
            return setupPipes();
        } catch (IOException e) {
            LOG.error("Error during pipes: {} setup", e);
            throw new RuntimeException("Error during pipes setup. Cannot start daemon.");
        }
    }

    private List<PipeConfiguration> getPipeConfigs() throws ConfigurationLoadingException {
        return new ConfigurationLoaderImpl().load(this.confFile).getPipeConfigurations();
    }

    private StreamWriter getStreamWriterForPipe(StreamClient streamClient, String str) throws IOException {
        try {
            streamClient.create(str);
            return streamClient.createWriter(str);
        } catch (IOException e) {
            throw new IOException("Can not create/get client stream by name:" + str + ": " + e.getMessage());
        }
    }

    public void startUp() {
        this.serviceManager.startAsync().awaitHealthy();
    }

    public void shutDown() {
        try {
            this.serviceManager.stopAsync().awaitStopped(5L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            LOG.warn("Cannot stop pipes: {}", e);
        }
    }
}
