package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.QueryCategorizer;
import com.yahoo.bullet.querying.RateLimitError;
import com.yahoo.bullet.result.Clip;
import com.yahoo.bullet.result.Meta;
import com.yahoo.bullet.storm.TupleClassifier;
import com.yahoo.bullet.storm.metric.AbsoluteCountMetric;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.RotatingMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/JoinBolt.class */
/**
 * Bolt that keeps track of the queries sent to it, combines the serialized partial results that arrive for
 * each query into that query's {@link Querier}, and emits results.
 *
 * <p>Results go out on the {@code "default"} stream as (id, result JSON, metadata). Signals for finished or
 * killed queries go out on {@link TopologyConstants#FEEDBACK_STREAM} as (id, metadata).
 *
 * <p>A query lives in exactly one of three places at a time:
 * <ul>
 *   <li>{@code preStartBuffer} - received, but its start is delayed until the entry is rotated out by ticks;</li>
 *   <li>{@code queries} - active; categorized on every tick;</li>
 *   <li>{@code postFinishBuffer} - done, but held back for more results until the entry is rotated out by
 *       ticks, at which point it is force-emitted.</li>
 * </ul>
 * Only queries in {@code queries} or {@code postFinishBuffer} are counted in the active queries metric.
 */
public class JoinBolt extends QueryBolt {
    private static final Logger log = LoggerFactory.getLogger(JoinBolt.class);
    private static final long serialVersionUID = 3312434064971532267L;

    /** Metadata received with each query, by query id. Presence of an id here is also the duplicate check. */
    private transient Map<String, Metadata> bufferedMetadata;
    /** Active queries by id. */
    private transient Map<String, Querier> queries;
    /** Done queries that are being held for more results; entries rotated out on a tick are emitted as finished. */
    private transient RotatingMap<String, Querier> postFinishBuffer;
    /** Queries whose start is delayed; entries rotated out on a tick are restarted and become active. */
    private transient RotatingMap<String, Querier> preStartBuffer;

    // Metrics. These are only registered (and so may be null) when metrics are enabled; see prepare().
    private transient AbsoluteCountMetric activeQueriesCount;
    private transient AbsoluteCountMetric createdQueriesCount;
    private transient AbsoluteCountMetric improperQueriesCount;
    private transient AbsoluteCountMetric rateExceededQueries;
    private transient AbsoluteCountMetric duplicatedQueriesCount;

    /**
     * Creates the bolt with the given configuration.
     *
     * @param bulletStormConfig The configuration handed to the parent {@link QueryBolt}.
     */
    public JoinBolt(BulletStormConfig bulletStormConfig) {
        super(bulletStormConfig);
    }

