package com.yahoo.bullet.storm;

import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.storage.StorageManager;
import com.yahoo.bullet.storm.TupleClassifier;
import com.yahoo.bullet.storm.batching.BatchManager;
import com.yahoo.bullet.storm.grouping.TaskIndexCaptureGrouping;
import com.yahoo.bullet.storm.metric.AbsoluteCountMetric;
import com.yahoo.bullet.storm.metric.BulletMetrics;
import com.yahoo.bullet.storm.metric.MapCountMetric;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/ReplayBolt.class */
public class ReplayBolt extends ConfigComponent implements IRichBolt {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(ReplayBolt.class);
    private static final long serialVersionUID = 7678526821834215930L;
    // All fields below are transient and are (re)built in prepare().
    /** Metrics facade built from the component config in prepare(). */
    private transient BulletMetrics metrics;
    /** Registered only when metrics are enabled; set to the batch manager's size whenever it changes. */
    private transient AbsoluteCountMetric batchedQueriesCount;
    /** Registered only when metrics are enabled; set to 1 for an ID when a replay starts, 0 when it ends, and cleared on a replay signal. */
    private transient MapCountMetric activeReplaysCount;
    /** Registered only when metrics are enabled; updated by 1 for an ID each time a replay is started. */
    private transient MapCountMetric createdReplaysCount;
    /** Collector handed to prepare(); used to emit, ack and fail tuples. */
    private transient OutputCollector collector;
    /** Classifies incoming tuples so execute() can dispatch them. */
    private transient TupleClassifier classifier;
    /** Storage the stored queries are read from. */
    private transient StorageManager<PubSubMessage> storageManager;
    /** Holds the queries (added, removed and reloaded by this bolt) and produces the batches used for replays. */
    private transient BatchManager<PubSubMessage> batchManager;
    /** Replay state keyed by the ID string read from field 0 of a replay tuple. */
    private transient Map<String, Replay> replays;
    /** Value of BulletStormConfig.REPLAY_BATCH_COMPRESS_ENABLE; selects compressed vs. plain batches in startReplay(). */
    private transient boolean replayBatchCompressEnable;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/yahoo/bullet/storm/ReplayBolt$Replay.class */
    /**
     * Mutable bookkeeping for one replay: the task the batches are emitted to, the
     * timestamp identifying the replay, the batches themselves and the position
     * reached so far.
     */
    public static class Replay {
        /** Task the replay batches are emitted to directly. */
        private final int taskID;
        /** Timestamp that identifies this replay; compared against incoming replay tuples. */
        private final long timestamp;
        /** Batches to send; the enclosing bolt sets this to null once the replay has ended. */
        private List batches;
        /** Source task of the tuple the last batch was anchored to, or -1 when there is none. */
        private int anchor = -1;
        /** Index of the batch currently being sent; starts at 0. */
        private int index;

        /**
         * @param replayTaskID task that receives the batches
         * @param replayTimestamp timestamp identifying the replay
         * @param replayBatches batches to replay
         */
        Replay(int replayTaskID, long replayTimestamp, List replayBatches) {
            taskID = replayTaskID;
            timestamp = replayTimestamp;
            batches = replayBatches;
        }

        /** @return the task the batches are emitted to */
        int getTaskID() {
            return taskID;
        }

        /** @return the timestamp identifying this replay */
        long getTimestamp() {
            return timestamp;
        }

        /** @return the batches, or null if they have been cleared */
        List getBatches() {
            return batches;
        }

        /** @return the current anchor task, or -1 */
        int getAnchor() {
            return anchor;
        }

        /** @return the index of the current batch */
        int getIndex() {
            return index;
        }

