package org.apache.pinot.tools.tuner.driver;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tools.tuner.meta.manager.MetaManager;
import org.apache.pinot.tools.tuner.query.src.InputIterator;
import org.apache.pinot.tools.tuner.query.src.stats.wrapper.AbstractQueryStats;
import org.apache.pinot.tools.tuner.strategy.AbstractAccumulator;
import org.apache.pinot.tools.tuner.strategy.TuningStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/tuner/driver/TunerDriver.class */
/**
 * Drives a tuning run in three phases:
 * <ol>
 *   <li>accumulate: every entry of the input iterator that passes the strategy's filter is handed to
 *       {@code TuningStrategy.accumulate}, collecting into a per-thread map (threadId -> table -> column);</li>
 *   <li>merge: for every table seen, the per-thread accumulators are folded into one accumulator per column
 *       through {@code TuningStrategy.merge};</li>
 *   <li>report: the merged table -> column map is passed to {@code TuningStrategy.report}.</li>
 * </ol>
 *
 * <p>With a thread pool size of {@link #NO_CONCURRENCY} everything runs on the calling thread; otherwise
 * the accumulate and merge phases each run on a fixed-size pool of that many threads.
 *
 * <p>Configure the instance through the fluent setters, then call {@link #execute()}.
 */
public class TunerDriver {
    private static final Logger LOGGER = LoggerFactory.getLogger(TunerDriver.class);

    /** Thread pool size meaning "no pool": all work is done on the thread calling {@link #execute()}. */
    public static final int NO_CONCURRENCY = 0;

    /** Upper bound on how long a phase is awaited; effectively "until done". */
    private static final long AWAIT_DAYS = 365L;

    private InputIterator _inputIterator = null;
    private MetaManager _metaManager = null;
    private TuningStrategy _tuningStrategy = null;
    private int _threadPoolSize = NO_CONCURRENCY;

    /** threadId -> table -> column -> accumulator; written concurrently by pool workers during accumulation. */
    private Map<Long, Map<String, Map<String, AbstractAccumulator>>> _threadToTableAccumulators = null;

    /** table -> column -> merged accumulator; each inner map is only written by the one task merging that table. */
    private Map<String, Map<String, AbstractAccumulator>> _tableToColMergers;

    /**
     * Sets the number of worker threads used for the accumulate and merge phases.
     *
     * @param i pool size; {@link #NO_CONCURRENCY} runs everything on the calling thread
     * @return this driver, for chaining
     */
    public TunerDriver setThreadPoolSize(int i) {
        this._threadPoolSize = i;
        return this;
    }

    /**
     * Sets the source of query stats to iterate over.
     *
     * @param inputIterator iterator consumed (and closed) by {@link #execute()}
     * @return this driver, for chaining
     */
    public TunerDriver setInputIterator(InputIterator inputIterator) {
        this._inputIterator = inputIterator;
        return this;
    }

    /**
     * Sets the metadata manager that is passed through to the strategy's accumulate call.
     *
     * @param metaManager metadata manager handed to {@code TuningStrategy.accumulate}
     * @return this driver, for chaining
     */
    public TunerDriver setMetaManager(MetaManager metaManager) {
        this._metaManager = metaManager;
        return this;
    }

    /**
     * Sets the strategy that filters, accumulates, merges and reports.
     *
     * @param tuningStrategy strategy to run
     * @return this driver, for chaining
     */
    public TunerDriver setTuningStrategy(TuningStrategy tuningStrategy) {
        this._tuningStrategy = tuningStrategy;
        return this;
    }

    /**
     * Runs accumulate, merge and report in that order.
     *
     * <p>If the calling thread is interrupted while waiting for a phase to finish, the outstanding work is
     * cancelled, the interrupt flag is restored, and the method returns without reporting: continuing
     * would read maps that worker threads may still be mutating.
     */
    public void execute() {
        if (!accumulateAllQueries()) {
            return;
        }
        if (!mergeAllTables()) {
            return;
        }
        _tuningStrategy.report(_tableToColMergers);
    }