    /**
     * Initializes the transient state: the query and metadata maps, the two rotating buffers (sized from
     * {@link BulletStormConfig#JOIN_BOLT_WINDOW_PRE_START_DELAY_TICKS} and
     * {@link BulletStormConfig#JOIN_BOLT_QUERY_POST_FINISH_BUFFER_TICKS}) and, if metrics are enabled, the
     * count metrics.
     *
     * @param map The Storm configuration, passed through to the parent.
     * @param topologyContext The topology context, passed through to the parent and used to register metrics.
     * @param outputCollector The collector, passed through to the parent.
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);

        this.bufferedMetadata = new HashMap<>();
        this.queries = new HashMap<>();

        int preStartDelayTicks = (Integer) this.config.getAs(BulletStormConfig.JOIN_BOLT_WINDOW_PRE_START_DELAY_TICKS, Integer.class);
        this.preStartBuffer = new RotatingMap<>(preStartDelayTicks);

        int postFinishBufferTicks = (Integer) this.config.getAs(BulletStormConfig.JOIN_BOLT_QUERY_POST_FINISH_BUFFER_TICKS, Integer.class);
        this.postFinishBuffer = new RotatingMap<>(postFinishBufferTicks);

        if (this.metrics.isEnabled()) {
            this.activeQueriesCount = this.metrics.registerAbsoluteCountMetric(TopologyConstants.ACTIVE_QUERIES_METRIC, topologyContext);
            this.createdQueriesCount = this.metrics.registerAbsoluteCountMetric(TopologyConstants.CREATED_QUERIES_METRIC, topologyContext);
            this.improperQueriesCount = this.metrics.registerAbsoluteCountMetric(TopologyConstants.IMPROPER_QUERIES_METRIC, topologyContext);
            this.rateExceededQueries = this.metrics.registerAbsoluteCountMetric(TopologyConstants.RATE_EXCEEDED_QUERIES_METRIC, topologyContext);
            this.duplicatedQueriesCount = this.metrics.registerAbsoluteCountMetric(TopologyConstants.DUPLICATED_QUERIES_METRIC, topologyContext);
        }
    }

    /**
     * Classifies the tuple and dispatches it to the matching handler, then acks it.
     *
     * <p>A tuple that cannot be classified is logged and deliberately <em>not</em> acked.
     *
     * @param tuple The incoming tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        TupleClassifier.Type type = this.classifier.classifyInternalTypes(tuple).orElse(TupleClassifier.Type.UNKNOWN_TUPLE);
        switch (type) {
            case TICK_TUPLE:
                onTick();
                break;
            case METADATA_TUPLE:
                onMeta(tuple);
                break;
            case QUERY_TUPLE:
                onQuery(tuple);
                break;
            case BATCH_TUPLE:
                onBatch(tuple);
                break;
            case ERROR_TUPLE:
                onError(tuple);
                break;
            case DATA_TUPLE:
                onData(tuple);
                break;
            default:
                log.error("Unknown tuple encountered in join: {}", type);
                // Return without acking.
                return;
        }
        this.collector.ack(tuple);
    }

    /**
     * Declares the result stream ({@code "default"}: id, result, metadata) and the feedback stream
     * ({@link TopologyConstants#FEEDBACK_STREAM}: id, metadata).
     *
     * @param outputFieldsDeclarer The declarer to register the streams with.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("default", new Fields(TopologyConstants.ID_FIELD, TopologyConstants.RESULT_FIELD, "metadata"));
        outputFieldsDeclarer.declareStream(TopologyConstants.FEEDBACK_STREAM, new Fields(TopologyConstants.ID_FIELD, "metadata"));
    }

    /**
     * Handles a tick: force-emits the queries rotated out of the post-finish buffer, starts the queries
     * rotated out of the pre-start buffer, categorizes the active queries and acts on each category, and
     * finally lets the parent emit a replay request if it needs to.
     */
    private void onTick() {
        Map<String, Querier> forceDone = this.postFinishBuffer.rotate();
        forceDone.entrySet().forEach(this::emitFinished);
        // removeQuery() only decrements the active count for ids it still finds in queries or in
        // postFinishBuffer. These entries were already rotated out, so they are accounted for here instead.
        this.metrics.updateCount(this.activeQueriesCount, -forceDone.size());

        Map<String, Querier> delayed = this.preStartBuffer.rotate();
        delayed.entrySet().forEach(this::startDelayed);

        handleCategorizedQueries(new QueryCategorizer().categorize(this.queries));
        emitReplayRequestIfNecessary();
    }

    /**
     * Handles a query tuple by initializing the query it carries.
     *
     * @param tuple A tuple whose values 0, 1 and 2 are read as the id, the serialized query bytes and the
     *              metadata respectively.
     */
    private void onQuery(Tuple tuple) {
        String id = tuple.getString(0);
        byte[] queryData = (byte[]) tuple.getValue(1);
        Metadata metadata = (Metadata) tuple.getValue(2);
        initializeQuery(id, queryData, metadata);
    }

    /**
     * Deserializes the query, creates a {@link Querier} for it in {@link Querier.Mode#ALL} and registers it.
     * A query whose id already has buffered metadata is counted as a duplicate and ignored. Any
     * {@link RuntimeException} while deserializing, creating or registering the query is emitted as a failed
     * result and logged.
     *
     * @param id The id of the query.
     * @param queryData The serialized {@link Query}.
     * @param metadata The metadata that came with the query.
     */
    private void initializeQuery(String id, byte[] queryData, Metadata metadata) {
        if (this.bufferedMetadata.containsKey(id)) {
            log.debug("Duplicate for request {}", id);
            this.metrics.updateCount(this.duplicatedQueriesCount, 1L);
            return;
        }
        try {
            Query query = (Query) SerializerDeserializer.fromBytes(queryData);
            Querier querier = createQuerier(Querier.Mode.ALL, id, query, metadata, this.config);
            setupQuery(id, metadata, querier);
        } catch (RuntimeException e) {
            emitErrorsAsResult(id, metadata, BulletError.makeError(e.toString(), "Error initializing query"));
            // The trailing throwable is not a format argument: SLF4J logs it with its stack trace.
            log.error("Failed to initialize query for request {}", id, e);
        }
    }

