package gobblin.metrics.reporter;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.Timer;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.MoreExecutors;
import gobblin.metrics.GobblinTrackingEvent;
import gobblin.metrics.MetricContext;
import gobblin.metrics.notification.EventNotification;
import gobblin.metrics.notification.Notification;
import gobblin.util.ExecutorsUtils;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/metrics/reporter/EventReporter.class */
/**
 * Base class for reporters that ship {@link GobblinTrackingEvent}s rather than metrics.
 *
 * <p>On construction the reporter registers itself as a notification target on the supplied
 * {@link MetricContext}. Every {@link EventNotification} it receives is sanitized and placed on a
 * bounded queue. The queue is handed to {@link #reportEventQueue(Queue)} whenever
 * {@link #report()} is invoked, and additionally on a background thread as soon as the queue is
 * more than two thirds full.
 *
 * <p>Subclasses implement {@link #reportEventQueue(Queue)} and may register resources on
 * {@link #closer}; those resources are closed by {@link #close()}.
 */
public abstract class EventReporter extends com.codahale.metrics.ScheduledReporter implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventReporter.class);

    /** Maximum number of events buffered before producers start blocking. */
    private static final int QUEUE_CAPACITY = 100;

    /** Queue size (two thirds of capacity) above which an out-of-schedule report is triggered. */
    private static final int IMMEDIATE_REPORT_THRESHOLD = QUEUE_CAPACITY * 2 / 3;

    /** Seconds a producer waits for free space in the queue before the event is dropped. */
    private static final long ENQUEUE_TIMEOUT_SECONDS = 10L;

    /** Replacement for null keys / values found in event metadata. */
    private static final String NULL_STRING = "null";

    private final MetricContext metricContext;
    private final BlockingQueue<GobblinTrackingEvent> reportingQueue;
    private final ExecutorService immediateReportExecutor;
    private final UUID notificationTargetKey;

    /** Resources registered here by subclasses are closed in {@link #close()}. */
    protected final Closer closer;

    /**
     * Self-typed builder holding the settings passed to the reporter constructor.
     *
     * @param <T> the concrete builder type, returned by {@link #self()}
     */
    public static abstract class Builder<T extends Builder<T>> {
        protected MetricContext context;
        protected String name = "MetricReportReporter";
        protected TimeUnit rateUnit = TimeUnit.SECONDS;
        protected TimeUnit durationUnit = TimeUnit.MILLISECONDS;
        protected MetricFilter filter = MetricFilter.ALL;

        /**
         * Creates a builder bound to the given context.
         *
         * <p>NOTE(review): the decompiler reported this member as originally {@code protected};
         * it is kept {@code public} here to match the decompiled class.
         *
         * @param metricContext context whose event notifications the built reporter will receive
         */
        public Builder(MetricContext metricContext) {
            this.context = metricContext;
        }

        /**
         * Returns this builder typed as the concrete subclass.
         *
         * <p>NOTE(review): the decompiler reported this member as originally {@code protected}.
         *
         * @return {@code this}
         */
        public abstract T self();
    }

    /**
     * Creates the reporter, starts its single-threaded executor for out-of-schedule reports and
     * registers it as a notification target on the builder's {@link MetricContext}.
     *
     * @param builder source of the context, reporter name, filter and time units
     */
    public EventReporter(Builder<?> builder) {
        super(builder.context, builder.name, builder.filter, builder.rateUnit, builder.durationUnit);
        this.closer = Closer.create();
        this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
            (ThreadPoolExecutor) Executors.newFixedThreadPool(1,
                ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
            5L, TimeUnit.MINUTES);
        this.metricContext = builder.context;
        this.notificationTargetKey = builder.context.addNotificationTarget(new Function<Notification, Void>() {
            @Nullable
            public Void apply(Notification notification) {
                EventReporter.this.notificationCallback(notification);
                return null;
            }
        });
        this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
    }

    /**
     * Callback invoked for every notification sent to this reporter; only
     * {@link EventNotification}s are acted upon, all other notifications are ignored.
     *
     * @param notification the notification received from the metric context
     */
    public void notificationCallback(Notification notification) {
        if (notification instanceof EventNotification) {
            addEventToReportingQueue(((EventNotification) notification).getEvent());
        }
    }

    /**
     * Sanitizes the event and adds it to the reporting queue, waiting up to
     * {@value #ENQUEUE_TIMEOUT_SECONDS} seconds for space. If the queue is already more than two
     * thirds full, a report is first scheduled on the background executor. An event that cannot
     * be enqueued in time is dropped and an error is logged.
     *
     * @param gobblinTrackingEvent the event to enqueue; its metadata map is replaced in place
     */
    public void addEventToReportingQueue(GobblinTrackingEvent gobblinTrackingEvent) {
        if (this.reportingQueue.size() > IMMEDIATE_REPORT_THRESHOLD) {
            immediatelyScheduleReport();
        }
        try {
            boolean enqueued =
                this.reportingQueue.offer(sanitizeEvent(gobblinTrackingEvent), ENQUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!enqueued) {
                // SLF4J substitutes "{}" placeholders only; printf-style "%s" would be logged verbatim.
                LOGGER.error("Enqueuing of event {} at reporter with class {} timed out. Sending of events is probably stuck.",
                    gobblinTrackingEvent, getClass().getCanonicalName());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.warn(String.format("Enqueuing of event %s at reporter with class %s was interrupted.",
                gobblinTrackingEvent, getClass().getCanonicalName()), e);
        }
    }

    /** Hands the current reporting queue to {@link #reportEventQueue(Queue)}. */
    public void report() {
        reportEventQueue(this.reportingQueue);
    }

    /**
     * Emits the queued events to the reporter's destination.
     *
     * @param queue the reporter's internal event queue (the live queue, not a copy)
     */
    public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);

    /**
     * Intentionally a no-op: this reporter only emits events, so the metric maps are ignored.
     */
    public final void report(SortedMap<String, Gauge> sortedMap, SortedMap<String, Counter> sortedMap2, SortedMap<String, Histogram> sortedMap3, SortedMap<String, Meter> sortedMap4, SortedMap<String, Timer> sortedMap5) {
    }

    /** Submits an out-of-schedule {@link #report()} to the background executor. */
    private void immediatelyScheduleReport() {
        this.immediateReportExecutor.submit(new Runnable() {
            @Override
            public void run() {
                EventReporter.this.report();
            }
        });
    }

    /**
     * Unregisters from the metric context, reports any events still queued, closes the resources
     * registered on {@link #closer} and finally closes the underlying scheduled reporter.
     * Exceptions raised along the way are logged and not propagated.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            try {
                this.metricContext.removeNotificationTarget(this.notificationTargetKey);
                report();
            } finally {
                // Always release registered resources, even if unregistering or the final report fails.
                this.closer.close();
            }
        } catch (Exception e) {
            LOGGER.warn("Exception when closing EventReporter", e);
        } finally {
            super.close();
        }
    }

    /**
     * Replaces null metadata keys and values with the literal string {@code "null"}.
     * The event is modified in place: its metadata map is swapped for the sanitized copy.
     *
     * @param gobblinTrackingEvent the event to sanitize
     * @return the same event instance, carrying the sanitized metadata
     */
    private GobblinTrackingEvent sanitizeEvent(GobblinTrackingEvent gobblinTrackingEvent) {
        Map<String, String> sanitized = Maps.newHashMap();
        Map<String, String> metadata = gobblinTrackingEvent.getMetadata();
        // Defensive: an event whose metadata was never set is given an empty map instead of failing.
        if (metadata != null) {
            for (Map.Entry<String, String> entry : metadata.entrySet()) {
                String key = entry.getKey() == null ? NULL_STRING : entry.getKey();
                String value = entry.getValue() == null ? NULL_STRING : entry.getValue();
                sanitized.put(key, value);
            }
        }
        gobblinTrackingEvent.setMetadata(sanitized);
        return gobblinTrackingEvent;
    }
}
