package gobblin.metrics.kafka;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import gobblin.metrics.MetricReport;
import gobblin.metrics.reporter.MetricReportReporter;
import gobblin.metrics.reporter.util.AvroJsonSerializer;
import gobblin.metrics.reporter.util.AvroSerializer;
import gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import gobblin.metrics.reporter.util.SchemaVersionWriter;
import gobblin.util.ClassAliasResolver;
import gobblin.util.ConfigUtils;
import java.io.IOException;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/metrics/kafka/KafkaReporter.class */
/**
 * {@link MetricReportReporter} that serializes every {@link MetricReport} with an
 * {@link AvroSerializer} and hands the resulting bytes to a {@link KafkaPusher}.
 *
 * <p>The {@link SchemaVersionWriter} used by the serializer is taken from the configuration key
 * {@link #SCHEMA_VERSION_WRITER_TYPE} when present; otherwise a {@link FixedSchemaVersionWriter}
 * is used.
 */
public class KafkaReporter extends MetricReportReporter {
    private static final Logger log = LoggerFactory.getLogger(KafkaReporter.class);

    /** Optional configuration key selecting the {@link SchemaVersionWriter} implementation. */
    public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType";

    /** Serializer that turns a {@link MetricReport} into the bytes pushed to Kafka. */
    protected final AvroSerializer<MetricReport> serializer;

    /** Destination for serialized reports; either supplied through the builder or created here. */
    protected final KafkaPusher kafkaPusher;

    /**
     * Base builder for {@link KafkaReporter} and its subclasses.
     *
     * @param <T> concrete builder type returned by the fluent methods
     */
    public static abstract class Builder<T extends MetricReportReporter.Builder<T>> extends MetricReportReporter.Builder<T> {
        protected String brokers;
        protected String topic;
        protected Optional<KafkaPusher> kafkaPusher;

        /** Creates a builder named {@code "KafkaReporter"} with no pre-built {@link KafkaPusher}. */
        public Builder() {
            this.name = "KafkaReporter";
            this.kafkaPusher = Optional.absent();
        }

        /**
         * Supplies the {@link KafkaPusher} the reporter should use instead of creating its own.
         *
         * @param kafkaPusher pusher to use; must not be {@code null} ({@link Optional#of} rejects it)
         * @return this builder
         */
        public T withKafkaPusher(KafkaPusher kafkaPusher) {
            this.kafkaPusher = Optional.of(kafkaPusher);
            return (T) self();
        }

        /**
         * Builds a {@link KafkaReporter}.
         *
         * @param str Kafka brokers, stored in {@link #brokers}
         * @param str2 Kafka topic, stored in {@link #topic}
         * @param properties properties converted to a {@link Config} through
         *     {@link ConfigUtils#propertiesToConfig} with the {@code "metrics."} prefix argument
         * @return the new reporter
         * @throws IOException if the reporter cannot be constructed
         */
        public KafkaReporter build(String str, String str2, Properties properties) throws IOException {
            this.brokers = str;
            this.topic = str2;
            Config config = ConfigUtils.propertiesToConfig(properties, Optional.of("metrics."));
            return new KafkaReporter(this, config);
        }
    }

    /** Entry point for obtaining a concrete {@link Builder}. */
    public static class BuilderFactory {
        /** @return a fresh {@link BuilderImpl} */
        public static BuilderImpl newBuilder() {
            return new BuilderImpl();
        }
    }

    /** Concrete {@link Builder} whose fluent methods return {@link BuilderImpl}. */
    public static class BuilderImpl extends Builder<BuilderImpl> {
        @Override
        public BuilderImpl self() {
            return this;
        }
    }

    /**
     * Creates the reporter, resolving the schema version writer, the serializer and the pusher.
     *
     * @param builder builder carrying the brokers, topic and optional pre-built {@link KafkaPusher}
     * @param config configuration consulted for {@link #SCHEMA_VERSION_WRITER_TYPE}
     * @throws IOException if the configured {@link SchemaVersionWriter} cannot be resolved or
     *     instantiated, or if the serializer cannot be created
     */
    public KafkaReporter(Builder<?> builder, Config config) throws IOException {
        super(builder, config);
        SchemaVersionWriter schemaVersionWriter;
        if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) {
            try {
                // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance():
                // exceptions thrown by the writer's constructor arrive wrapped in a
                // ReflectiveOperationException and are therefore reported as IOException below.
                schemaVersionWriter = (SchemaVersionWriter) new ClassAliasResolver(SchemaVersionWriter.class)
                        .resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE))
                        .getDeclaredConstructor()
                        .newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IOException("Could not instantiate version writer.", e);
            }
        } else {
            schemaVersionWriter = new FixedSchemaVersionWriter();
        }
        log.info("Schema version writer: {}", schemaVersionWriter.getClass().getName());
        this.serializer = (AvroSerializer) this.closer.register(createSerializer(schemaVersionWriter));
        if (builder.kafkaPusher.isPresent()) {
            // A pusher supplied by the caller is not registered with the closer here.
            this.kafkaPusher = (KafkaPusher) builder.kafkaPusher.get();
        } else {
            this.kafkaPusher = (KafkaPusher) this.closer.register(new KafkaPusher(builder.brokers, builder.topic));
        }
    }

    /**
     * Creates the serializer used for outgoing reports. This implementation returns an
     * {@link AvroJsonSerializer} for {@code MetricReport.SCHEMA$}.
     *
     * <p>NOTE(review): this overridable method is invoked from the constructor, so an override
     * runs before the subclass's own fields are initialized.
     *
     * @param schemaVersionWriter writer passed to the serializer
     * @return serializer for {@link MetricReport} records
     * @throws IOException if the serializer cannot be created
     */
    protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
        return new AvroJsonSerializer(MetricReport.SCHEMA$, schemaVersionWriter);
    }

    /**
     * Serializes the report and pushes it to Kafka as a single-message batch.
     *
     * @param metricReport report to emit
     */
    @Override
    protected void emitReport(MetricReport metricReport) {
        byte[] payload = this.serializer.serializeRecord(metricReport);
        // A byte[] is not an Object[], so the varargs factory treats it as one element and
        // produces a one-element list of byte[].
        this.kafkaPusher.pushMessages(Lists.newArrayList(payload));
    }
}