    /**
     * Handles a data tuple: combines the serialized data into the query's {@link Querier} and then emits or
     * buffers depending on the state the querier ends up in. Data for an unknown query is ignored.
     *
     * @param tuple A tuple whose value 0 is read as the query id and value 1 as the serialized data.
     */
    private void onData(Tuple tuple) {
        String id = tuple.getString(0);
        Querier querier = getQuery(id);
        if (querier == null) {
            log.debug("Received data for query {} before query. Ignoring...", id);
            return;
        }
        querier.combine((byte[]) tuple.getValue(1));

        // Checked in this order: done wins over rate limited, which wins over a closed window.
        if (querier.isDone()) {
            emitOrBufferFinished(id, querier);
        } else if (querier.isExceedingRateLimit()) {
            emitRateLimitError(id, querier, querier.getRateLimitError());
        } else if (querier.isClosed()) {
            emitWindow(id, querier);
        }
    }

    /**
     * Handles an error tuple by failing the query it refers to with the carried rate limit error. Errors for
     * an unknown query are ignored.
     *
     * @param tuple A tuple whose value 0 is read as the query id and value 1 as a {@link RateLimitError}.
     */
    private void onError(Tuple tuple) {
        String id = tuple.getString(0);
        Querier querier = getQuery(id);
        if (querier == null) {
            log.debug("Received error for {} without the query existing", id);
            return;
        }
        emitRateLimitError(id, querier, (RateLimitError) tuple.getValue(1));
    }

    /**
     * Acts on the result of a categorization: done queries are emitted or buffered, rate limited queries are
     * failed and closed queries have their window emitted.
     *
     * @param queryCategorizer A categorizer that has already categorized the active queries.
     */
    private void handleCategorizedQueries(QueryCategorizer queryCategorizer) {
        // These must be parameterized: with raw Maps the forEach target is a raw Consumer and the
        // Map.Entry<String, Querier> method references below do not resolve.
        Map<String, Querier> done = queryCategorizer.getDone();
        done.entrySet().forEach(this::emitOrBufferFinished);

        Map<String, Querier> rateLimited = queryCategorizer.getRateLimited();
        rateLimited.entrySet().forEach(this::emitRateLimitError);

        Map<String, Querier> closed = queryCategorizer.getClosed();
        closed.entrySet().forEach(this::emitWindow);

        log.debug("Done: {}, Rate limited: {}, Closed: {}, Starting delayed: {}, Buffered finished: {}, Active: {}",
                  done.size(), rateLimited.size(), closed.size(),
                  this.preStartBuffer.size(), this.postFinishBuffer.size(), this.queries.size());
    }

    /**
     * Entry adapter for {@link #emitRateLimitError(String, Querier, RateLimitError)} that uses the querier's
     * own rate limit error.
     *
     * @param entry The query id to querier entry.
     */
    private void emitRateLimitError(Map.Entry<String, Querier> entry) {
        Querier querier = entry.getValue();
        emitRateLimitError(entry.getKey(), querier, querier.getRateLimitError());
    }

    /**
     * Fails a query for exceeding its rate limit: emits the querier's finished result with the error's meta
     * merged in and a {@link Metadata.Signal#FAIL} signal, emits a {@link Metadata.Signal#KILL} on the
     * feedback stream, counts it and removes the query.
     *
     * @param id The id of the query.
     * @param querier The querier for the query.
     * @param rateLimitError The error to report. It is dereferenced unconditionally.
     */
    private void emitRateLimitError(String id, Querier querier, RateLimitError rateLimitError) {
        Metadata metadata = this.bufferedMetadata.get(id);
        Meta errorMeta = rateLimitError.makeMeta();
        Clip result = querier.finish();
        result.getMeta().merge(errorMeta);
        emitResult(id, withSignal(metadata, Metadata.Signal.FAIL), result);
        emitMetaSignal(id, Metadata.Signal.KILL);
        this.metrics.updateCount(this.rateExceededQueries, 1L);
        removeQuery(id);
    }

    /**
     * Entry adapter for {@link #emitOrBufferFinished(String, Querier)}.
     *
     * @param entry The query id to querier entry.
     */
    private void emitOrBufferFinished(Map.Entry<String, Querier> entry) {
        emitOrBufferFinished(entry.getKey(), entry.getValue());
    }