    /**
     * Accumulation phase: feeds every filtered input entry to the strategy.
     *
     * @return {@code false} if the wait for the workers was interrupted, {@code true} otherwise
     */
    private boolean accumulateAllQueries() {
        // Must be a concurrent map: every pool worker registers its own per-thread entry in it.
        _threadToTableAccumulators = new ConcurrentHashMap<>();
        LOGGER.info("Setting up executor for accumulation: {} threads", _threadPoolSize);
        ThreadPoolExecutor executor = isConcurrent() ? newExecutor() : null;
        try {
            try {
                while (_inputIterator.hasNext()) {
                    AbstractQueryStats stats = _inputIterator.next();
                    if (stats == null || !_tuningStrategy.filter(stats)) {
                        continue;
                    }
                    LOGGER.debug("Master thread {} submitting: {}", Thread.currentThread().getId(), stats);
                    if (executor != null) {
                        executor.execute(() -> accumulateOne(stats));
                    } else {
                        accumulateOne(stats);
                    }
                }
            } finally {
                // Close the input even when iteration or filtering throws.
                closeInputIterator();
            }
            if (executor == null) {
                return true;
            }
            executor.shutdown();
            LOGGER.info("All queries queued for accumulation");
            if (!awaitTermination(executor, "Accumulator Interrupted!")) {
                return false;
            }
            LOGGER.info("All accumulation done");
            return true;
        } finally {
            // No-op after a clean termination; on failure/interrupt it stops the otherwise
            // long-lived core threads from leaking.
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    /** Accumulates one stats entry into the map owned by the current thread. */
    private void accumulateOne(AbstractQueryStats stats) {
        long threadId = Thread.currentThread().getId();
        LOGGER.debug("Thread {} accumulating: {}", threadId, stats);
        Map<String, Map<String, AbstractAccumulator>> tableAccumulators =
            _threadToTableAccumulators.computeIfAbsent(threadId, id -> new HashMap<>());
        _tuningStrategy.accumulate(stats, _metaManager, tableAccumulators);
    }

    /**
     * Merge phase: builds one merged accumulator per (table, column) out of the per-thread accumulators.
     *
     * @return {@code false} if the wait for the workers was interrupted, {@code true} otherwise
     */
    private boolean mergeAllTables() {
        LOGGER.info("Setting up mergedResults for merging");
        _tableToColMergers = new HashMap<>();
        // Pre-create every per-table map up front so the merge tasks never modify the outer map.
        for (Map<String, Map<String, AbstractAccumulator>> tableAccumulators : _threadToTableAccumulators.values()) {
            for (String tableName : tableAccumulators.keySet()) {
                _tableToColMergers.putIfAbsent(tableName, new HashMap<>());
            }
        }
        LOGGER.info("tableNames: {}", _tableToColMergers.keySet().toString());
        LOGGER.info("Setting up executor for merging: {} threads", _threadPoolSize);
        ThreadPoolExecutor executor = isConcurrent() ? newExecutor() : null;
        try {
            for (String tableName : _tableToColMergers.keySet()) {
                if (executor != null) {
                    executor.execute(() -> {
                        LOGGER.debug("Thread {} working on table {}", Thread.currentThread().getId(), tableName);
                        mergeTable(tableName);
                    });
                } else {
                    mergeTable(tableName);
                }
            }
            if (executor == null) {
                return true;
            }
            LOGGER.info("All tables waiting for merge");
            executor.shutdown();
            if (!awaitTermination(executor, "Interruption of merger")) {
                return false;
            }
            LOGGER.info("All merge done");
            return true;
        } finally {
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    /** Folds every thread's accumulators for {@code tableName} into that table's merged per-column map. */
    private void mergeTable(String tableName) {
        Map<String, AbstractAccumulator> columnMergers = _tableToColMergers.get(tableName);
        _threadToTableAccumulators.forEach((threadId, tableAccumulators) -> {
            Map<String, AbstractAccumulator> columnAccumulators = tableAccumulators.get(tableName);
            if (columnAccumulators == null) {
                return;
            }
            columnAccumulators.forEach((columnName, accumulator) -> {
                try {
                    AbstractAccumulator merger = columnMergers.get(columnName);
                    if (merger == null) {
                        // Instantiate only when missing; requires a no-arg constructor on the accumulator class.
                        merger = accumulator.getClass().getDeclaredConstructor().newInstance();
                        columnMergers.put(columnName, merger);
                    }
                    _tuningStrategy.merge(merger, accumulator);
                } catch (Exception e) {
                    // Deliberately broad and best-effort: one failing column must not abort the other merges.
                    LOGGER.error("Instantiation Exception in Merger!", e);
                }
            });
        });
    }

    /** Closes the input iterator, logging (not propagating) an I/O failure. */
    private void closeInputIterator() {
        try {
            _inputIterator.close();
        } catch (IOException e) {
            LOGGER.error("Error closing query src ", e);
        }
    }

    /** Whether the phases run on a thread pool rather than on the calling thread. */
    private boolean isConcurrent() {
        return _threadPoolSize != NO_CONCURRENCY;
    }

    /** Creates a fixed-size pool of {@code _threadPoolSize} threads backed by a linked blocking queue. */
    private ThreadPoolExecutor newExecutor() {
        return new ThreadPoolExecutor(_threadPoolSize, _threadPoolSize, AWAIT_DAYS, TimeUnit.DAYS,
            new LinkedBlockingQueue<>(Integer.MAX_VALUE), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Waits for an already shut-down executor to finish its queued tasks.
     *
     * @param executor executor on which {@code shutdown()} has been called
     * @param interruptedMessage error message logged when the wait is interrupted
     * @return {@code true} when the wait ended normally; {@code false} when it was interrupted, in which
     *     case the calling thread's interrupt flag has been restored
     */
    private static boolean awaitTermination(ThreadPoolExecutor executor, String interruptedMessage) {
        try {
            executor.awaitTermination(AWAIT_DAYS, TimeUnit.DAYS);
            return true;
        } catch (InterruptedException e) {
            LOGGER.error(interruptedMessage, e);
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
