package org.apache.pinot.tools;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.utils.KafkaStarterUtils;

/* loaded from: input_file:org/apache/pinot/tools/RealtimeQuickStartWithMinion.class */
/**
 * Quick start that brings up a local Zookeeper and Kafka, starts a Pinot cluster through
 * {@link QuickstartRunner} with the controller task scheduler enabled, publishes the bundled
 * githubEvents sample data to Kafka and bootstraps the githubEvents tables.
 */
public class RealtimeQuickStartWithMinion extends QuickStartBase {
    /** Used as the working directory name, the Kafka topic name and the resource file prefix. */
    private static final String TABLE_NAME = "githubEvents";
    /** Classpath directory holding the schema, table configs and raw data for the example. */
    private static final String RESOURCE_DIR = "examples/minions/stream/githubEvents/";
    private static final String SCHEMA_FILE_NAME = "githubEvents_schema.json";
    private static final String REALTIME_TABLE_CONFIG_FILE_NAME = "githubEvents_realtime_table_config.json";
    private static final String OFFLINE_TABLE_CONFIG_FILE_NAME = "githubEvents_offline_table_config.json";
    private static final String RAW_DATA_DIR_NAME = "rawdata";
    private static final String RAW_DATA_FILE_NAME = "2021-07-21-few-hours.json";
    private static final String COUNT_QUERY = "select count(*) from githubEvents limit 1";
    private static final int NUM_KAFKA_PARTITIONS = 2;
    private static final long EVENT_POPULATION_WAIT_MS = 5000L;

    private StreamDataServerStartable _kafkaStarter;

    /**
     * Delegates to {@link PinotAdministrator#main(String[])} with
     * {@code QuickStart -type REALTIME-MINION} prepended to the given arguments.
     *
     * @param strArr extra command line arguments, appended after the quick start type
     * @throws Exception whatever {@link PinotAdministrator#main(String[])} throws
     */
    public static void main(String[] strArr) throws Exception {
        ArrayList<String> arguments = new ArrayList<>(Arrays.asList("QuickStart", "-type", "REALTIME-MINION"));
        arguments.addAll(Arrays.asList(strArr));
        PinotAdministrator.main(arguments.toArray(new String[0]));
    }

    /**
     * Builds the configuration overrides handed to the {@link QuickstartRunner}.
     *
     * @return a new mutable map that sets {@code controller.task.scheduler.enabled} to {@code true}
     */
    public Map<String, Object> getConfigOverrides() {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("controller.task.scheduler.enabled", true);
        return overrides;
    }

