package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.QueryUtils;
import com.yahoo.bullet.query.Window;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.storm.TupleClassifier;
import com.yahoo.bullet.storm.batching.BatchManager;
import com.yahoo.bullet.storm.metric.AbsoluteCountMetric;
import com.yahoo.bullet.storm.testing.ComponentUtils;
import com.yahoo.bullet.storm.testing.CustomCollector;
import com.yahoo.bullet.storm.testing.CustomTopologyContext;
import com.yahoo.bullet.storm.testing.TupleUtils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/yahoo/bullet/storm/QueryBoltTest.class */
public class QueryBoltTest {

    /* loaded from: input_file:com/yahoo/bullet/storm/QueryBoltTest$TestQueryBolt.class */
    private static class TestQueryBolt extends QueryBolt {
        private boolean cleaned;
        private ReducedMetric averagingMetric;
        private AbsoluteCountMetric countMetric;
        private int tupleCount;
        private Map<String, Querier> queries;
        private int initializedQueryCount;

        TestQueryBolt(BulletStormConfig bulletStormConfig) {
            super(bulletStormConfig);
            this.cleaned = false;
        }

        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            super.prepare(map, topologyContext, outputCollector);
            this.queries = new HashMap();
            this.averagingMetric = this.metrics.registerAveragingMetric("foo", topologyContext);
            this.countMetric = this.metrics.registerAbsoluteCountMetric("bar", topologyContext);
        }

        public void execute(Tuple tuple) {
            this.tupleCount++;
            this.metrics.updateCount(this.countMetric, 1L);
            if (this.metrics.isEnabled()) {
                this.averagingMetric.update(Integer.valueOf(this.tupleCount));
            }
            if (tuple != null) {
                onMeta(tuple);
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }

        public void cleanup() {
            super.cleanup();
            this.cleaned = true;
        }

        protected void initializeQuery(PubSubMessage pubSubMessage) {
            this.initializedQueryCount++;
        }

        protected void removeQuery(String str) {
            this.queries.remove(str);
        }

        boolean isMetricsEnabled() {
            return this.metrics.isEnabled();
        }

        Map<String, Number> getMetricsMapping() {
            return this.metrics.getMetricsIntervalMapping();
        }

        OutputCollector getCollector() {
            return this.collector;
        }

        TupleClassifier getClassifier() {
            return this.classifier;
        }

        Map<String, Querier> getQueries() {
            return this.queries;
        }

        public boolean isCleaned() {
            return this.cleaned;
        }

        public ReducedMetric getAveragingMetric() {
            return this.averagingMetric;
        }

        public AbsoluteCountMetric getCountMetric() {
            return this.countMetric;
        }

        public int getTupleCount() {
            return this.tupleCount;
        }

        public int getInitializedQueryCount() {
            return this.initializedQueryCount;
        }
    }

    @Test
    public void testCleanup() {
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        testQueryBolt.cleanup();
        Assert.assertTrue(testQueryBolt.isCleaned());
    }

