package org.apache.pinot.tools.streams;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/streams/AirlineDataStream.class */
/**
 * Replays the records of an Avro file into the {@code flights-realtime} stream topic.
 *
 * <p>{@link #run()} submits a replay loop to a single-thread executor. The loop publishes one
 * record per second, copying the dimension and metric columns of the Pinot schema from the Avro
 * record and overwriting the time column with a synthetic value that starts at 16102 and is
 * incremented after every 60 successfully published records. When the file is exhausted it is
 * reopened and replayed from the beginning. {@link #shutdown()} stops the loop and releases the
 * Avro stream and the producer.
 */
public class AirlineDataStream {
    private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class);

    /** Topic every record is published to. */
    private static final String TOPIC = "flights-realtime";
    /** Number of successfully published records after which the synthetic time value advances. */
    private static final int EVENTS_PER_TIME_INCREMENT = 60;
    /** Pause between two consecutive records. */
    private static final long PUBLISH_INTERVAL_MS = 1000L;
    /** How long {@link #shutdown()} waits for the replay loop to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    Schema _pinotSchema;
    String _timeColumnName;
    File _avroFile;
    // Written by shutdown() and by the replay thread, hence volatile.
    volatile DataFileStream<GenericRecord> _avroDataStream;
    ExecutorService _service;
    private volatile StreamDataProducer _producer;
    // The status message printed by the constructor states that offline data ends at 16101.
    Integer _currentTimeValue = 16102;
    // Set to false by shutdown() on the caller's thread and polled by the replay thread.
    volatile boolean _keepIndexing = true;
    int _counter = 0;

    /**
     * Opens the Avro file, creates the stream producer and the single-thread executor, and
     * prints a status message describing the synthetic time values.
     *
     * @param schema Pinot schema whose dimension and metric columns are copied into each record
     * @param tableConfig table config supplying the time column name
     * @param file Avro file to replay
     * @throws Exception if the Avro file cannot be opened or the producer cannot be created
     */
    public AirlineDataStream(Schema schema, TableConfig tableConfig, File file) throws Exception {
        _pinotSchema = schema;
        _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        _avroFile = file;
        createStream();
        try {
            Properties properties = new Properties();
            properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
            properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
            properties.put("request.required.acks", "1");
            _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
        } catch (Exception e) {
            // Do not leak the file handle opened by createStream() when the producer cannot be built.
            closeAvroStream();
            throw e;
        }
        _service = Executors.newFixedThreadPool(1);
        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 60 events (which is approximately 60 seconds) *****");
    }

    /**
     * Stops the replay loop and releases the Avro stream and the producer.
     *
     * <p>The replay thread is interrupted and given up to {@value #SHUTDOWN_TIMEOUT_SECONDS}
     * seconds to finish before the resources it uses are closed. Calling this method more than
     * once is harmless.
     */
    public void shutdown() {
        _keepIndexing = false;
        // Interrupt the replay thread so it does not sit out the remainder of its sleep.
        _service.shutdownNow();
        try {
            if (!_service.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.warn("Airline data stream did not stop within {} seconds", SHUTDOWN_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        closeAvroStream();
        StreamDataProducer producer = _producer;
        _producer = null;
        if (producer != null) {
            producer.close();
        }
    }

    /**
     * (Re)opens the Avro file for reading, or clears the stream reference when indexing has
     * been stopped.
     *
     * @throws IOException if the file cannot be opened or read as an Avro data file
     */
    private void createStream() throws IOException {
        if (!_keepIndexing) {
            _avroDataStream = null;
            return;
        }
        FileInputStream input = new FileInputStream(_avroFile);
        try {
            _avroDataStream = new DataFileStream<>(input, new GenericDatumReader<GenericRecord>());
        } catch (IOException | RuntimeException e) {
            // The DataFileStream never took ownership of the input, so close it here.
            try {
                input.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Closes the current Avro stream, if any, and clears the reference. */
    private void closeAvroStream() {
        DataFileStream<GenericRecord> stream = _avroDataStream;
        _avroDataStream = null;
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            logger.warn("Failed to close Avro stream for {}", _avroFile, e);
        }
    }

    /**
     * Sends the record's string form, encoded as UTF-8, to the stream topic. Does nothing once
     * indexing has been stopped or the producer has been released.
     */
    private void publish(GenericRecord genericRecord) {
        // Snapshot the field: shutdown() may null it concurrently.
        StreamDataProducer producer = _producer;
        if (!_keepIndexing || producer == null) {
            return;
        }
        producer.produce(TOPIC, genericRecord.toString().getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Starts replaying the Avro file on the executor thread. Returns immediately; the replay
     * continues until {@link #shutdown()} is called or the file can no longer be read.
     */
    public void run() {
        _service.submit(new Runnable() {
            @Override
            public void run() {
                streamRecords();
            }
        });
    }

    /** Replay loop: reads, converts and publishes one record per interval until stopped. */
    private void streamRecords() {
        // Guards against reopening an empty file in a tight loop.
        boolean readSinceOpen = false;
        try {
            while (_keepIndexing) {
                DataFileStream<GenericRecord> stream = _avroDataStream;
                if (stream == null) {
                    return;
                }
                GenericRecord source;
                try {
                    if (!stream.hasNext()) {
                        if (!readSinceOpen) {
                            logger.error("Avro file {} yielded no records; stopping the stream", _avroFile);
                            return;
                        }
                        readSinceOpen = false;
                        if (!reopenStream(stream)) {
                            return;
                        }
                        continue;
                    }
                    source = stream.next();
                    readSinceOpen = true;
                } catch (RuntimeException e) {
                    // A read failure cannot be recovered by retrying on the same stream.
                    if (_keepIndexing) {
                        logger.error("Failed to read from Avro file {}; stopping the stream", _avroFile, e);
                    }
                    return;
                }
                try {
                    publish(toOutgoingRecord(source));
                    _counter++;
                    if (_counter % EVENTS_PER_TIME_INCREMENT == 0) {
                        _currentTimeValue = _currentTimeValue + 1;
                    }
                } catch (RuntimeException e) {
                    // Failures caused by shutdown() interrupting the send are expected; stay quiet.
                    if (_keepIndexing) {
                        logger.error("Failed to publish record to topic {}", TOPIC, e);
                    }
                }
                try {
                    // Sleep after failures too, so a broken producer does not cause a hot loop.
                    Thread.sleep(PUBLISH_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        } finally {
            closeAvroStream();
        }
    }

    /**
     * Closes the exhausted stream and opens the file again from the start.
     *
     * @return {@code true} if a fresh stream is available, {@code false} if replay must stop
     */
    private boolean reopenStream(DataFileStream<GenericRecord> exhausted) {
        try {
            exhausted.close();
        } catch (IOException e) {
            logger.warn("Failed to close exhausted Avro stream for {}", _avroFile, e);
        }
        try {
            createStream();
        } catch (IOException e) {
            logger.error("Failed to reopen Avro file {}; stopping the stream", _avroFile, e);
            return false;
        }
        return _avroDataStream != null;
    }

    /**
     * Builds the record to publish: the schema's dimension and metric columns copied from the
     * source record, plus the time column set to the current synthetic time value.
     */
    private GenericRecord toOutgoingRecord(GenericRecord source) {
        GenericRecord outgoing = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(_pinotSchema));
        for (FieldSpec dimension : _pinotSchema.getDimensionFieldSpecs()) {
            outgoing.put(dimension.getName(), source.get(dimension.getName()));
        }
        for (FieldSpec metric : _pinotSchema.getMetricFieldSpecs()) {
            outgoing.put(metric.getName(), source.get(metric.getName()));
        }
        outgoing.put(_timeColumnName, _currentTimeValue);
        return outgoing;
    }
}
