package gobblin.metrics.kafka;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import gobblin.metrics.GobblinTrackingEvent;
import gobblin.metrics.MetricContext;
import gobblin.metrics.reporter.EventReporter;
import gobblin.metrics.reporter.util.AvroJsonSerializer;
import gobblin.metrics.reporter.util.AvroSerializer;
import gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import gobblin.metrics.reporter.util.SchemaVersionWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/* loaded from: input_file:gobblin/metrics/kafka/KafkaEventReporter.class */
/**
 * {@link EventReporter} that serializes each queued {@link GobblinTrackingEvent} and hands the
 * resulting batch to a {@link KafkaPusher}.
 *
 * <p>Instances are created through a {@link Builder}; use
 * {@link Factory#forContext(MetricContext)} to obtain one.
 */
public class KafkaEventReporter extends EventReporter {
    /** Serializer applied to every event before it is pushed; registered with the closer. */
    protected final AvroSerializer<GobblinTrackingEvent> serializer;

    /** Destination for serialized events: either supplied via the builder or created here. */
    private final KafkaPusher kafkaPusher;

    /**
     * Builder for {@link KafkaEventReporter}.
     *
     * @param <T> the concrete builder type returned by the fluent methods
     */
    public static abstract class Builder<T extends EventReporter.Builder<T>> extends EventReporter.Builder<T> {
        /** Value given to {@link #build(String, String)}; passed to the {@link KafkaPusher} constructor. */
        protected String brokers;

        /** Value given to {@link #build(String, String)}; passed to the {@link KafkaPusher} constructor. */
        protected String topic;

        /** Pre-built pusher to use instead of creating one; absent until {@link #withKafkaPusher} is called. */
        protected Optional<KafkaPusher> kafkaPusher;

        /**
         * Creates a builder with no pre-built {@link KafkaPusher}.
         *
         * <p>NOTE: the decompiler reported that this constructor was declared {@code protected}
         * in the original source.
         *
         * @param metricContext forwarded to the {@link EventReporter.Builder} constructor
         */
        public Builder(MetricContext metricContext) {
            super(metricContext);
            this.kafkaPusher = Optional.absent();
        }

        /**
         * Supplies the {@link KafkaPusher} the reporter should use instead of constructing its own.
         *
         * @param kafkaPusher the pusher to use
         * @return this builder, as returned by {@code self()}
         * @throws NullPointerException if {@code kafkaPusher} is null (from {@link Optional#of})
         */
        public T withKafkaPusher(KafkaPusher kafkaPusher) {
            this.kafkaPusher = Optional.of(kafkaPusher);
            return self();
        }

        /**
         * Records the given settings on this builder and constructs the reporter.
         *
         * @param brokers stored in {@link #brokers}; used only when no pusher was supplied
         * @param topic stored in {@link #topic}; used only when no pusher was supplied
         * @return a new {@link KafkaEventReporter} configured from this builder
         * @throws IOException if the reporter's constructor fails
         */
        public KafkaEventReporter build(String brokers, String topic) throws IOException {
            this.brokers = brokers;
            this.topic = topic;
            return new KafkaEventReporter(this);
        }
    }

    /** Concrete {@link Builder} whose fluent methods return {@code BuilderImpl}. */
    public static class BuilderImpl extends Builder<BuilderImpl> {
        // Private: instances are handed out by the enclosing class's factory methods.
        private BuilderImpl(MetricContext metricContext) {
            super(metricContext);
        }

        /**
         * Returns this builder.
         *
         * <p>NOTE: the decompiler reported that this method was declared {@code protected}
         * in the original source.
         */
        @Override // gobblin.metrics.reporter.EventReporter.Builder
        public BuilderImpl self() {
            return this;
        }
    }

    /** Entry point for obtaining a {@link BuilderImpl}. */
    public static class Factory {
        /**
         * Creates a new builder for the given context.
         *
         * @param metricContext forwarded to the builder's constructor
         * @return a fresh {@link BuilderImpl}
         */
        public static BuilderImpl forContext(MetricContext metricContext) {
            return new BuilderImpl(metricContext);
        }
    }

    /**
     * Creates a reporter from the given builder.
     *
     * <p>The serializer is obtained from {@link #createSerializer(SchemaVersionWriter)} with a
     * {@link FixedSchemaVersionWriter} and registered with the closer. That method is overridable
     * and is invoked from this constructor, so an override runs before the subclass's own fields
     * have been initialized.
     *
     * @param builder source of the pusher, or of the brokers/topic used to create one
     * @throws IOException if creating the serializer fails
     */
    public KafkaEventReporter(Builder<?> builder) throws IOException {
        super(builder);
        this.serializer = this.closer.register(createSerializer(new FixedSchemaVersionWriter()));
        if (builder.kafkaPusher.isPresent()) {
            // A caller-supplied pusher is used as-is and is NOT registered with the closer;
            // presumably the caller stays responsible for closing it.
            this.kafkaPusher = builder.kafkaPusher.get();
        } else {
            // A pusher created here is registered with the closer alongside the serializer.
            this.kafkaPusher = this.closer.register(new KafkaPusher(builder.brokers, builder.topic));
        }
    }

    /**
     * Drains {@code queue}, serializes every event and pushes the batch in a single call.
     * Nothing is pushed when the queue yields no events.
     *
     * @param queue events to report; polled until it returns {@code null}
     */
    @Override // gobblin.metrics.reporter.EventReporter
    public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
        List<byte[]> serializedEvents = Lists.newArrayList();
        GobblinTrackingEvent event;
        while ((event = queue.poll()) != null) {
            serializedEvents.add(this.serializer.serializeRecord(event));
        }
        if (!serializedEvents.isEmpty()) {
            this.kafkaPusher.pushMessages(serializedEvents);
        }
    }

    /**
     * Creates the serializer used for outgoing events. This implementation returns an
     * {@link AvroJsonSerializer} for {@code GobblinTrackingEvent.SCHEMA$}.
     *
     * @param schemaVersionWriter passed through to the serializer's constructor
     * @return the serializer to apply to each {@link GobblinTrackingEvent}
     * @throws IOException if the serializer cannot be created
     */
    protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
        return new AvroJsonSerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
    }

    /**
     * Creates a new builder for the given context.
     *
     * @param metricContext forwarded to the builder's constructor
     * @return a fresh builder
     * @deprecated prefer {@link Factory#forContext(MetricContext)}, which creates the same
     *     {@link BuilderImpl} but exposes its concrete type.
     */
    // The raw Builder bound in the return type is part of the published signature and is kept
    // unchanged so existing callers continue to compile.
    @SuppressWarnings("rawtypes")
    @Deprecated
    public static Builder<? extends Builder> forContext(MetricContext metricContext) {
        return new BuilderImpl(metricContext);
    }
}
