package org.apache.pinot.tools;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/GitHubEventsQuickstart.class */
/**
 * Quickstart that sets up a local realtime demo around the {@code pullRequestMergedEvents} table.
 *
 * <p>{@link #execute(String)} copies the bundled schema and realtime table config out of the classpath,
 * starts a local ZooKeeper and Kafka, starts the Pinot components through {@link QuickstartRunner},
 * starts a {@link PullRequestMergedEventsStream} publishing to Kafka, and finally prints the results of
 * a few sample queries. A JVM shutdown hook tears everything down again.
 */
public class GitHubEventsQuickstart {
    private static final Logger LOGGER = LoggerFactory.getLogger(GitHubEventsQuickstart.class);

    /** Name shared by the Pinot table, the Kafka topic and the on-disk table directory. */
    private static final String TABLE_NAME = "pullRequestMergedEvents";
    /** Classpath directory holding the bundled schema and table config. */
    private static final String RESOURCE_DIR = "examples/stream/githubEvents/";
    private static final String SCHEMA_FILE_NAME = TABLE_NAME + "_schema.json";
    private static final String TABLE_CONFIG_FILE_NAME = TABLE_NAME + "_realtime_table_config.json";
    /** How long to wait after starting the stream before running the sample queries. */
    private static final long EVENT_WAIT_MILLIS = 10000L;
    private static final String SEPARATOR = "***************************************************";
    private static final String SHUTDOWN_ERROR_MESSAGE = "Caught exception in shutting down GitHubEvents QuickStart";

    private StreamDataServerStartable _kafkaStarter;
    private ZkStarter.ZookeeperInstance _zookeeperInstance;

    /** A single tear-down action that is allowed to fail without aborting the remaining ones. */
    @FunctionalInterface
    private interface ShutdownStep {
        void run() throws Exception;
    }

    /**
     * Starts a local ZooKeeper server and a Kafka server on top of it, then creates the
     * {@code pullRequestMergedEvents} topic.
     *
     * <p>If anything fails after ZooKeeper is up, whatever was already started is stopped again before
     * the failure is rethrown, so a failed start does not leave servers running.
     *
     * @throws RuntimeException if the Kafka server cannot be created or started, or the topic cannot be
     *     created; failures during the subsequent cleanup are attached as suppressed exceptions
     */
    private void startKafka() {
        _zookeeperInstance = ZkStarter.startLocalZkServer();
        boolean kafkaStarted = false;
        try {
            _kafkaStarter = StreamDataProvider.getServerDataStartable(
                KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
                KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
            _kafkaStarter.start();
            kafkaStarted = true;
            // NOTE(review): the argument 2 is presumably the partition count - confirm against KafkaStarterUtils.
            _kafkaStarter.createTopic(TABLE_NAME, KafkaStarterUtils.getTopicCreationProps(2));
        } catch (Exception e) {
            RuntimeException failure =
                new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
            // Only stop Kafka when start() actually returned; otherwise there is nothing known to be running.
            if (kafkaStarted) {
                try {
                    _kafkaStarter.stop();
                } catch (Exception stopFailure) {
                    failure.addSuppressed(stopFailure);
                }
            }
            try {
                ZkStarter.stopLocalZkServer(_zookeeperInstance);
            } catch (Exception stopFailure) {
                failure.addSuppressed(stopFailure);
            }
            throw failure;
        }
    }

    /**
     * Runs the quickstart end to end: prepares the table files, starts Kafka and the Pinot components,
     * adds the table, starts the event stream, waits for events and prints the sample query results.
     *
     * @param str passed through unchanged as the last constructor argument of
     *     {@link PullRequestMergedEventsStream}; presumably the GitHub personal access token - TODO confirm
     * @throws IllegalStateException if a working directory cannot be created
     * @throws NullPointerException if a bundled resource is missing from the classpath
     * @throws Exception if any component fails to start or a query fails
     */
    public void execute(String str) throws Exception {
        // Keep a handle on the timestamped parent so that shutdown can remove the whole tree.
        File quickStartDataDir = new File("githubEvents-" + System.currentTimeMillis());
        File tableDir = new File(quickStartDataDir, TABLE_NAME);
        if (!tableDir.exists()) {
            Preconditions.checkState(tableDir.mkdirs(), "Failed to create directory %s", tableDir);
        }

        File schemaFile = new File(tableDir, SCHEMA_FILE_NAME);
        File tableConfigFile = new File(tableDir, TABLE_CONFIG_FILE_NAME);
        ClassLoader classLoader = Quickstart.class.getClassLoader();
        copyResource(classLoader, RESOURCE_DIR + SCHEMA_FILE_NAME, schemaFile);
        copyResource(classLoader, RESOURCE_DIR + TABLE_CONFIG_FILE_NAME, tableConfigFile);

        File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
        Preconditions.checkState(tempDir.mkdirs(), "Failed to create directory %s", tempDir);

        // NOTE(review): the three 1s are presumably the controller/broker/server counts - confirm against
        // the QuickstartRunner constructor.
        QuickstartRunner quickstartRunner = new QuickstartRunner(
            Lists.newArrayList(new QuickstartTableRequest(tableDir.getAbsolutePath())), 1, 1, 1, tempDir);

        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Starting Kafka *****");
        startKafka();
        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Starting zookeeper, controller, server and broker *****");
        quickstartRunner.startAll();
        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Adding pullRequestMergedEvents table *****");
        quickstartRunner.bootstrapTable();
        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Starting pullRequestMergedEvents data stream and publishing to Kafka *****");
        new PullRequestMergedEventsStream(schemaFile.getAbsolutePath(), TABLE_NAME, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, str).execute();
        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Waiting for 10 seconds for a few events to get populated *****");
        Thread.sleep(EVENT_WAIT_MILLIS);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown(quickstartRunner, quickStartDataDir)));

        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Realtime github demo quickstart setup complete *****");
        printQueryResult(quickstartRunner, "Total number of documents in the table",
            "select count(*) from pullRequestMergedEvents limit 0");
        printQueryResult(quickstartRunner, "Top 10 repo with the most lines added",
            "select sum(numLinesAdded) from pullRequestMergedEvents group by repo top 10 limit 0");
        printQueryResult(quickstartRunner, "Show data for COLLABORATORS",
            "select * from pullRequestMergedEvents where authorAssociation = 'COLLABORATOR' limit 10");
        printQueryResult(quickstartRunner, "Show repos with longest alive pull requests",
            "select max(elapsedTimeMillis) from pullRequestMergedEvents group by repo top 10 limit 0");
        printQueryResult(quickstartRunner, "Total number of documents in the table",
            "select count(*) from pullRequestMergedEvents");
        Quickstart.printStatus(Quickstart.Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
    }

