package com.ning.metrics.collector.hadoop.processing;

import com.google.inject.Inject;
import com.mogwee.executors.FailsafeScheduledExecutor;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.serialization.event.Event;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/metrics/collector/hadoop/processing/EventSpoolDispatcher.class */
/**
 * Dispatches events to per-destination local queues.
 *
 * <p>Each distinct combination of event output directory and serialization file suffix gets
 * its own {@link LocalQueueAndWriter}, created lazily on the first {@link #offer(Event)} for
 * that combination. A background "WriterQueuesReaper" task closes queues that report empty
 * and asks {@link LocalSpoolManager} to clean up old spool directories.
 *
 * <p>Queue creation and reaper removal are serialized on an internal monitor; lookups go
 * straight to the concurrent map.
 */
public class EventSpoolDispatcher {
    /**
     * Cutoff handed to {@code LocalSpoolManager.findOldSpoolDirectories}.
     * NOTE(review): 3,600,000 is presumably milliseconds (one hour) - confirm against LocalSpoolManager.
     */
    private static final long CUTOFF_TIME_OLD_DIRS = 3600000;
    private static final Logger log = LoggerFactory.getLogger(EventSpoolDispatcher.class);

    private final PersistentWriterFactory factory;
    private final WriterStats stats;
    private final CollectorConfig config;
    /** Live queues, keyed by {@code "<outputDir>|<fileSuffix>"}. */
    private final Map<String, LocalQueueAndWriter> queuesPerPath = new ConcurrentHashMap<>();
    /** Guards insertion into, and reaper removal from, {@link #queuesPerPath}. */
    private final Object queueMapMonitor = new Object();
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    /**
     * Creates the dispatcher and schedules the queue reaper.
     *
     * <p>The first reaper run is scheduled one hour after construction; each run then
     * reschedules itself {@code getMaxUncommittedPeriodInSeconds()} seconds later, whether or
     * not the run succeeded.
     *
     * <p>NOTE(review): the reaper executor is local to this constructor and is not stopped by
     * {@link #shutdown()}; consider retaining it and shutting it down there.
     *
     * @param persistentWriterFactory factory used to create the writer behind each queue
     * @param writerStats             statistics sink shared with the queues and writers
     * @param collectorConfig         collector configuration (spool directory, output directory, periods)
     */
    @Inject
    public EventSpoolDispatcher(PersistentWriterFactory persistentWriterFactory, WriterStats writerStats, final CollectorConfig collectorConfig) {
        this.factory = persistentWriterFactory;
        this.stats = writerStats;
        this.config = collectorConfig;

        final FailsafeScheduledExecutor reaperExecutor = new FailsafeScheduledExecutor(1, "WriterQueuesReaper");
        reaperExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    closeEmptyQueues();
                    LocalSpoolManager.cleanupOldSpoolDirectories(
                        LocalSpoolManager.findOldSpoolDirectories(collectorConfig.getSpoolDirectoryName(), CUTOFF_TIME_OLD_DIRS));
                } finally {
                    // Always reschedule, even when this run failed, so one bad pass does not stop the reaper.
                    reaperExecutor.schedule(this, collectorConfig.getMaxUncommittedPeriodInSeconds(), TimeUnit.SECONDS);
                }
            }
        }, 1L, TimeUnit.HOURS);
    }

    /** Removes every queue that reports empty from the map and closes it. */
    private void closeEmptyQueues() {
        // Iterate over a snapshot of the keys so that removals do not interfere with the iteration.
        for (String queueKey : new HashSet<String>(queuesPerPath.keySet())) {
            LocalQueueAndWriter queue = queuesPerPath.get(queueKey);
            // The entry may have vanished since the snapshot (e.g. shutdown() cleared the map).
            if (queue == null || !queue.isEmpty()) {
                continue;
            }

            boolean removed = false;
            synchronized (queueMapMonitor) {
                // Re-check under the monitor before dropping the queue from the map.
                if (queue.isEmpty()) {
                    queuesPerPath.remove(queueKey);
                    removed = true;
                }
            }
            // Close outside the monitor so offer() callers creating queues are not blocked by it.
            if (removed) {
                queue.close();
            }
        }
    }

    /**
     * Stops accepting events, closes every queue currently registered, clears the queue map
     * and closes the writer factory.
     */
    public void shutdown() {
        isRunning.set(false);
        log.info("Closing all local writer queues");
        for (LocalQueueAndWriter queue : queuesPerPath.values()) {
            queue.close();
        }
        queuesPerPath.clear();
        factory.close();
    }

    /**
     * @return {@code true} until {@link #shutdown()} has been called
     */
    public boolean isRunning() {
        return isRunning.get();
    }

    /**
     * Offers an event to the queue matching its output directory and serialization type,
     * creating that queue on first use.
     *
     * @param event the event to enqueue; {@code null} is rejected
     * @return {@code false} (after registering the event as ignored) if {@code event} is
     *         {@code null} or the dispatcher has been shut down; otherwise the result of the
     *         target queue's {@code offer}
     */
    public boolean offer(Event event) {
        // Reject before touching the event: nothing below is meaningful for a null event.
        if (event == null || !isRunning.get()) {
            stats.registerEventIgnored();
            return false;
        }

        SerializationType serializationType = SerializationType.get(event);
        String outputDir = event.getOutputDir(config.getEventOutputDirectory());
        String queueKey = String.format("%s|%s", outputDir, serializationType.getFileSuffix());

        LocalQueueAndWriter queue = queuesPerPath.get(queueKey);
        if (queue == null) {
            synchronized (queueMapMonitor) {
                // Look again under the monitor: another thread may have created the queue meanwhile.
                queue = queuesPerPath.get(queueKey);
                if (queue == null) {
                    queue = new LocalQueueAndWriter(
                        config,
                        outputDir,
                        factory.createPersistentWriter(stats, serializationType, event.getName(), outputDir),
                        stats);
                    queuesPerPath.put(queueKey, queue);
                }
            }
        }
        return queue.offer(event);
    }

    /**
     * @return a new map from queue key to that queue's current {@code size()}; changes to the
     *         returned map do not affect the dispatcher
     */
    public Map<String, Integer> getQueuesSizes() {
        Map<String, Integer> sizes = new HashMap<>();
        // entrySet() hands out key and value together, so a concurrent removal cannot yield a null queue.
        for (Map.Entry<String, LocalQueueAndWriter> entry : queuesPerPath.entrySet()) {
            sizes.put(entry.getKey(), Integer.valueOf(entry.getValue().size()));
        }
        return sizes;
    }

    /**
     * @return the live internal queue map (not a copy)
     */
    public Map<String, LocalQueueAndWriter> getQueuesPerPath() {
        return queuesPerPath;
    }

    /**
     * @return the statistics object supplied at construction
     */
    public WriterStats getStats() {
        return stats;
    }
}