        /**
         * Compiler-style accessor equivalent to {@code replay.index++}: increments the
         * index and returns the value it had before the increment.
         */
        static /* synthetic */ int access$308(Replay replay) {
            return replay.index++;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/yahoo/bullet/storm/ReplayBolt$ReplayState.class */
    /**
     * Outcome of classifying an incoming replay tuple against the stored {@link Replay}
     * for its ID (see classifyReplay). The boolean flag mentioned below is field 2 of the
     * tuple; NOTE(review): it appears to be an acknowledgement marker - confirm.
     */
    public enum ReplayState {
        /** No replay is stored for the ID, or the tuple's timestamp is newer than the stored one. */
        START_REPLAY,
        /** The tuple's timestamp is older than the stored replay's timestamp. */
        PAST_REPLAY,
        /** Timestamps match but the stored replay's batches have been cleared (null). */
        FINISHED_REPLAY,
        /** Timestamps match, batches are present and the tuple's flag is false. */
        NEW_REPLAY,
        /** Timestamps match, the flag is true, but the tuple's source task is not the replay's anchor. */
        OLD_REPLAY,
        /** Timestamps match, the flag is true and the tuple's source task equals the replay's anchor. */
        ACK
    }

    /**
     * Creates the bolt; the configuration is handed to the ConfigComponent superclass constructor.
     *
     * @param bulletStormConfig the configuration for this component
     */
    public ReplayBolt(BulletStormConfig bulletStormConfig) {
        super(bulletStormConfig);
    }

    /**
     * Initializes the transient state of the bolt: collector, classifier, metrics,
     * batch manager and replay map, then loads the stored queries and batches them.
     *
     * @param map the topology configuration (not used here)
     * @param topologyContext context used to register the metrics
     * @param outputCollector collector used for all emits, acks and fails
     * @throws RuntimeException if anything inside the storage/batching section fails
     */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
        classifier = new TupleClassifier();
        metrics = new BulletMetrics(config);
        if (metrics.isEnabled()) {
            batchedQueriesCount = metrics.registerAbsoluteCountMetric(TopologyConstants.BATCHED_QUERIES_METRIC, topologyContext);
            activeReplaysCount = metrics.registerMapCountMetric(TopologyConstants.ACTIVE_REPLAYS_METRIC, topologyContext);
            createdReplaysCount = metrics.registerMapCountMetric(TopologyConstants.CREATED_REPLAYS_METRIC, topologyContext);
        }

        int batchSize = (Integer) config.getAs(BulletStormConfig.REPLAY_BATCH_SIZE, Integer.class);
        Number joinParallelism = (Number) config.getRequiredConfigAs(BulletStormConfig.JOIN_BOLT_PARALLELISM, Number.class);
        replayBatchCompressEnable = (Boolean) config.getAs(BulletStormConfig.REPLAY_BATCH_COMPRESS_ENABLE, Boolean.class);
        batchManager = new BatchManager<>(batchSize, joinParallelism.intValue(), replayBatchCompressEnable);
        replays = new HashMap<>();

        // Everything in this try - including loading and batching the queries - is
        // reported under the same "Could not create StorageManager." message.
        try {
            storageManager = StorageManager.from(config);

            long fetchStart = System.currentTimeMillis();
            Map<String, PubSubMessage> storedQueries = getStoredQueries();
            long fetchMillis = System.currentTimeMillis() - fetchStart;
            log.info("Took {} ms to get {} queries.", fetchMillis, storedQueries.size());

            long batchStart = System.currentTimeMillis();
            batchManager.addAll(storedQueries);
            long batchMillis = System.currentTimeMillis() - batchStart;
            log.info("Took {} ms to batch {} queries.", batchMillis, batchManager.size());

            metrics.setCount(batchedQueriesCount, batchManager.size());
        } catch (Exception e) {
            throw new RuntimeException("Could not create StorageManager.", e);
        }
    }