    /**
     * Decides what to do with a query that is done:
     * <ol>
     *   <li>if it should not be buffered, emit it as finished;</li>
     *   <li>otherwise, if it is also closed, emit it as finished;</li>
     *   <li>otherwise, if it is currently active, move it from the active queries to the post-finish buffer.</li>
     * </ol>
     * In any other case (for example, it is already in the post-finish buffer) nothing happens.
     *
     * @param id The id of the query.
     * @param querier The querier for the query.
     */
    private void emitOrBufferFinished(String id, Querier querier) {
        if (!querier.shouldBuffer()) {
            log.debug("Emitting query since it shouldn't be buffered {}", id);
            emitFinished(id, querier);
        } else if (querier.isClosed()) {
            log.debug("Emitting query since it finished but this is the last window for it {}", id);
            emitFinished(id, querier);
        } else if (this.queries.containsKey(id)) {
            log.debug("Starting to buffer while waiting for more final results for query {}", id);
            // Still counted as active while it sits in the post-finish buffer; see removeQuery() and onTick().
            this.queries.remove(id);
            this.postFinishBuffer.put(id, querier);
        }
    }

    /**
     * Entry adapter for {@link #emitFinished(String, Querier)}.
     *
     * @param entry The query id to querier entry.
     */
    private void emitFinished(Map.Entry<String, Querier> entry) {
        emitFinished(entry.getKey(), entry.getValue());
    }

    /**
     * Emits the querier's finished result with a {@link Metadata.Signal#COMPLETE} signal, emits
     * {@link Metadata.Signal#COMPLETE} on the feedback stream and removes the query.
     *
     * @param id The id of the query.
     * @param querier The querier for the query.
     */
    private void emitFinished(String id, Querier querier) {
        log.info("Query is done {}...", id);
        Metadata metadata = withSignal(this.bufferedMetadata.get(id), Metadata.Signal.COMPLETE);
        emitResult(id, metadata, querier.finish());
        emitMetaSignal(id, Metadata.Signal.COMPLETE);
        removeQuery(id);
    }

    /**
     * Entry adapter for {@link #emitWindow(String, Querier)}.
     *
     * @param entry The query id to querier entry.
     */
    private void emitWindow(Map.Entry<String, Querier> entry) {
        emitWindow(entry.getKey(), entry.getValue());
    }

    /**
     * Emits the querier's current result with the buffered metadata as is (no signal is added, and it is
     * {@code null} if none is buffered for the id) and then resets the querier. The query stays where it is.
     *
     * @param id The id of the query.
     * @param querier The querier for the query.
     */
    private void emitWindow(String id, Querier querier) {
        log.debug("Emitting window for {} and resetting...", id);
        emitResult(id, this.bufferedMetadata.get(id), querier.getResult());
        querier.reset();
    }

    /**
     * Varargs convenience for {@link #emitErrorsAsResult(String, Metadata, List)}.
     *
     * @param id The id of the query.
     * @param metadata The metadata of the query; may be {@code null}.
     * @param bulletErrorArr The errors to emit.
     */
    private void emitErrorsAsResult(String id, Metadata metadata, BulletError... bulletErrorArr) {
        emitErrorsAsResult(id, metadata, Arrays.asList(bulletErrorArr));
    }

    /**
     * Counts the query as improper and emits a result containing only the errors as meta, with a
     * {@link Metadata.Signal#FAIL} signal.
     *
     * @param id The id of the query.
     * @param metadata The metadata of the query; may be {@code null}.
     * @param errors The errors to emit.
     */
    private void emitErrorsAsResult(String id, Metadata metadata, List<BulletError> errors) {
        this.metrics.updateCount(this.improperQueriesCount, 1L);
        emitResult(id, withSignal(metadata, Metadata.Signal.FAIL), Clip.of(Meta.of(errors)));
    }

    /**
     * Emits (id, result as JSON, metadata) on the {@code "default"} stream.
     *
     * @param id The id of the query.
     * @param metadata The metadata to send with the result; passed through as is.
     * @param clip The result to serialize with {@link Clip#asJSON()}.
     */
    private void emitResult(String id, Metadata metadata, Clip clip) {
        this.collector.emit("default", new Values(id, clip.asJSON(), metadata));
    }