    @Test
    public void testPrepare() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Assert.assertEquals(customCollector.getAckedCount(), 0);
        testQueryBolt.getCollector().ack((Tuple) null);
        Assert.assertEquals(customCollector.getAckedCount(), 1);
        Assert.assertEquals(testQueryBolt.getQueries().size(), 0);
        Assert.assertEquals(testQueryBolt.getMetricsMapping(), BulletStormConfig.DEFAULT_TOPOLOGY_METRICS_BUILT_IN_EMIT_INTERVAL_MAPPING);
        Assert.assertEquals(testQueryBolt.isMetricsEnabled(), false);
        Tuple tuple = (Tuple) Mockito.mock(Tuple.class);
        ((Tuple) Mockito.doReturn("DataSource").when(tuple)).getSourceComponent();
        Assert.assertEquals(testQueryBolt.getClassifier().classify(tuple), Optional.of(TupleClassifier.Type.RECORD_TUPLE));
    }

    @Test
    public void testMetricsUpdateOnMetricsDisabled() {
        CustomTopologyContext customTopologyContext = new CustomTopologyContext();
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        ComponentUtils.prepare(new HashMap(), testQueryBolt, customTopologyContext, customCollector);
        Assert.assertFalse(testQueryBolt.isMetricsEnabled());
        testQueryBolt.execute(null);
        IMetric registeredMetricByName = customTopologyContext.getRegisteredMetricByName("foo");
        IMetric registeredMetricByName2 = customTopologyContext.getRegisteredMetricByName("bar");
        Assert.assertNull(registeredMetricByName.getValueAndReset());
        Assert.assertEquals(registeredMetricByName2.getValueAndReset(), 0L);
    }

    @Test
    public void testMetricsUpdateOnMetricsEnabled() {
        CustomTopologyContext customTopologyContext = new CustomTopologyContext();
        CustomCollector customCollector = new CustomCollector();
        BulletStormConfig bulletStormConfig = new BulletStormConfig();
        bulletStormConfig.set("bullet.topology.metrics.built.in.enable", true);
        bulletStormConfig.validate();
        TestQueryBolt testQueryBolt = new TestQueryBolt(bulletStormConfig);
        ComponentUtils.prepare(new HashMap(), testQueryBolt, customTopologyContext, customCollector);
        Assert.assertTrue(testQueryBolt.isMetricsEnabled());
        testQueryBolt.execute(null);
        IMetric registeredMetricByName = customTopologyContext.getRegisteredMetricByName("foo");
        IMetric registeredMetricByName2 = customTopologyContext.getRegisteredMetricByName("bar");
        Assert.assertEquals(registeredMetricByName.getValueAndReset(), Double.valueOf(1.0d));
        Assert.assertEquals(registeredMetricByName2.getValueAndReset(), 1L);
    }

    @Test
    public void testMetaTupleRemovingQueries() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Map<String, Querier> queries = testQueryBolt.getQueries();
        queries.put("foo", null);
        testQueryBolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "foo", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null)));
        Assert.assertFalse(queries.containsKey("foo"));
        queries.put("foo", null);
        testQueryBolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "foo", new Metadata(Metadata.Signal.KILL, (Serializable) null)));
        Assert.assertFalse(queries.containsKey("foo"));
    }

    @Test
    public void testRegularMetaTupleIgnored() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Map<String, Querier> queries = testQueryBolt.getQueries();
        queries.put("foo", null);
        testQueryBolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "foo", new Metadata(Metadata.Signal.ACKNOWLEDGE, (Serializable) null)));
        Assert.assertTrue(queries.containsKey("foo"));
    }

    @Test
    public void testNullMetaTupleIgnored() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Map<String, Querier> queries = testQueryBolt.getQueries();
        queries.put("foo", null);
        testQueryBolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "foo", null));
        Assert.assertTrue(queries.containsKey("foo"));
    }

    @Test
    public void testBatchInitializeQuery() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig("test_config.yaml"));
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Assert.assertEquals(testQueryBolt.replayedQueriesCount, 0);
        Assert.assertEquals(testQueryBolt.initializedQueryCount, 0);
        HashMap hashMap = new HashMap();
        hashMap.put("42", new PubSubMessage("42", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), new Metadata()));
        hashMap.put("43", new PubSubMessage("43", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), new Metadata()));
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18");
        Mockito.when(makeIDTuple.getLong(1)).thenReturn(Long.valueOf(testQueryBolt.startTimestamp));
        Mockito.when(makeIDTuple.getInteger(2)).thenReturn(0);
        Mockito.when(makeIDTuple.getValue(3)).thenReturn(hashMap);
        testQueryBolt.onBatch(makeIDTuple);
        Assert.assertEquals(testQueryBolt.replayedQueriesCount, 2);
        Assert.assertEquals(testQueryBolt.initializedQueryCount, 2);
    }

    @Test
    public void testBatchInitializeQueryWithCompression() {
        CustomCollector customCollector = new CustomCollector();
        BulletStormConfig bulletStormConfig = new BulletStormConfig("test_config.yaml");
        bulletStormConfig.set("bullet.topology.replay.batch.compress.enable", true);
        bulletStormConfig.validate();
        TestQueryBolt testQueryBolt = new TestQueryBolt(bulletStormConfig);
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Assert.assertEquals(testQueryBolt.replayedQueriesCount, 0);
        Assert.assertEquals(testQueryBolt.initializedQueryCount, 0);
        HashMap hashMap = new HashMap();
        hashMap.put("42", new PubSubMessage("42", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), new Metadata()));
        hashMap.put("43", new PubSubMessage("43", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), new Metadata()));
        byte[] compress = BatchManager.compress(hashMap);
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18");
        Mockito.when(makeIDTuple.getLong(1)).thenReturn(Long.valueOf(testQueryBolt.startTimestamp));
        Mockito.when(makeIDTuple.getInteger(2)).thenReturn(0);
        Mockito.when(makeIDTuple.getValue(3)).thenReturn(compress);
        testQueryBolt.onBatch(makeIDTuple);
        Assert.assertEquals(testQueryBolt.replayedQueriesCount, 2);
        Assert.assertEquals(testQueryBolt.initializedQueryCount, 2);
    }

    @Test
    public void testBatchReplayCompleted() {
        CustomTopologyContext customTopologyContext = new CustomTopologyContext();
        CustomCollector customCollector = new CustomCollector();
        BulletStormConfig bulletStormConfig = new BulletStormConfig("test_config.yaml");
        bulletStormConfig.set("bullet.topology.metrics.built.in.enable", true);
        bulletStormConfig.validate();
        TestQueryBolt testQueryBolt = new TestQueryBolt(bulletStormConfig);
        ComponentUtils.prepare(new HashMap(), testQueryBolt, customTopologyContext, customCollector);
        testQueryBolt.replayCompleted = true;
        testQueryBolt.onBatch(TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18"));
        Assert.assertEquals(testQueryBolt.replayedQueriesCount, 0);
        Assert.assertEquals(testQueryBolt.initializedQueryCount, 0);
    }

    @Test
    public void testBatchNonMatchingTimestamp() {
        CustomTopologyContext customTopologyContext = new CustomTopologyContext();
        CustomCollector customCollector = new CustomCollector();
        BulletStormConfig bulletStormConfig = new BulletStormConfig("test_config.yaml");
        bulletStormConfig.set("bullet.topology.metrics.built.in.enable", true);
        bulletStormConfig.validate();
        TestQueryBolt testQueryBolt = new TestQueryBolt(bulletStormConfig);
        ComponentUtils.prepare(new HashMap(), testQueryBolt, customTopologyContext, customCollector);
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18");
        Mockito.when(makeIDTuple.getLong(1)).thenReturn(0L);
        Mockito.when(makeIDTuple.getInteger(2)).thenReturn(0);
        Mockito.when(makeIDTuple.getValue(3)).thenReturn((Object) null);
        testQueryBolt.onBatch(makeIDTuple);
        Assert.assertEquals(testQueryBolt.replayedQueriesCount, 0);
        Assert.assertEquals(testQueryBolt.initializedQueryCount, 0);
    }

    @Test
    public void testBatchNullEndsReplay() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig("test_config.yaml"));
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Assert.assertFalse(testQueryBolt.replayCompleted);
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18");
        Mockito.when(makeIDTuple.getLong(1)).thenReturn(Long.valueOf(testQueryBolt.startTimestamp));
        Mockito.when(makeIDTuple.getInteger(2)).thenReturn(0);
        Mockito.when(makeIDTuple.getValue(3)).thenReturn((Object) null);
        testQueryBolt.onBatch(makeIDTuple);
        Assert.assertTrue(testQueryBolt.replayCompleted);
    }

    @Test
    public void testEmitReplayRequest() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig("test_config.yaml"));
        Assert.assertEquals(customCollector.getEmittedCount(), 0);
        ComponentUtils.prepare(new HashMap(), testQueryBolt, new CustomTopologyContext(null, "QueryBolt", 0, 5), customCollector);
        Assert.assertEquals(customCollector.getEmittedCount(), 1);
        CustomCollector.Triplet triplet = customCollector.getEmitted().get(0);
        Assert.assertEquals(triplet.getStreamId(), "feedback");
        Assert.assertEquals(triplet.getTuple().size(), 2);
        Assert.assertEquals(triplet.getTuple().get(0), "QueryBolt-5");
        Assert.assertEquals(((Metadata) triplet.getTuple().get(1)).getSignal(), Metadata.Signal.REPLAY);
        Assert.assertEquals(((Metadata) triplet.getTuple().get(1)).getContent(), Long.valueOf(testQueryBolt.startTimestamp));
        testQueryBolt.emitReplayRequestIfNecessary();
        Assert.assertEquals(customCollector.getEmittedCount(), 1);
        testQueryBolt.lastReplayRequest -= testQueryBolt.replayRequestInterval;
        testQueryBolt.emitReplayRequestIfNecessary();
        Assert.assertEquals(customCollector.getEmittedCount(), 2);
    }

    @Test
    public void testHandleReplaySignal() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        ComponentUtils.prepare(testQueryBolt, customCollector);
        testQueryBolt.replayEnabled = true;
        testQueryBolt.startTimestamp = 0L;
        testQueryBolt.replayCompleted = true;
        testQueryBolt.batchCount = 1;
        testQueryBolt.replayedQueriesCount = 1;
        Assert.assertEquals(testQueryBolt.lastReplayRequest, 0L);
        Assert.assertEquals(customCollector.getEmittedCount(), 0);
        testQueryBolt.onMeta(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "123", new Metadata(Metadata.Signal.REPLAY, (Serializable) null)));
        Assert.assertNotEquals(Long.valueOf(testQueryBolt.startTimestamp), 0);
        Assert.assertFalse(testQueryBolt.replayCompleted);
        Assert.assertEquals(testQueryBolt.batchCount, 0);
        Assert.assertEquals(testQueryBolt.replayedQueriesCount, 0);
        Assert.assertNotEquals(Long.valueOf(testQueryBolt.lastReplayRequest), 0);
        Assert.assertEquals(customCollector.getEmittedCount(), 1);
    }

    @Test
    public void testHandleReplaySignalDoesNothingWhenReplayDisabled() {
        CustomCollector customCollector = new CustomCollector();
        TestQueryBolt testQueryBolt = new TestQueryBolt(new BulletStormConfig());
        ComponentUtils.prepare(testQueryBolt, customCollector);
        Assert.assertFalse(testQueryBolt.replayEnabled);
        Assert.assertTrue(testQueryBolt.replayCompleted);
        Assert.assertEquals(testQueryBolt.lastReplayRequest, 0L);
        Assert.assertEquals(customCollector.getEmittedCount(), 0);
        testQueryBolt.onMeta(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "123", new Metadata(Metadata.Signal.REPLAY, (Serializable) null)));
        Assert.assertFalse(testQueryBolt.replayEnabled);
        Assert.assertTrue(testQueryBolt.replayCompleted);
        Assert.assertEquals(testQueryBolt.lastReplayRequest, 0L);
        Assert.assertEquals(customCollector.getEmittedCount(), 0);
    }
}