    /**
     * Runs the quick start: stages the example files in a fresh temporary directory, starts
     * Zookeeper, Kafka and the Pinot components, registers a shutdown hook that tears them down,
     * publishes the sample events, bootstraps the tables and prints a sample count query result.
     *
     * @throws RuntimeException if the Kafka server startable cannot be created or started
     * @throws Exception if any other step fails
     */
    @Override // org.apache.pinot.tools.QuickStartBase
    public void execute() throws Exception {
        // A timestamped sub-directory keeps separate runs from colliding inside _tmpDir.
        File quickstartTmpDir = new File(this._tmpDir, String.valueOf(System.currentTimeMillis()));
        File baseDir = new File(quickstartTmpDir, TABLE_NAME);
        File dataDir = new File(baseDir, RAW_DATA_DIR_NAME);
        Preconditions.checkState(dataDir.mkdirs(), "Failed to create directory: %s", dataDir);

        // The raw data file is staged next to the configs (in baseDir), not inside dataDir.
        File dataFile = new File(baseDir, RAW_DATA_FILE_NAME);
        ClassLoader classLoader = Quickstart.class.getClassLoader();
        copyResourceToFile(classLoader, RESOURCE_DIR + SCHEMA_FILE_NAME, new File(baseDir, SCHEMA_FILE_NAME));
        copyResourceToFile(classLoader, RESOURCE_DIR + REALTIME_TABLE_CONFIG_FILE_NAME,
                new File(baseDir, REALTIME_TABLE_CONFIG_FILE_NAME));
        copyResourceToFile(classLoader, RESOURCE_DIR + OFFLINE_TABLE_CONFIG_FILE_NAME,
                new File(baseDir, OFFLINE_TABLE_CONFIG_FILE_NAME));
        copyResourceToFile(classLoader, RESOURCE_DIR + RAW_DATA_DIR_NAME + "/" + RAW_DATA_FILE_NAME, dataFile);

        QuickstartRunner quickstartRunner = new QuickstartRunner(
                Lists.newArrayList(new QuickstartTableRequest(baseDir.getAbsolutePath())), 1, 1, 1, 1, dataDir, true,
                null, getConfigOverrides());

        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Starting Kafka *****");
        ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
        // Only Kafka creation/start is wrapped: the message below names the Kafka startable, so
        // wrapping later steps would blame Kafka for unrelated failures.
        try {
            this._kafkaStarter = StreamDataProvider.getServerDataStartable(
                    KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
                    KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
            this._kafkaStarter.start();
        } catch (Exception e) {
            throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
        }

        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
        quickstartRunner.startAll();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                Quickstart.printStatus(Quickstart.Color.GREEN, "***** Shutting down realtime-minion quick start *****");
                quickstartRunner.stop();
                this._kafkaStarter.stop();
                ZkStarter.stopLocalZkServer(zookeeperInstance);
                FileUtils.deleteDirectory(quickstartTmpDir);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }));

        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Sending events to Kafka *****");
        this._kafkaStarter.createTopic(TABLE_NAME, KafkaStarterUtils.getTopicCreationProps(NUM_KAFKA_PARTITIONS));
        publishGithubEventsToKafka(TABLE_NAME, dataFile);

        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Bootstrap githubEvents tables *****");
        quickstartRunner.bootstrapTable();

        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****");
        Thread.sleep(EVENT_POPULATION_WAIT_MS);

        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Realtime-minion quickstart setup complete *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "Current number of documents in the table");
        Quickstart.printStatus(Quickstart.Color.CYAN, "Query : " + COUNT_QUERY);
        Quickstart.printStatus(Quickstart.Color.YELLOW, Quickstart.prettyPrintResponse(quickstartRunner.runQuery(COUNT_QUERY)));
        Quickstart.printStatus(Quickstart.Color.GREEN, "***************************************************");
        Quickstart.printStatus(Quickstart.Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
        Quickstart.printStatus(Quickstart.Color.GREEN, "In particular, you will find that OFFLINE table gets segments from REALTIME table;");
        Quickstart.printStatus(Quickstart.Color.GREEN, "and segments in OFFLINE table get merged into larger ones within a few minutes.");
    }

    /**
     * Copies a classpath resource to a local file.
     *
     * @param classLoader class loader used to look up the resource
     * @param resourcePath path of the resource on the classpath
     * @param destination file the resource content is written to
     * @throws NullPointerException if the resource cannot be found
     * @throws Exception if the copy fails
     */
    private static void copyResourceToFile(ClassLoader classLoader, String resourcePath, File destination)
            throws Exception {
        URL resource = classLoader.getResource(resourcePath);
        Preconditions.checkNotNull(resource, "Missing classpath resource: %s", resourcePath);
        FileUtils.copyURLToFile(resource, destination);
    }

    /**
     * Publishes every line of the given file as one message to the given Kafka topic.
     *
     * @param topicName topic the messages are produced to
     * @param dataFile file whose lines are published, read as UTF-8
     * @throws Exception if the producer cannot be created, or reading or producing fails
     */
    private static void publishGithubEventsToKafka(String topicName, File dataFile) throws Exception {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", "1");
        StreamDataProducer producer =
                StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
        try {
            // Decode explicitly as UTF-8 so the bytes re-encoded below match the file content
            // regardless of the platform default charset.
            LineIterator lines = FileUtils.lineIterator(dataFile, StandardCharsets.UTF_8.name());
            try {
                while (lines.hasNext()) {
                    producer.produce(topicName, lines.nextLine().getBytes(StandardCharsets.UTF_8));
                }
            } finally {
                // Releases the underlying file reader, which was previously left open.
                lines.close();
            }
        } finally {
            producer.close();
        }
    }
}