    /**
     * Reads every stored query from the storage manager, blocking until the result
     * is available.
     *
     * @return the stored queries keyed by ID
     * @throws RuntimeException wrapping any failure while fetching; if the failure was an
     *         interruption, the thread's interrupt status is restored first
     */
    private Map<String, PubSubMessage> getStoredQueries() {
        try {
            @SuppressWarnings("unchecked")
            Map<String, PubSubMessage> stored = (Map<String, PubSubMessage>) this.storageManager.getAll().get();
            return stored;
        } catch (Exception e) {
            // Catching Exception also catches InterruptedException, which clears the
            // interrupt flag; put it back so code further up can still see the interrupt.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Failed to get queries from storage.", e);
        }
    }

    /**
     * Dispatches a tuple by its classified type. Query and metadata tuples are acked
     * here after handling; replay tuples are acked or failed by the replay handling
     * itself; unknown tuples are logged and neither acked nor failed.
     *
     * @param tuple the incoming tuple
     */
    public void execute(Tuple tuple) {
        TupleClassifier.Type type = classifier.classify(tuple).orElse(TupleClassifier.Type.UNKNOWN_TUPLE);
        if (type == TupleClassifier.Type.QUERY_TUPLE) {
            onQuery(tuple);
        } else if (type == TupleClassifier.Type.METADATA_TUPLE) {
            onMeta(tuple);
        } else if (type == TupleClassifier.Type.REPLAY_TUPLE) {
            // onReplay decides on ack/fail for every path, so do not ack again below.
            onReplay(tuple);
            return;
        } else {
            log.error("Unknown tuple encountered: {} from {}-{}", type, tuple.getSourceComponent(), tuple.getSourceStreamId());
            return;
        }
        collector.ack(tuple);
    }

    /**
     * Declares the two output streams of this bolt: the replay stream carrying
     * (id, timestamp, batch index, batch) and the capture stream, which has no fields.
     *
     * @param outputFieldsDeclarer the declarer to register the streams with
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        Fields replayFields = new Fields(
                TopologyConstants.ID_FIELD,
                "timestamp",
                TopologyConstants.REPLAY_INDEX_FIELD,
                TopologyConstants.REPLAY_BATCH_FIELD);
        outputFieldsDeclarer.declareStream(TopologyConstants.REPLAY_STREAM, replayFields);

        Fields noFields = new Fields();
        outputFieldsDeclarer.declareStream(TopologyConstants.CAPTURE_STREAM, noFields);
    }

    /** Intentionally empty: this bolt performs no work on cleanup. */
    public void cleanup() {
    }

    /**
     * Adds the query carried by the tuple (ID in field 0, payload bytes in field 1)
     * to the batch manager and refreshes the batched-queries metric.
     *
     * @param tuple the query tuple
     */
    private void onQuery(Tuple tuple) {
        String id = tuple.getString(0);
        byte[] payload = (byte[]) tuple.getValue(1);
        PubSubMessage message = new PubSubMessage(id, payload);
        batchManager.add(id, message);
        metrics.setCount(batchedQueriesCount, batchManager.size());
    }

    /**
     * Handles a metadata tuple (ID in field 0, Metadata in field 1). A kill signal
     * removes the query from the batch manager; a replay signal reloads all queries
     * from storage. Tuples without metadata or with any other signal are ignored.
     *
     * @param tuple the metadata tuple
     */
    private void onMeta(Tuple tuple) {
        String id = tuple.getString(0);
        Metadata metadata = (Metadata) tuple.getValue(1);
        if (metadata == null) {
            return;
        }

        Metadata.Signal signal = metadata.getSignal();
        if (TopologyConstants.isKillSignal(signal)) {
            batchManager.remove(id);
            metrics.setCount(batchedQueriesCount, batchManager.size());
            log.info("Received {} signal and killed query: {}", signal, id);
            return;
        }
        if (TopologyConstants.isReplaySignal(signal)) {
            handleReplaySignal();
        }
    }