    /**
     * Emits (id, metadata carrying only the signal) on the feedback stream.
     *
     * @param id The id of the query.
     * @param signal The signal to send.
     */
    private void emitMetaSignal(String id, Metadata.Signal signal) {
        // NOTE(review): this is logged at ERROR even for the routine COMPLETE signal. Left as is in case
        // the level is relied on for visibility in production logs - confirm and consider lowering it.
        log.error("Emitting {} signal to the feedback stream for {}", signal, id);
        this.collector.emit(TopologyConstants.FEEDBACK_STREAM, new Values(id, new Metadata(signal, (Serializable) null)));
    }

    /**
     * Initializes the query carried by the message using its id, content and metadata.
     *
     * @param pubSubMessage The message containing the query.
     */
    @Override
    protected void initializeQuery(PubSubMessage pubSubMessage) {
        initializeQuery(pubSubMessage.getId(), pubSubMessage.getContent(), pubSubMessage.getMetadata());
    }

    /**
     * Removes every trace of the query from this bolt (after the parent has done its own removal). The active
     * queries metric is decremented only if the query was found in the active queries or in the post-finish
     * buffer, since those are the only places where a query is counted as active.
     *
     * @param id The id of the query to remove.
     */
    @Override
    public void removeQuery(String id) {
        super.removeQuery(id);
        // This check has to happen before the removals below.
        if (this.queries.containsKey(id) || this.postFinishBuffer.containsKey(id)) {
            this.metrics.updateCount(this.activeQueriesCount, -1L);
        }
        this.queries.remove(id);
        this.postFinishBuffer.remove(id);
        this.bufferedMetadata.remove(id);
        this.preStartBuffer.remove(id);
    }

    /**
     * Registers a newly created query: counts it, buffers its metadata and either makes it active right away
     * (if it should be buffered after it finishes) or parks it in the pre-start buffer to be started later.
     *
     * @param id The id of the query.
     * @param metadata The metadata that came with the query.
     * @param querier The querier created for the query.
     */
    private void setupQuery(String id, Metadata metadata, Querier querier) {
        this.metrics.updateCount(this.createdQueriesCount, 1L);
        this.bufferedMetadata.put(id, metadata);
        if (querier.shouldBuffer()) {
            this.queries.put(id, querier);
            this.metrics.updateCount(this.activeQueriesCount, 1L);
            log.info("Received and started query {} : {}", querier.getRunningQuery().getId(), querier.getRunningQuery().getQueryString());
            log.debug("Received and started query {}", querier);
        } else {
            // Not counted as active until startDelayed() moves it to the active queries.
            this.preStartBuffer.put(id, querier);
            log.info("Received but delaying starting query {}", id);
        }
    }

    /**
     * Starts a query whose start was delayed: removes it from the pre-start buffer, restarts its querier,
     * makes it active and counts it as active.
     *
     * @param entry The query id to querier entry.
     */
    private void startDelayed(Map.Entry<String, Querier> entry) {
        String id = entry.getKey();
        Querier querier = entry.getValue();
        this.preStartBuffer.remove(id);
        querier.restart();
        this.queries.put(id, querier);
        this.metrics.updateCount(this.activeQueriesCount, 1L);
        log.info("Started delayed query {}", id);
    }

    /**
     * Produces metadata carrying the given signal without modifying the given metadata.
     *
     * @param metadata The metadata to base the result on; may be {@code null}.
     * @param signal The signal to set.
     * @return A new {@link Metadata} with the signal and no content if {@code metadata} is {@code null};
     *         otherwise the result of {@link Metadata#copy()} with the signal set on it.
     */
    private Metadata withSignal(Metadata metadata, Metadata.Signal signal) {
        if (metadata == null) {
            return new Metadata(signal, (Serializable) null);
        }
        Metadata copy = metadata.copy();
        copy.setSignal(signal);
        return copy;
    }

    /**
     * Looks up the querier for a query wherever it may currently be: first in the active queries, then in the
     * post-finish buffer and lastly in the pre-start buffer.
     *
     * @param id The id of the query.
     * @return The querier, or {@code null} if the query is in none of the three.
     */
    private Querier getQuery(String id) {
        Querier querier = this.queries.get(id);
        if (querier == null) {
            log.debug("Query might be done: {}", id);
            querier = this.postFinishBuffer.get(id);
        }
        if (querier == null) {
            log.debug("Fetching delayed query {}", id);
            querier = this.preStartBuffer.get(id);
        }
        return querier;
    }

    /**
     * Exposes the active queries. Package-private; returns the live map, not a copy.
     *
     * @return The map of query id to querier for the active queries.
     */
    Map<String, Querier> getQueries() {
        return this.queries;
    }
}
