package org.apache.pinot.tools;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.AirlineDataStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;

/* loaded from: input_file:org/apache/pinot/tools/HybridQuickstart.class */
/**
 * Quickstart that sets up a hybrid (offline + realtime) {@code airlineStats} table.
 *
 * <p>{@link #execute()} copies the example schema, table configs, ingestion job spec and sample Avro
 * data from the classpath into a scratch directory, starts a local ZooKeeper and Kafka, publishes the
 * airline data stream, starts the components managed by {@link QuickstartRunner}, bootstraps the table
 * and finally prints the results of a handful of sample queries.
 */
public class HybridQuickstart extends QuickStartBase {
    private static final String KAFKA_TOPIC = "flights-realtime";
    private static final String QUERY_SEPARATOR = "***************************************************";

    private StreamDataServerStartable _kafkaStarter;
    private ZkStarter.ZookeeperInstance _zookeeperInstance;
    private File _schemaFile;
    private File _realtimeTableConfigFile;
    private File _dataFile;
    private File _ingestionJobSpecFile;

    /** A single cleanup action that is allowed to fail with any exception. */
    private interface ShutdownStep {
        void run() throws Exception;
    }

    /**
     * Delegates to {@link PinotAdministrator#main(String[])} with {@code QuickStart -type HYBRID}
     * prepended to the supplied arguments.
     *
     * @param strArr additional command-line arguments, appended after the fixed prefix
     * @throws Exception whatever {@code PinotAdministrator.main} throws
     */
    public static void main(String[] strArr) throws Exception {
        ArrayList<String> arguments = new ArrayList<>(Arrays.asList("QuickStart", "-type", "HYBRID"));
        arguments.addAll(Arrays.asList(strArr));
        PinotAdministrator.main(arguments.toArray(new String[0]));
    }

    /**
     * Copies one classpath resource to {@code destination}.
     *
     * @throws NullPointerException if the resource cannot be found on the classpath
     * @throws IOException if the copy fails
     */
    private static void copyResource(ClassLoader classLoader, String resourcePath, File destination)
            throws IOException {
        URL resource = classLoader.getResource(resourcePath);
        Preconditions.checkNotNull(resource, "Missing classpath resource: %s", resourcePath);
        FileUtils.copyURLToFile(resource, destination);
    }

    /**
     * Materializes the example files under {@code file} and remembers the ones needed later
     * (schema, realtime table config, sample data, ingestion job spec) in instance fields.
     *
     * @param file directory that receives the copied example files
     * @return a table request pointing at the absolute path of {@code file}
     * @throws IOException if any resource cannot be copied
     */
    private QuickstartTableRequest prepareTableRequest(File file) throws IOException {
        this._schemaFile = new File(file, "airlineStats_schema.json");
        this._ingestionJobSpecFile = new File(file, "ingestionJobSpec.yaml");
        File offlineTableConfigFile = new File(file, "airlineStats_offline_table_config.json");
        ClassLoader classLoader = Quickstart.class.getClassLoader();

        copyResource(classLoader, "examples/batch/airlineStats/airlineStats_schema.json", this._schemaFile);
        copyResource(classLoader, "examples/batch/airlineStats/ingestionJobSpec.yaml", this._ingestionJobSpecFile);
        copyResource(classLoader, "examples/batch/airlineStats/airlineStats_offline_table_config.json",
                offlineTableConfigFile);

        this._realtimeTableConfigFile = new File(file, "airlineStats_realtime_table_config.json");
        copyResource(classLoader, "examples/stream/airlineStats/airlineStats_realtime_table_config.json",
                this._realtimeTableConfigFile);

        this._dataFile = new File(file, "airlineStats_data.avro");
        copyResource(classLoader, "examples/stream/airlineStats/sample_data/airlineStats_data.avro", this._dataFile);

        return new QuickstartTableRequest(file.getAbsolutePath());
    }

    /**
     * Starts a local ZooKeeper, then a Kafka server configured against it, and creates the
     * {@code flights-realtime} topic.
     *
     * @throws RuntimeException wrapping any failure while starting Kafka or creating the topic
     */
    private void startKafka() {
        this._zookeeperInstance = ZkStarter.startLocalZkServer();
        try {
            this._kafkaStarter = StreamDataProvider.getServerDataStartable(
                    KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
                    KafkaStarterUtils.getDefaultKafkaConfiguration(this._zookeeperInstance));
            this._kafkaStarter.start();
            this._kafkaStarter.createTopic(KAFKA_TOPIC, KafkaStarterUtils.getTopicCreationProps(10));
        } catch (Exception e) {
            throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
        }
    }

    /**
     * Runs one cleanup action and reports, rather than propagates, any failure so that the
     * remaining cleanup actions still get a chance to run.
     */
    private static void runShutdownStep(String description, ShutdownStep step) {
        try {
            step.run();
        } catch (Exception e) {
            System.err.println("Hybrid quick start shutdown: failed to " + description);
            e.printStackTrace();
        }
    }