    /**
     * Reacts to a replay signal: drops all replay state and batched queries, reloads
     * the queries from storage, and logs how long the reload took.
     */
    private void handleReplaySignal() {
        log.info("Received {} signal.", Metadata.Signal.REPLAY);
        long start = System.currentTimeMillis();

        // Forget everything that was in flight before rebuilding from storage.
        metrics.clearCount(activeReplaysCount);
        replays.clear();
        batchManager.clear();

        Map<String, PubSubMessage> storedQueries = getStoredQueries();
        batchManager.addAll(storedQueries);
        metrics.setCount(batchedQueriesCount, batchManager.size());

        long elapsedMillis = System.currentTimeMillis() - start;
        log.info("Took {} ms to get and batch {} queries.", elapsedMillis, batchManager.size());
    }

    /**
     * Handles a replay tuple (ID in field 0, timestamp in field 1, boolean flag in
     * field 2). The tuple is classified against the replay stored for its ID and then
     * either starts a replay, continues one, or is failed with a warning.
     * Every classified path acks or fails the tuple itself (directly or through the
     * helper it calls), which is why execute() does not ack replay tuples.
     *
     * @param tuple the replay tuple
     */
    private void onReplay(Tuple tuple) {
        String id = tuple.getString(0);
        long timestamp = tuple.getLong(1).longValue();
        // The flag must be kept in a local: it is the second argument of classifyReplay.
        boolean acked = tuple.getBoolean(2).booleanValue();
        Replay replay = this.replays.get(id);
        switch (classifyReplay(timestamp, acked, tuple, replay)) {
            case START_REPLAY:
                startReplay(id, timestamp, tuple);
                return;
            case PAST_REPLAY:
                log.warn("Received replay tuple for past replay.");
                this.collector.fail(tuple);
                return;
            case FINISHED_REPLAY:
                log.warn("Received replay tuple for finished replay.");
                this.collector.fail(tuple);
                return;
            case OLD_REPLAY:
                log.warn("Received replay tuple for old replay.");
                this.collector.fail(tuple);
                return;
            case ACK:
                handleAck(id, tuple, replay);
                return;
            case NEW_REPLAY:
                emitBatch(id, tuple, replay);
                return;
            default:
                return;
        }
    }

    /**
     * Decides how a replay tuple relates to the replay stored for its ID. The checks
     * are ordered; the first one that matches wins.
     *
     * @param j the timestamp carried by the tuple
     * @param z the boolean flag carried by the tuple
     * @param tuple the replay tuple (its source task is compared with the replay's anchor)
     * @param replay the stored replay for the tuple's ID, or null if there is none
     * @return the state describing what to do with the tuple
     */
    private ReplayState classifyReplay(long j, boolean z, Tuple tuple, Replay replay) {
        if (replay == null || j > replay.timestamp) {
            return ReplayState.START_REPLAY;
        }
        if (j < replay.timestamp) {
            return ReplayState.PAST_REPLAY;
        }
        if (replay.batches == null) {
            return ReplayState.FINISHED_REPLAY;
        }
        if (!z) {
            return ReplayState.NEW_REPLAY;
        }
        // Flag is set: it only counts as an ACK when it comes from the anchored task.
        if (replay.anchor == tuple.getSourceTask()) {
            return ReplayState.ACK;
        }
        return ReplayState.OLD_REPLAY;
    }

    /**
     * Starts a new replay for the given ID: builds the Replay from the current batches,
     * stores it (replacing any previous one for the ID), updates the replay metrics and
     * emits the first batch.
     *
     * @param str the replay ID; the text after the first hyphen is parsed as the target task ID
     * @param j the timestamp identifying the replay
     * @param tuple the replay tuple that triggered the start
     */
    private void startReplay(String str, long j, Tuple tuple) {
        log.info("Starting replay to {}", str);
        int taskId = Integer.parseInt(str.split(StormUtils.HYPHEN)[1]);
        Integer partition = TaskIndexCaptureGrouping.TASK_INDEX_MAP.get(taskId);

        // With a known partition only that partition's batches are replayed;
        // otherwise all batches are.
        List batches;
        if (partition != null) {
            int partitionIndex = partition.intValue();
            batches = replayBatchCompressEnable
                    ? batchManager.getCompressedBatchesForPartition(partitionIndex)
                    : batchManager.getBatchesForPartition(partitionIndex);
        } else {
            batches = replayBatchCompressEnable
                    ? batchManager.getCompressedBatches()
                    : batchManager.getBatches();
        }

        Replay replay = new Replay(taskId, j, batches);
        replays.put(str, replay);
        metrics.setCount(activeReplaysCount, str, 1L);
        metrics.updateCount(createdReplaysCount, str, 1L);
        emitBatch(str, tuple, replay);
    }

