package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.QueryCategorizer;
import com.yahoo.bullet.querying.QueryManager;
import com.yahoo.bullet.record.BulletRecord;
import com.yahoo.bullet.storm.TupleClassifier;
import java.util.Map;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/FilterBolt.class */
/**
 * A {@link QueryBolt} that keeps a {@link QueryManager} of {@link Querier} instances created in
 * {@link Querier.Mode#PARTITION} mode, feeds incoming records and ticks to them, and emits what they produce.
 *
 * <p>Output goes to three declared streams: {@code "default"} (query data), {@code "error"} (rate limit errors)
 * and {@link TopologyConstants#FEEDBACK_STREAM}.
 */
public class FilterBolt extends QueryBolt {
    private static final Logger log = LoggerFactory.getLogger(FilterBolt.class);
    private static final long serialVersionUID = -4357269268404488793L;

    /** Component name handed to the tuple classifier in {@link #prepare}. */
    private String recordComponent;

    // Transient state. Everything except duplicatedCount is (re)assigned in prepare(); duplicatedCount simply
    // starts from the Java default of 0.
    private transient QueryManager manager;
    /** Only registered when metrics are enabled; see {@link #prepare} and {@link #updateLatency}. */
    private transient ReducedMetric averageLatency;
    /** Number of ticks between two statistics log lines. */
    private transient int statsTickInterval;
    /** Ticks seen since the statistics were last logged. */
    private transient int statsTickCount;
    /** Number of query initializations skipped because the id was already in the manager. */
    private transient int duplicatedCount;

    /**
     * Creates the bolt.
     *
     * @param str the record component name; it is passed to the classifier's {@code setRecordComponent}
     *            when the bolt is prepared.
     * @param bulletStormConfig the configuration, forwarded to the {@link QueryBolt} constructor.
     */
    public FilterBolt(String str, BulletStormConfig bulletStormConfig) {
        super(bulletStormConfig);
        this.recordComponent = str;
    }

