package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.RunningQuery;
import com.yahoo.bullet.storm.batching.BatchManager;
import com.yahoo.bullet.storm.metric.BulletMetrics;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/QueryBolt.class */
public abstract class QueryBolt extends ConfigComponent implements IRichBolt {
    private static final Logger log = LoggerFactory.getLogger(QueryBolt.class);
    private static final long serialVersionUID = 4567140628827887965L;
    protected transient BulletMetrics metrics;
    protected transient OutputCollector collector;
    protected transient TupleClassifier classifier;
    protected transient String componentTaskID;
    protected transient long startTimestamp;
    protected transient boolean replayCompleted;
    protected transient boolean replayEnabled;
    protected transient boolean replayBatchCompressEnable;
    protected transient long replayRequestInterval;
    protected transient long lastReplayRequest;
    protected transient int batchCount;
    protected transient int replayedQueriesCount;
    protected transient Set<String> removedIds;

    public QueryBolt(BulletStormConfig bulletStormConfig) {
        super(bulletStormConfig);
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.classifier = new TupleClassifier();
        this.componentTaskID = topologyContext.getThisComponentId() + StormUtils.HYPHEN + topologyContext.getThisTaskId();
        this.metrics = new BulletMetrics(this.config);
        this.startTimestamp = System.currentTimeMillis();
        this.replayEnabled = ((Boolean) this.config.getAs(BulletStormConfig.REPLAY_ENABLE, Boolean.class)).booleanValue();
        this.replayBatchCompressEnable = ((Boolean) this.config.getAs(BulletStormConfig.REPLAY_BATCH_COMPRESS_ENABLE, Boolean.class)).booleanValue();
        this.replayRequestInterval = ((Long) this.config.getAs(BulletStormConfig.REPLAY_REQUEST_INTERVAL, Long.class)).longValue();
        this.removedIds = new HashSet();
        if (this.replayEnabled) {
            emitReplayRequest();
        } else {
            this.replayCompleted = true;
        }
    }

    public void cleanup() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Metadata onMeta(Tuple tuple) {
        String string = tuple.getString(0);
        Metadata metadata = (Metadata) tuple.getValue(1);
        if (metadata == null) {
            return null;
        }
        Metadata.Signal signal = metadata.getSignal();
        if (TopologyConstants.isKillSignal(signal)) {
            removeQuery(string);
            log.info("Received {} signal and killed query: {}", signal, string);
        } else if (TopologyConstants.isReplaySignal(signal)) {
            handleReplaySignal();
        }
        return metadata;
    }

    private void handleReplaySignal() {
        if (!this.replayEnabled) {
            log.warn("Received {} signal but replay is not enabled.", Metadata.Signal.REPLAY);
            return;
        }
        log.info("Received {} signal.", Metadata.Signal.REPLAY);
        this.startTimestamp = System.currentTimeMillis();
        this.replayCompleted = false;
        this.batchCount = 0;
        this.replayedQueriesCount = 0;
        emitReplayRequest();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onBatch(Tuple tuple) {
        if (this.replayCompleted) {
            log.warn("Batch arrived while no ongoing replay. Ignoring...");
            return;
        }
        long longValue = tuple.getLong(1).longValue();
        int intValue = tuple.getInteger(2).intValue();
        if (longValue != this.startTimestamp) {
            log.warn("Batch timestamp {} does not match bolt start timestamp {}. Ignoring...", Long.valueOf(longValue), Long.valueOf(this.startTimestamp));
            return;
        }
        log.info("Received batch with index {}", Integer.valueOf(intValue));
        Map map = this.replayBatchCompressEnable ? (Map) BatchManager.decompress((byte[]) tuple.getValue(3)) : (Map) tuple.getValue(3);
        if (map == null) {
            log.info("Total batches: {}. Total queries replayed: {}", Integer.valueOf(this.batchCount), Integer.valueOf(this.replayedQueriesCount));
            this.replayCompleted = true;
            this.removedIds.clear();
            return;
        }
        Set<String> set = this.removedIds;
        Set keySet = map.keySet();
        keySet.getClass();
        set.removeIf((v1) -> {
            return r1.remove(v1);
        });
        map.values().stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach(this::initializeQuery);
        this.batchCount++;
        this.replayedQueriesCount += map.size();
        this.lastReplayRequest = System.currentTimeMillis();
        log.info("Initialized {} queries.", Integer.valueOf(map.size()));
    }

    protected abstract void initializeQuery(PubSubMessage pubSubMessage);

    /* JADX INFO: Access modifiers changed from: protected */
    public Querier createQuerier(Querier.Mode mode, String str, Query query, Metadata metadata, BulletConfig bulletConfig) {
        return new Querier(mode, new RunningQuery(str, query, metadata), bulletConfig);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void removeQuery(String str) {
        if (this.replayCompleted) {
            return;
        }
        this.removedIds.add(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emitReplayRequestIfNecessary() {
        if (!this.replayEnabled || this.replayCompleted || System.currentTimeMillis() < this.lastReplayRequest + this.replayRequestInterval) {
            return;
        }
        emitReplayRequest();
    }

    private void emitReplayRequest() {
        log.info("Emitting replay request from {} with start time {}", this.componentTaskID, Long.valueOf(this.startTimestamp));
        this.collector.emit(TopologyConstants.FEEDBACK_STREAM, new Values(new Object[]{this.componentTaskID, new Metadata(Metadata.Signal.REPLAY, Long.valueOf(this.startTimestamp))}));
        this.lastReplayRequest = System.currentTimeMillis();
    }
}