    /**
     * Copies a classpath resource to a file.
     *
     * @param classLoader class loader used to look the resource up
     * @param resourceName classpath-relative name of the resource
     * @param destination file the resource content is written to
     * @throws NullPointerException if the resource cannot be found
     * @throws java.io.IOException if the copy fails
     */
    private static void copyResource(ClassLoader classLoader, String resourceName, File destination)
            throws java.io.IOException {
        URL resource = classLoader.getResource(resourceName);
        Preconditions.checkNotNull(resource, "Resource %s not found on the classpath", resourceName);
        FileUtils.copyURLToFile(resource, destination);
    }

    /**
     * Prints a description, the query text, the pretty-printed query response and a separator line.
     *
     * @param runner runner used to execute the query
     * @param description human-readable explanation printed before the query
     * @param query query text to print and run
     * @throws Exception if running the query or formatting its response fails
     */
    private static void printQueryResult(QuickstartRunner runner, String description, String query)
            throws Exception {
        Quickstart.printStatus(Quickstart.Color.YELLOW, description);
        Quickstart.printStatus(Quickstart.Color.CYAN, "Query : " + query);
        Quickstart.printStatus(Quickstart.Color.YELLOW, Quickstart.prettyPrintResponse(runner.runQuery(query)));
        Quickstart.printStatus(Quickstart.Color.GREEN, SEPARATOR);
    }

    /**
     * Shutdown-hook body: stops the runner, Kafka and ZooKeeper and removes the quickstart data directory.
     *
     * <p>Each step runs independently, so a failure in one step is logged and the remaining steps still run.
     *
     * @param runner the runner whose components should be stopped
     * @param dataDir the timestamped quickstart directory to delete, including the table directory in it
     */
    private void shutdown(QuickstartRunner runner, File dataDir) {
        runShutdownStep(() ->
            Quickstart.printStatus(Quickstart.Color.GREEN, "***** Shutting down GitHubEventsQuickStart *****"));
        runShutdownStep(() -> runner.stop());
        runShutdownStep(() -> _kafkaStarter.stop());
        runShutdownStep(() -> ZkStarter.stopLocalZkServer(_zookeeperInstance));
        runShutdownStep(() -> FileUtils.deleteDirectory(dataDir));
    }

    /**
     * Runs one tear-down step, logging any exception instead of propagating it.
     *
     * @param step the action to run
     */
    private static void runShutdownStep(ShutdownStep step) {
        try {
            step.run();
        } catch (Exception e) {
            LOGGER.error(SHUTDOWN_ERROR_MESSAGE, e);
        }
    }
}