    /**
     * Handles an acknowledgement for the batch last emitted. While the index is still
     * within the batch list it is advanced and the next batch is emitted (emitBatch
     * sends a null batch once the index reaches the list size). When the index has
     * already reached the list size, the replay is ended: its batches and anchor are
     * cleared, the active-replay metric is reset and the tuple is failed.
     *
     * @param str the replay ID
     * @param tuple the acknowledging replay tuple
     * @param replay the replay being acknowledged
     */
    private void handleAck(String str, Tuple tuple, Replay replay) {
        boolean exhausted = replay.index >= replay.batches.size();
        if (exhausted) {
            log.info("Ending replay to {}", str);
            replay.batches = null;
            replay.anchor = -1;
            metrics.setCount(activeReplaysCount, str, 0L);
            collector.fail(tuple);
            return;
        }
        replay.index++;
        emitBatch(str, tuple, replay);
    }

    /**
     * Emits the replay's current batch directly to the replay's task on the replay
     * stream, anchored to the given tuple. If the index is past the last batch, a null
     * batch is emitted instead. Afterwards the tuple's source task becomes the replay's
     * anchor and the tuple is acked.
     *
     * @param str the replay ID
     * @param tuple the tuple to anchor the emit to and to ack
     * @param replay the replay whose current batch is emitted
     */
    private void emitBatch(String str, Tuple tuple, Replay replay) {
        int batchIndex = replay.index;
        Object batch;
        if (batchIndex < replay.batches.size()) {
            log.info("Emitting replay batch with index {} to {}", batchIndex, str);
            batch = replay.batches.get(batchIndex);
        } else {
            log.info("Emitting replay batch NULL (index {}) to {}", batchIndex, str);
            batch = null;
        }
        Values values = new Values(str, replay.timestamp, batchIndex, batch);
        collector.emitDirect(replay.taskID, TopologyConstants.REPLAY_STREAM, tuple, values);
        replay.anchor = tuple.getSourceTask();
        collector.ack(tuple);
    }

    /** @return the metrics facade created in prepare() */
    BulletMetrics getMetrics() {
        return metrics;
    }

    /** @return the batched-queries metric, as registered in prepare() */
    AbsoluteCountMetric getBatchedQueriesCount() {
        return batchedQueriesCount;
    }

    /** @return the active-replays metric, as registered in prepare() */
    MapCountMetric getActiveReplaysCount() {
        return activeReplaysCount;
    }

    /** @return the created-replays metric, as registered in prepare() */
    MapCountMetric getCreatedReplaysCount() {
        return createdReplaysCount;
    }

    /** @return the collector passed to prepare() */
    OutputCollector getCollector() {
        return collector;
    }

    /** @return the tuple classifier created in prepare() */
    TupleClassifier getClassifier() {
        return classifier;
    }

    /** @return the storage manager created in prepare() */
    StorageManager<PubSubMessage> getStorageManager() {
        return storageManager;
    }

    /** @return the batch manager created in prepare() */
    BatchManager<PubSubMessage> getBatchManager() {
        return batchManager;
    }

    /** @return the live map of replay state keyed by replay ID (not a copy) */
    Map<String, Replay> getReplays() {
        return replays;
    }

    /** @return whether compressed batches are used for replays */
    boolean isReplayBatchCompressEnable() {
        return replayBatchCompressEnable;
    }
}