    /**
     * Prints a description and the query text, executes the query and prints the formatted response.
     * Keeping the query in a single variable guarantees the printed and executed text are identical.
     */
    private static void runSampleQuery(QuickstartRunner runner, String description, String query)
            throws Exception {
        Quickstart.printStatus(Quickstart.Color.YELLOW, description);
        Quickstart.printStatus(Quickstart.Color.CYAN, "Query : " + query);
        Quickstart.printStatus(Quickstart.Color.YELLOW, Quickstart.prettyPrintResponse(runner.runQuery(query)));
        Quickstart.printStatus(Quickstart.Color.GREEN, QUERY_SEPARATOR);
    }

    /**
     * Brings up the whole hybrid quickstart and runs the sample queries.
     *
     * @throws IllegalStateException if the scratch data directory cannot be created
     * @throws Exception on any failure while preparing files, starting components or querying
     */
    @Override // org.apache.pinot.tools.QuickStartBase
    public void execute() throws Exception {
        // A timestamped directory keeps separate runs from colliding inside the shared temp dir.
        File workDir = new File(this._tmpDir, String.valueOf(System.currentTimeMillis()));
        File tableDir = new File(workDir, "airlineStats");
        File dataDir = new File(tableDir, "data");
        Preconditions.checkState(dataDir.mkdirs(), "Failed to create data directory: %s", dataDir);

        QuickstartRunner quickstartRunner =
                new QuickstartRunner(Lists.newArrayList(prepareTableRequest(tableDir)), 1, 1, 1, dataDir);

        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Starting Kafka  *****");
        startKafka();

        Quickstart.printStatus(Quickstart.Color.YELLOW,
                "***** Starting airline data stream and publishing to Kafka *****");
        AirlineDataStream airlineDataStream = new AirlineDataStream(
                Schema.fromFile(this._schemaFile),
                (TableConfig) JsonUtils.fileToObject(this._realtimeTableConfigFile, TableConfig.class),
                this._dataFile);
        airlineDataStream.run();

        Quickstart.printStatus(Quickstart.Color.YELLOW,
                "***** Starting Zookeeper, 1 servers, 1 brokers and 1 controller *****");
        quickstartRunner.startAll();

        // Every cleanup action is isolated: a failure in one must not prevent the others from
        // running, otherwise Kafka/ZooKeeper or the scratch directory would be left behind.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            runShutdownStep("print the shutdown banner", () ->
                    Quickstart.printStatus(Quickstart.Color.GREEN, "***** Shutting down hybrid quick start *****"));
            runShutdownStep("stop the quickstart runner", () -> quickstartRunner.stop());
            runShutdownStep("shut down the airline data stream", () -> airlineDataStream.shutdown());
            runShutdownStep("stop Kafka", () -> this._kafkaStarter.stop());
            runShutdownStep("stop the local ZooKeeper", () -> ZkStarter.stopLocalZkServer(this._zookeeperInstance));
            runShutdownStep("delete " + workDir, () -> FileUtils.deleteDirectory(workDir));
        }));

        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Bootstrap airlineStats offline and realtime table *****");
        quickstartRunner.bootstrapTable();

        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is complete *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Sequence of operations *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    1. Started 1 controller instance where tenant creation is enabled *****");
        // The runner above is constructed with a count of 1 for each component.
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    2. Started 1 server and 1 broker *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    3. Created a server tenant with 1 offline and 1 realtime instance *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    4. Created a broker tenant with 2 instances *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    5. Added a schema *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    6. Created an offline and a realtime table with the tenant names created above *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    7. Built and pushed an offline segment *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    8. Started publishing a Kafka stream for the realtime instance to start consuming *****");
        Quickstart.printStatus(Quickstart.Color.YELLOW, "*****    9. Sleep 5 Seconds to wait for all components brought up *****");
        Thread.sleep(5000L);

        runSampleQuery(quickstartRunner, "Total number of documents in the table",
                "select count(*) from airlineStats limit 1");
        runSampleQuery(quickstartRunner, "Top 5 airlines in cancellation ",
                "select AirlineID, sum(Cancelled) from airlineStats group by AirlineID order by sum(Cancelled) desc limit 5");
        runSampleQuery(quickstartRunner, "Top 5 airlines in number of flights after 2010",
                "select AirlineID, Year, sum(Flights) from airlineStats where Year > 2010 group by AirlineID, Year order by sum(Flights) desc limit 5");
        runSampleQuery(quickstartRunner, "Top 5 cities for number of flights",
                "select OriginCityName, max(Flights) from airlineStats group by OriginCityName order by max(Flights) desc limit 5");
        runSampleQuery(quickstartRunner,
                "Print AirlineID, OriginCityName, DestCityName, Year for 5 records ordered by Year",
                "select AirlineID, OriginCityName, DestCityName, Year from airlineStats order by Year limit 5");

        Quickstart.printStatus(Quickstart.Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
    }
}