    /**
     * Sets up the transient state of this bolt: configures the classifier, reads the statistics reporting
     * interval, creates a fresh {@link QueryManager} and, if metrics are enabled, registers the latency metric.
     *
     * @param map the configuration map, forwarded to the superclass.
     * @param topologyContext the topology context, also used to register the latency metric.
     * @param outputCollector the collector, forwarded to the superclass.
     */
    @Override // com.yahoo.bullet.storm.QueryBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        this.classifier.setRecordComponent(this.recordComponent);
        this.statsTickInterval = (Integer) this.config.getAs(BulletStormConfig.FILTER_BOLT_STATS_REPORT_TICKS, Integer.class);
        this.statsTickCount = 0;
        this.manager = new QueryManager(this.config);
        if (this.metrics.isEnabled()) {
            this.averageLatency = this.metrics.registerAveragingMetric(TopologyConstants.LATENCY_METRIC, topologyContext);
        }
    }

    /**
     * Classifies the tuple, dispatches it to the matching handler and then acks it.
     *
     * <p>A tuple that cannot be classified (or is classified as a type not handled here) is logged at error
     * level and is NOT acked: the method returns before reaching the ack.
     *
     * @param tuple the incoming tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        TupleClassifier.Type type = this.classifier.classify(tuple).orElse(TupleClassifier.Type.UNKNOWN_TUPLE);
        switch (type) {
            case TICK_TUPLE:
                onTick();
                break;
            case METADATA_TUPLE:
                onMeta(tuple);
                break;
            case QUERY_TUPLE:
                onQuery(tuple);
                break;
            case RECORD_TUPLE:
                onRecord(tuple);
                updateLatency(tuple);
                break;
            case BATCH_TUPLE:
                onBatch(tuple);
                break;
            default:
                // Deliberately returns without acking the tuple.
                log.error("Unknown tuple encountered: {}", type);
                return;
        }
        this.collector.ack(tuple);
    }

    /**
     * Declares the data, error and feedback streams. Each stream carries two fields, the first of which is
     * {@link TopologyConstants#ID_FIELD}.
     *
     * @param outputFieldsDeclarer the declarer to register the streams with.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("default", new Fields(TopologyConstants.ID_FIELD, TopologyConstants.DATA_FIELD));
        outputFieldsDeclarer.declareStream("error", new Fields(TopologyConstants.ID_FIELD, "error"));
        outputFieldsDeclarer.declareStream(TopologyConstants.FEEDBACK_STREAM, new Fields(TopologyConstants.ID_FIELD, "metadata"));
    }

    /**
     * Initializes a query from the id, content and metadata of the given message.
     *
     * @param pubSubMessage the message describing the query.
     */
    @Override // com.yahoo.bullet.storm.QueryBolt
    protected void initializeQuery(PubSubMessage pubSubMessage) {
        initializeQuery(pubSubMessage.getId(), pubSubMessage.getContent(), pubSubMessage.getMetadata());
    }

    /**
     * Removes the query with the given id from the superclass bookkeeping and from this bolt's manager.
     *
     * @param str the id of the query to remove.
     */
    @Override // com.yahoo.bullet.storm.QueryBolt
    public void removeQuery(String str) {
        super.removeQuery(str);
        this.manager.removeAndGetQuery(str);
    }

    /**
     * Handles a query tuple. Reads the id from field 0, the serialized query from field 1 and the metadata
     * from field 2.
     */
    private void onQuery(Tuple tuple) {
        String id = tuple.getString(0);
        byte[] queryData = (byte[]) tuple.getValue(1);
        Metadata metadata = (Metadata) tuple.getValue(2);
        initializeQuery(id, queryData, metadata);
    }

    /**
     * Deserializes the query, wraps it in a partition-mode {@link Querier} and adds it to the manager.
     *
     * <p>An id that the manager already has is counted as a duplicate and skipped. Any
     * {@link RuntimeException} raised while creating the querier is logged (with its stack trace) and the
     * query is dropped.
     *
     * @param id the id of the query.
     * @param queryData the serialized {@link Query}.
     * @param metadata the metadata that came with the query.
     */
    private void initializeQuery(String id, byte[] queryData, Metadata metadata) {
        if (this.manager.hasQuery(id)) {
            log.debug("Duplicate for request {}", id);
            this.duplicatedCount++;
            return;
        }
        try {
            Query query = (Query) SerializerDeserializer.fromBytes(queryData);
            Querier querier = createQuerier(Querier.Mode.PARTITION, id, query, metadata, this.config);
            this.manager.addQuery(id, querier);
            log.info("Initialized query {} : {}", querier.getRunningQuery().getId(), querier.getRunningQuery().getQueryString());
            log.debug("Initialized query {}", querier);
        } catch (RuntimeException e) {
            // The trailing throwable makes SLF4J log the cause instead of silently discarding it.
            log.error("Failed to initialize query for request {}", id, e);
        }
    }

    /** Handles a record tuple: field 0 is the {@link BulletRecord} that is given to the manager. */
    private void onRecord(Tuple tuple) {
        BulletRecord record = (BulletRecord) tuple.getValue(0);
        handleCategorizedQueries(this.manager.categorize(record));
    }

    /** Handles a tick: categorizes the queries without a record, reports statistics and runs the replay check. */
    private void onTick() {
        handleCategorizedQueries(this.manager.categorize());
        handleStats();
        emitReplayRequestIfNecessary();
    }

    /**
     * Acts on the outcome of a categorization.
     * <ul>
     *   <li>done queries: their data is emitted and they are removed from the manager;</li>
     *   <li>rate limited queries: their rate limit error is emitted and they are removed from the manager;</li>
     *   <li>closed queries: their data is emitted and they are reset, but they stay in the manager.</li>
     * </ul>
     *
     * @param queryCategorizer the categorization to process.
     */
    private void handleCategorizedQueries(QueryCategorizer queryCategorizer) {
        Map<String, Querier> done = queryCategorizer.getDone();
        done.entrySet().forEach(this::emitData);
        this.manager.removeQueries(done.keySet());

        Map<String, Querier> rateLimited = queryCategorizer.getRateLimited();
        rateLimited.entrySet().forEach(this::emitError);
        this.manager.removeQueries(rateLimited.keySet());

        Map<String, Querier> closed = queryCategorizer.getClosed();
        closed.entrySet().forEach(this::emitData);
        closed.values().forEach(Querier::reset);

        log.debug("Done: {}, Rate limited: {}, Closed: {}, Active: {}",
                  done.size(), rateLimited.size(), closed.size(), this.manager.size());
    }

    /** Counts a tick and, once the configured interval is reached, logs the statistics and restarts the count. */
    private void handleStats() {
        this.statsTickCount++;
        if (this.statsTickCount < this.statsTickInterval) {
            return;
        }
        this.statsTickCount = 0;
        log.info("Query Manager Statistics:\n{}\n", this.manager.getStats());
        log.info("Duplicated queries count: {}", this.duplicatedCount);
    }

    /** Emits the data of the query in the entry on the {@code "default"} stream. */
    private void emitData(Map.Entry<String, Querier> entry) {
        emit("default", entry.getKey(), entry.getValue().getData());
    }

    /** Emits the rate limit error of the query in the entry on the {@code "error"} stream. */
    private void emitError(Map.Entry<String, Querier> entry) {
        emit("error", entry.getKey(), entry.getValue().getRateLimitError());
    }

    /**
     * Emits an (id, payload) pair on the given stream. Nothing is emitted when the payload is null.
     *
     * @param stream the stream to emit on.
     * @param id the id of the query the payload belongs to.
     * @param payload the value to emit; may be null.
     */
    private void emit(String stream, String id, Object payload) {
        if (payload != null) {
            this.collector.emit(stream, new Values(id, payload));
        }
    }

    /**
     * Updates the average latency metric from a record tuple. Does nothing unless metrics are enabled and the
     * tuple has more than one field.
     *
     * <p>Field 1 is cast to {@link Long} and subtracted from the current wall clock time in milliseconds.
     * NOTE(review): presumably field 1 is the record's creation timestamp in epoch milliseconds; confirm
     * against the component producing the records.
     */
    private void updateLatency(Tuple tuple) {
        if (!this.metrics.isEnabled() || tuple.size() <= 1) {
            return;
        }
        long timestamp = (Long) tuple.getValue(1);
        this.averageLatency.update(System.currentTimeMillis() - timestamp);
    }

    /** Returns the query manager; package-private accessor. */
    QueryManager getManager() {
        return this.manager;
    }

    /** Returns the number of ticks counted since statistics were last logged; package-private accessor. */
    int getStatsTickCount() {
        return this.statsTickCount;
    }
}
