package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.ByteArrayPubSubMessageSerDe;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.PubSubMessageSerDe;
import com.yahoo.bullet.query.Field;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.query.QueryUtils;
import com.yahoo.bullet.query.Window;
import com.yahoo.bullet.query.aggregations.DistributionType;
import com.yahoo.bullet.query.expressions.BinaryExpression;
import com.yahoo.bullet.query.expressions.CastExpression;
import com.yahoo.bullet.query.expressions.FieldExpression;
import com.yahoo.bullet.query.expressions.ListExpression;
import com.yahoo.bullet.query.expressions.Operation;
import com.yahoo.bullet.query.expressions.ValueExpression;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.RateLimitError;
import com.yahoo.bullet.querying.aggregations.FrequentItemsSketchingStrategy;
import com.yahoo.bullet.querying.aggregations.FrequentItemsSketchingStrategyTest;
import com.yahoo.bullet.querying.aggregations.QuantileSketchingStrategy;
import com.yahoo.bullet.querying.aggregations.QuantileSketchingStrategyTest;
import com.yahoo.bullet.querying.aggregations.ThetaSketchingStrategy;
import com.yahoo.bullet.querying.aggregations.ThetaSketchingStrategyTest;
import com.yahoo.bullet.querying.aggregations.grouping.GroupData;
import com.yahoo.bullet.querying.aggregations.grouping.GroupOperation;
import com.yahoo.bullet.record.BulletRecord;
import com.yahoo.bullet.record.BulletRecordProvider;
import com.yahoo.bullet.result.RecordBox;
import com.yahoo.bullet.storm.TupleClassifier;
import com.yahoo.bullet.storm.testing.ComponentUtils;
import com.yahoo.bullet.storm.testing.CustomCollector;
import com.yahoo.bullet.storm.testing.CustomOutputFieldsDeclarer;
import com.yahoo.bullet.storm.testing.CustomTopologyContext;
import com.yahoo.bullet.storm.testing.TestHelpers;
import com.yahoo.bullet.storm.testing.TupleUtils;
import com.yahoo.bullet.typesystem.Type;
import com.yahoo.bullet.windowing.SlidingRecord;
import com.yahoo.sketches.frequencies.ErrorType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/yahoo/bullet/storm/FilterBoltTest.class */
public class FilterBoltTest {
    // Collector handed to the bolt under test; setup() replaces it with a fresh instance before every test.
    private CustomCollector collector;
    // Bolt under test. setup() prepares a plain FilterBolt; individual tests may swap in a subclass.
    private FilterBolt bolt;
    // Configuration used by setup() to build the default bolt.
    private BulletStormConfig config;
    // Record provider obtained from a default BulletStormConfig; used to turn GroupData metrics into records.
    private static BulletRecordProvider provider = new BulletStormConfig().getBulletRecordProvider();
    // SerDe built on a default BulletConfig; used to wrap queries into PubSubMessages for query tuples.
    private static PubSubMessageSerDe querySerDe = new ByteArrayPubSubMessageSerDe(new BulletConfig());

    /* loaded from: input_file:com/yahoo/bullet/storm/FilterBoltTest$DonableFilterBolt.class */
    /**
     * A {@link FilterBolt} whose queriers are Mockito spies with a scripted {@code isDone()}:
     * the first {@code doneAfter} answers are {@code false} and the next answer is {@code true}.
     */
    private static class DonableFilterBolt extends FilterBolt {
        // Number of leading isDone() answers that are false.
        private final int doneAfter;

        /** Same as {@code new DonableFilterBolt(1, 2, new BulletStormConfig())}. */
        DonableFilterBolt() {
            this(1, 2, new BulletStormConfig());
        }

        /** Same as the three-argument constructor with 2 as the second count. */
        DonableFilterBolt(int i, BulletStormConfig bulletStormConfig) {
            this(i, 2, bulletStormConfig);
        }

        /**
         * Creates the bolt with the "DataSource" name and the given config, and scripts
         * {@code doneAfter = 2 * i + i2 - 1} leading {@code false} answers.
         */
        private DonableFilterBolt(int i, int i2, BulletStormConfig bulletStormConfig) {
            super("DataSource", bulletStormConfig);
            doneAfter = 2 * i + i2 - 1;
        }

        /**
         * Wraps the querier produced by the parent in a spy and replaces its {@code isDone()}
         * with the scripted sequence of answers.
         */
        protected Querier createQuerier(Querier.Mode mode, String str, Query query, Metadata metadata, BulletConfig bulletConfig) {
            Querier spied = Mockito.spy(super.createQuerier(mode, str, query, metadata, bulletConfig));
            List<Boolean> answers = new ArrayList<>();
            for (int call = 0; call < doneAfter; call++) {
                answers.add(false);
            }
            // The terminal answer; per Mockito's returnsElementsOf contract the last element keeps being returned.
            answers.add(true);
            Mockito.doAnswer(AdditionalAnswers.returnsElementsOf(answers)).when(spied).isDone();
            return spied;
        }
    }

    /* loaded from: input_file:com/yahoo/bullet/storm/FilterBoltTest$NoQueryFilterBolt.class */
    /**
     * A {@link FilterBolt} whose {@code createQuerier} always returns {@code null}, used to exercise
     * the path where no querier is produced for an incoming query.
     */
    private static class NoQueryFilterBolt extends FilterBolt {
        /** Creates the bolt with the "DataSource" name and a default {@link BulletStormConfig}. */
        NoQueryFilterBolt() {
            super("DataSource", new BulletStormConfig());
        }

        /** Ignores every argument and returns {@code null} instead of a querier. */
        protected Querier createQuerier(Querier.Mode mode, String str, Query query, Metadata metadata, BulletConfig bulletConfig) {
            return null;
        }
    }

    /* loaded from: input_file:com/yahoo/bullet/storm/FilterBoltTest$RateLimitedFilterBolt.class */
    private static class RateLimitedFilterBolt extends FilterBolt {
        private final int limitedAfter;
        private final RateLimitError error;

        private RateLimitedFilterBolt(int i, RateLimitError rateLimitError, BulletStormConfig bulletStormConfig) {
            super("DataSource", bulletStormConfig);
            this.limitedAfter = i;
            this.error = rateLimitError;
        }

        protected Querier createQuerier(Querier.Mode mode, String str, Query query, Metadata metadata, BulletConfig bulletConfig) {
            Querier querier = (Querier) Mockito.spy(super.createQuerier(mode, str, query, metadata, bulletConfig));
            List list = (List) IntStream.range(0, this.limitedAfter).mapToObj(i -> {
                return false;
            }).collect(ArrayList::new, (v0, v1) -> {
                v0.add(v1);
            }, (v0, v1) -> {
                v0.addAll(v1);
            });
            list.add(true);
            ((Querier) Mockito.doAnswer(AdditionalAnswers.returnsElementsOf(list)).when(querier)).isExceedingRateLimit();
            ((Querier) Mockito.doReturn(this.error).when(querier)).getRateLimitError();
            return querier;
        }
    }

    /**
     * Builds a raw tuple carrying the given record, using "DataSource" and "default" as the
     * first two arguments to {@code TupleUtils.makeRawTuple}.
     *
     * @param bulletRecord the record to place in the tuple
     * @return the tuple produced by {@code TupleUtils.makeRawTuple}
     */
    private static Tuple makeRecordTuple(BulletRecord bulletRecord) {
        return TupleUtils.makeRawTuple("DataSource", "default", bulletRecord);
    }

    /**
     * Builds a raw tuple carrying the given record followed by a boxed {@code long} value, using
     * "DataSource" and "default" as the first two arguments to {@code TupleUtils.makeRawTuple}.
     *
     * NOTE(review): the decompiler reported that the access modifier was changed from private.
     *
     * @param bulletRecord the record to place in the tuple
     * @param j the long value appended after the record (presumably a timestamp; confirm against callers)
     * @return the tuple produced by {@code TupleUtils.makeRawTuple}
     */
    public static Tuple makeRecordTuple(BulletRecord bulletRecord, long j) {
        return TupleUtils.makeRawTuple("DataSource", "default", bulletRecord, Long.valueOf(j));
    }

    /**
     * Builds a tuple of the given type for the given id whose payload is the result of
     * {@code TestHelpers.getListBytes} on the records.
     *
     * @param type the tuple type passed to {@code TupleUtils.makeTuple}
     * @param str the id placed in the tuple
     * @param bulletRecordArr the records to serialize into the payload
     * @return the tuple produced by {@code TupleUtils.makeTuple}
     */
    private static Tuple makeDataTuple(TupleClassifier.Type type, String str, BulletRecord... bulletRecordArr) {
        return TupleUtils.makeTuple(type, str, TestHelpers.getListBytes(bulletRecordArr));
    }

    /**
     * Builds a tuple of the given type for the given id whose payload is a serialized
     * {@code SlidingRecord.Data} constructed from the number of records and their list bytes.
     *
     * @param type the tuple type passed to {@code TupleUtils.makeTuple}
     * @param str the id placed in the tuple
     * @param bulletRecordArr the records wrapped in the {@code SlidingRecord.Data}
     * @return the tuple produced by {@code TupleUtils.makeTuple}
     */
    private static Tuple makeSlidingTuple(TupleClassifier.Type type, String str, BulletRecord... bulletRecordArr) {
        return TupleUtils.makeTuple(type, str, SerializerDeserializer.toBytes(new SlidingRecord.Data(bulletRecordArr.length, TestHelpers.getListBytes(bulletRecordArr))));
    }

    /**
     * Checks that both value lists have exactly two elements and that their first elements are equal.
     * The second element (the payload) is not compared here.
     *
     * Short-circuit evaluation guarantees the first elements are only read once both size checks
     * have passed, so lists of the wrong size yield {@code false} instead of an
     * {@link IndexOutOfBoundsException}.
     *
     * @param list the values of the emitted tuple
     * @param list2 the values of the expected tuple
     * @return true if both lists have size 2 and equal first elements
     */
    private boolean isSameTuple(List<Object> list, List<Object> list2) {
        return list.size() == 2
                && list.size() == list2.size()
                && list.get(0).equals(list2.get(0));
    }

    /**
     * Compares emitted values against the values of an expected tuple: the shape and first element
     * must match according to {@code isSameTuple} and the second elements, treated as byte arrays,
     * must have equal contents.
     *
     * @param list the values of the emitted tuple
     * @param tuple the expected tuple
     * @return true if the emitted values match the expected tuple
     */
    private boolean tupleEquals(List<Object> list, Tuple tuple) {
        List<Object> expected = tuple.getValues();
        if (!isSameTuple(list, expected)) {
            return false;
        }
        byte[] actualPayload = (byte[]) list.get(1);
        byte[] expectedPayload = (byte[]) expected.get(1);
        return Arrays.equals(actualPayload, expectedPayload);
    }

    /**
     * Checks that exactly {@code i} of the tuples emitted to the given stream match the expected tuple.
     *
     * @param str the stream to inspect
     * @param i the exact number of matching emissions required
     * @param tuple the expected tuple
     * @return true if the number of matching emitted tuples equals {@code i}
     */
    private boolean wasRawRecordEmittedTo(String str, int i, Tuple tuple) {
        long matches = collector.getTuplesEmittedTo(str)
                                .filter(emitted -> tupleEquals(emitted, tuple))
                                .count();
        return matches == i;
    }

    /**
     * Checks whether at least one emission to the given stream matches the expected tuple.
     *
     * @param str the stream to inspect
     * @param tuple the expected tuple
     * @return true if any emission to the stream matches
     */
    private boolean wasRawRecordEmittedTo(String str, Tuple tuple) {
        boolean found = collector.getAllEmittedTo(str).anyMatch(emission -> tupleEquals(emission.getTuple(), tuple));
        return found;
    }

    /**
     * Checks whether at least one emitted tuple, on any stream, matches the expected tuple.
     *
     * @param tuple the expected tuple
     * @return true if any emitted tuple matches
     */
    private boolean wasRawRecordEmitted(Tuple tuple) {
        boolean found = collector.getTuplesEmitted().anyMatch(emitted -> tupleEquals(emitted, tuple));
        return found;
    }

    /**
     * Fetches element 1 (the payload position) of the given tuple emitted to the "default" stream.
     *
     * @param i the tuple index handed to the collector (the tests in this class pass 1 for the first emission)
     * @return the payload cast to a byte array, or null if the collector has no such element
     */
    private byte[] getRawPayloadOfNthTuple(int i) {
        return (byte[]) this.collector.getMthElementFromNthTupleEmittedTo("default", i, 1).orElse(null);
    }

    /**
     * Converts the group data's metrics to a record using the shared provider and compares it
     * to the expected record.
     *
     * @param groupData the group data whose metrics are converted
     * @param bulletRecord the expected record
     * @return true if the converted metrics equal the expected record
     */
    private boolean isEqual(GroupData groupData, BulletRecord bulletRecord) {
        return groupData.getMetricsAsBulletRecord(provider).equals(bulletRecord);
    }

    /**
     * Creates a default {@link BulletStormConfig} with "bullet.query.aggregation.default.size" set to 1.
     *
     * @return the validated configuration
     */
    private static BulletStormConfig oneRecordConfig() {
        BulletStormConfig bulletStormConfig = new BulletStormConfig();
        bulletStormConfig.set("bullet.query.aggregation.default.size", 1);
        // validate() runs after the override so the new setting goes through validation.
        bulletStormConfig.validate();
        return bulletStormConfig;
    }

    /**
     * Wraps a query into a {@link PubSubMessage} using the shared query SerDe, passing a null
     * String as the third argument to {@code toMessage}.
     *
     * @param str the id of the message
     * @param query the query to wrap; testBadBytes passes null
     * @return the message produced by the SerDe
     */
    private static PubSubMessage makeQueryPubSubMessage(String str, Query query) {
        return querySerDe.toMessage(str, query, (String) null);
    }

    /**
     * Runs before each test: installs a fresh collector, a one-record configuration, and a plain
     * "DataSource" FilterBolt prepared against that collector.
     */
    @BeforeMethod
    public void setup() {
        collector = new CustomCollector();
        config = oneRecordConfig();
        FilterBolt fresh = new FilterBolt("DataSource", config);
        bolt = ComponentUtils.prepare(fresh, collector);
    }

    /** Asserts that the bolt declares the fields "id" and "data" on the "default" stream. */
    @Test
    public void testOutputFields() {
        CustomOutputFieldsDeclarer declarer = new CustomOutputFieldsDeclarer();
        bolt.declareOutputFields(declarer);
        Fields expectedFields = new Fields(new String[]{"id", "data"});
        boolean declared = declarer.areFieldsPresent("default", false, expectedFields);
        Assert.assertTrue(declared);
    }

    /** Executes a tuple typed as RESULT_TUPLE and asserts that the bolt did not ack it. */
    @Test
    public void testUnknownTuple() {
        Tuple unexpected = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "", "");
        bolt.execute(unexpected);
        boolean acked = collector.wasAcked(unexpected);
        Assert.assertFalse(acked);
    }

    /**
     * Registers a projection query ("field" as "id", "map_field"."id" as "mid") and asserts that
     * one record produces exactly one data tuple carrying only the projected fields.
     */
    @Test
    public void testProjection() {
        Query projection = QueryUtils.makeProjectionQuery(Arrays.asList(
                new Field("id", new FieldExpression("field")),
                new Field("mid", new FieldExpression("map_field", "id"))));
        Tuple queryTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", projection));
        bolt.execute(queryTuple);

        BulletRecord input = RecordBox.get()
                                      .add("field", "b235gf23b")
                                      .add("timestamp", 92L)
                                      .addMap("map_field", new Pair[]{Pair.of("id", "123"), Pair.of("bar", "foo")})
                                      .getRecord();
        bolt.execute(makeRecordTuple(input));

        BulletRecord projected = RecordBox.get().add("id", "b235gf23b").add("mid", "123").getRecord();
        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", projected);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expected));
    }

    /* JADX WARN: Type inference failed for: r1v2, types: [byte[], java.io.Serializable] */
    /**
     * Sends a query message whose content was replaced by an empty byte array, followed by a record.
     * Asserts that both tuples are acked and that no data tuple for the query id is emitted.
     */
    @Test
    public void testBadBytes() {
        PubSubMessage message = makeQueryPubSubMessage("42", null);
        message.setContent((Serializable) new byte[0]);
        Tuple queryTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", message);
        bolt.execute(queryTuple);

        BulletRecord input = RecordBox.get().add("field", "b235gf23b").getRecord();
        Tuple recordTuple = makeRecordTuple(input);
        bolt.execute(recordTuple);

        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", input);
        Assert.assertTrue(collector.wasAcked(queryTuple));
        Assert.assertTrue(collector.wasAcked(recordTuple));
        Assert.assertFalse(wasRawRecordEmittedTo("default", unexpected));
    }

    /**
     * Registers a field filter query for "b235gf23b" and asserts that the matching record is
     * emitted exactly once while the non-matching record is not emitted.
     */
    @Test
    public void testFiltering() {
        Query filterQuery = QueryUtils.makeFieldFilterQuery("b235gf23b");
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", filterQuery)));

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        bolt.execute(makeRecordTuple(matching));

        BulletRecord nonMatching = RecordBox.get().add("field", "wontmatch").getRecord();
        bolt.execute(makeRecordTuple(nonMatching));

        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expected));

        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", nonMatching);
        Assert.assertFalse(wasRawRecordEmittedTo("default", unexpected));
    }

    /**
     * Registers a query that filters on "map_field"."id" equal to "123" and projects two fields.
     * Asserts that the matching record is emitted exactly once in its projected form.
     */
    @Test
    public void testProjectionAndFiltering() {
        BinaryExpression filter = new BinaryExpression(new FieldExpression("map_field", "id"), new ValueExpression("123"), Operation.EQUALS);
        List<Field> projection = Arrays.asList(
                new Field("id", new FieldExpression("field")),
                new Field("mid", new FieldExpression("map_field", "id")));
        Query query = QueryUtils.makeProjectionFilterQuery(filter, projection);
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", query)));

        BulletRecord input = RecordBox.get()
                                      .add("field", "b235gf23b")
                                      .add("timestamp", 92L)
                                      .addMap("map_field", new Pair[]{Pair.of("id", "123"), Pair.of("bar", "foo")})
                                      .getRecord();
        bolt.execute(makeRecordTuple(input));

        BulletRecord projected = RecordBox.get().add("id", "b235gf23b").add("mid", "123").getRecord();
        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", projected);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expected));
    }

    /**
     * Registers a query whose filter references "mid", which exists only as a projected name and
     * not as a top-level field of the input record. Asserts that no projected record is emitted.
     */
    @Test
    public void testFilteringUsingProjectedName() {
        BinaryExpression filter = new BinaryExpression(new FieldExpression("mid"), new ValueExpression("123"), Operation.EQUALS);
        List<Field> projection = Arrays.asList(
                new Field("id", new FieldExpression("field")),
                new Field("mid", new FieldExpression("map_field", "id")));
        Query query = QueryUtils.makeProjectionFilterQuery(filter, projection);
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", query)));

        BulletRecord input = RecordBox.get()
                                      .add("field", "b235gf23b")
                                      .add("timestamp", 92L)
                                      .addMap("map_field", new Pair[]{Pair.of("id", "123"), Pair.of("bar", "foo")})
                                      .getRecord();
        bolt.execute(makeRecordTuple(input));

        BulletRecord projected = RecordBox.get().add("id", "b235gf23b").add("mid", "123").getRecord();
        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", projected);
        Assert.assertFalse(wasRawRecordEmittedTo("default", unexpected));
    }

    /**
     * Registers a query that filters on "timestamp", a field that is not part of the projection.
     * Asserts that the projected record is still emitted.
     */
    @Test
    public void testProjectionNotLosingFilterColumn() {
        BinaryExpression filter = new BinaryExpression(new FieldExpression("timestamp"), new ValueExpression(92L), Operation.EQUALS);
        List<Field> projection = Arrays.asList(
                new Field("id", new FieldExpression("field")),
                new Field("mid", new FieldExpression("map_field", "id")));
        Query query = QueryUtils.makeProjectionFilterQuery(filter, projection);
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", query)));

        BulletRecord input = RecordBox.get()
                                      .add("field", "b235gf23b")
                                      .add("timestamp", 92L)
                                      .addMap("map_field", new Pair[]{Pair.of("id", "123"), Pair.of("bar", "foo")})
                                      .getRecord();
        bolt.execute(makeRecordTuple(input));

        BulletRecord projected = RecordBox.get().add("id", "b235gf23b").add("mid", "123").getRecord();
        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", projected);
        Assert.assertTrue(wasRawRecordEmittedTo("default", expected));
    }

    /**
     * Registers a record-based window query with a field filter and sends the same matching record
     * four times. Asserts that four sliding tuples were emitted.
     */
    @Test
    public void testFilteringSlidingWindow() {
        Query windowed = QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1);
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", windowed)));

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        Tuple recordTuple = makeRecordTuple(matching);
        for (int sent = 0; sent < 4; sent++) {
            bolt.execute(recordTuple);
        }

        Tuple expected = makeSlidingTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 4, expected));
    }

    /**
     * Registers two queries ("42" filtering on the field, "43" on the timestamp) that both match
     * one record. Asserts that a data tuple is emitted exactly once for each query id.
     */
    @Test
    public void testDifferentQueryMatchingSameTuple() {
        Tuple fieldQuery = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42",
                makeQueryPubSubMessage("42", QueryUtils.makeFieldFilterQuery("b235gf23b")));
        Tuple timestampQuery = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "43",
                makeQueryPubSubMessage("43", QueryUtils.makeFilterQuery("timestamp", Arrays.asList(1L, 2L, 3L, 45L), Operation.EQUALS_ANY)));
        bolt.execute(fieldQuery);
        bolt.execute(timestampQuery);

        BulletRecord input = RecordBox.get().add("field", "b235gf23b").add("timestamp", 45L).getRecord();
        bolt.execute(makeRecordTuple(input));

        Tuple expectedForFieldQuery = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", input);
        Tuple expectedForTimestampQuery = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "43", input);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expectedForFieldQuery));
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expectedForTimestampQuery));
    }

    /**
     * Registers two queries and sends two records with timestamps 45 and 42. Asserts that query
     * "42" emitted only the first record, once, and that query "43" (NOT_EQUALS_ALL on timestamps
     * 1, 2, 3, 45) emitted the second record once.
     */
    @Test
    public void testDifferentQueryMatchingDifferentTuple() {
        Tuple fieldQuery = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42",
                makeQueryPubSubMessage("42", QueryUtils.makeFieldFilterQuery("b235gf23b")));
        Tuple timestampQuery = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "43",
                makeQueryPubSubMessage("43", QueryUtils.makeFilterQuery("timestamp", Arrays.asList(1L, 2L, 3L, 45L), Operation.NOT_EQUALS_ALL)));
        bolt.execute(fieldQuery);
        bolt.execute(timestampQuery);

        BulletRecord first = RecordBox.get().add("field", "b235gf23b").add("timestamp", 45L).getRecord();
        BulletRecord second = RecordBox.get().add("field", "b235gf23b").add("timestamp", 42L).getRecord();
        Tuple firstTuple = makeRecordTuple(first);
        Tuple secondTuple = makeRecordTuple(second);
        bolt.execute(firstTuple);
        bolt.execute(secondTuple);

        Tuple firstForFieldQuery = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", first);
        Tuple secondForFieldQuery = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", second);
        Tuple secondForTimestampQuery = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "43", second);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, firstForFieldQuery));
        Assert.assertFalse(wasRawRecordEmittedTo("default", secondForFieldQuery));
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, secondForTimestampQuery));
    }

    /**
     * Submits the same two query tuples twice and asserts that the bolt's manager goes from
     * size 0 to size 2 and stays at 2 after the resubmission.
     */
    @Test
    public void testDuplicateQueryIds() {
        Tuple firstQuery = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42",
                makeQueryPubSubMessage("42", QueryUtils.makeFieldFilterQuery("b235gf23b")));
        Tuple secondQuery = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "43",
                makeQueryPubSubMessage("43", QueryUtils.makeFilterQuery("timestamp", Arrays.asList("1", "2", "3", "45"), Operation.NOT_EQUALS)));
        Assert.assertEquals(bolt.getManager().size(), 0);

        bolt.execute(firstQuery);
        bolt.execute(secondQuery);
        Assert.assertEquals(bolt.getManager().size(), 2);

        // Resubmitting the same ids must not change the manager's size.
        bolt.execute(firstQuery);
        bolt.execute(secondQuery);
        Assert.assertEquals(bolt.getManager().size(), 2);
    }

    /**
     * Uses a bolt whose createQuerier returns null and asserts that a record that would match
     * the submitted query is not emitted.
     */
    @Test
    public void testFailQueryInitialization() {
        bolt = ComponentUtils.prepare(new NoQueryFilterBolt(), collector);
        Query filterQuery = QueryUtils.makeFieldFilterQuery("b235gf23b");
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", filterQuery)));

        BulletRecord input = RecordBox.get().add("field", "b235gf23b").getRecord();
        bolt.execute(makeRecordTuple(input));

        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", input);
        Assert.assertFalse(wasRawRecordEmittedTo("default", unexpected));
    }

    /**
     * With a default DonableFilterBolt, sends a query, one matching record and two ticks, and
     * asserts that the record was emitted exactly once.
     */
    @Test
    public void testQueryNotDone() {
        bolt = ComponentUtils.prepare(new DonableFilterBolt(), collector);
        Query filterQuery = QueryUtils.makeFieldFilterQuery("b235gf23b");
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", filterQuery)));

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        bolt.execute(makeRecordTuple(matching));

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        bolt.execute(tick);
        bolt.execute(tick);

        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expected));
    }

    /**
     * With a default DonableFilterBolt, sends a query, one non-matching record and two ticks, and
     * only then a matching record. Asserts that the late matching record is not emitted.
     */
    @Test
    public void testQueryDone() {
        bolt = ComponentUtils.prepare(new DonableFilterBolt(), collector);
        Query filterQuery = QueryUtils.makeFieldFilterQuery("b235gf23b");
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", filterQuery)));

        BulletRecord nonMatching = RecordBox.get().add("field", "foo").getRecord();
        bolt.execute(makeRecordTuple(nonMatching));

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        bolt.execute(tick);
        bolt.execute(tick);

        BulletRecord late = RecordBox.get().add("field", "b235gf23b").getRecord();
        bolt.execute(makeRecordTuple(late));

        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", late);
        Assert.assertFalse(wasRawRecordEmittedTo("default", unexpected));
    }

    /**
     * With a default DonableFilterBolt, sends a query, one matching record and two ticks. Asserts
     * that the sent record was emitted exactly once and that a different record, which was never
     * sent, was not emitted.
     */
    @Test
    public void testQueryNotDoneAndThenDone() {
        bolt = ComponentUtils.prepare(new DonableFilterBolt(), collector);
        Query filterQuery = QueryUtils.makeFieldFilterQuery("b235gf23b");
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", filterQuery)));

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        bolt.execute(makeRecordTuple(matching));

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        bolt.execute(tick);
        bolt.execute(tick);

        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expected));

        BulletRecord neverSent = RecordBox.get().add("field", "b235gf23b").add("mid", "2342").getRecord();
        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", neverSent);
        Assert.assertFalse(wasRawRecordEmittedTo("default", unexpected));
    }

    /**
     * Registers the filter
     * {@code (field == "abc" AND (experience IN ["app", "tv"] OR mid > 10))
     * OR (CAST(demographic_map.age AS INTEGER) > 65 AND filter_map.is_fake_event == true)}
     * and asserts that the record satisfying the first branch is emitted exactly once while the
     * record satisfying only the age comparison is never emitted.
     */
    @Test
    public void testComplexFilterQuery() {
        // Sub-expressions are built in the same left-to-right order as the nested original.
        BinaryExpression fieldIsAbc = new BinaryExpression(new FieldExpression("field"), new ValueExpression("abc"), Operation.EQUALS);
        BinaryExpression experienceIsAppOrTv = new BinaryExpression(new FieldExpression("experience"), new ListExpression(Arrays.asList(new ValueExpression("app"), new ValueExpression("tv"))), Operation.EQUALS_ANY);
        BinaryExpression midAboveTen = new BinaryExpression(new FieldExpression("mid"), new ValueExpression(10), Operation.GREATER_THAN);
        BinaryExpression experienceOrMid = new BinaryExpression(experienceIsAppOrTv, midAboveTen, Operation.OR);
        BinaryExpression firstBranch = new BinaryExpression(fieldIsAbc, experienceOrMid, Operation.AND);

        BinaryExpression ageAbove65 = new BinaryExpression(new CastExpression(new FieldExpression("demographic_map", "age"), Type.INTEGER), new ValueExpression(65), Operation.GREATER_THAN);
        BinaryExpression isFakeEvent = new BinaryExpression(new FieldExpression("filter_map", "is_fake_event"), new ValueExpression(true), Operation.EQUALS);
        BinaryExpression secondBranch = new BinaryExpression(ageAbove65, isFakeEvent, Operation.AND);

        BinaryExpression filter = new BinaryExpression(firstBranch, secondBranch, Operation.OR);
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", QueryUtils.makeFilterQuery(filter))));

        BulletRecord matching = RecordBox.get().add("field", "abc").add("experience", "tv").add("mid", 11).getRecord();
        BulletRecord nonMatching = RecordBox.get()
                                            .add("field", "")
                                            .add("experience", "")
                                            .add("mid", 0)
                                            .addMap("demographic_map", new Pair[]{Pair.of("age", "67")})
                                            .addMap("filter_map", new Pair[]{Pair.of("is_fake_event", false)})
                                            .getRecord();
        Tuple nonMatchingTuple = makeRecordTuple(nonMatching);
        bolt.execute(nonMatchingTuple);
        bolt.execute(nonMatchingTuple);
        bolt.execute(makeRecordTuple(matching));

        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching);
        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", nonMatching);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expected));
        Assert.assertFalse(wasRawRecordEmitted(unexpected));
    }

    /**
     * Uses a bolt constructed with "CustomSource" and sends raw tuples built with that same
     * name. Asserts that the matching record is emitted exactly once and the non-matching record
     * is never emitted.
     */
    @Test
    public void testTuplesCustomSource() {
        bolt = ComponentUtils.prepare(new FilterBolt("CustomSource", oneRecordConfig()), collector);
        Query filterQuery = QueryUtils.makeFieldFilterQuery("b235gf23b");
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", filterQuery)));

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        bolt.execute(TupleUtils.makeRawTuple("CustomSource", "default", matching));

        bolt.execute(TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]));

        BulletRecord nonMatching = RecordBox.get().add("field", "wontmatch").getRecord();
        bolt.execute(TupleUtils.makeRawTuple("CustomSource", "default", nonMatching));

        Tuple expected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 1, expected));

        Tuple unexpected = makeDataTuple(TupleClassifier.Type.DATA_TUPLE, "42", nonMatching);
        Assert.assertFalse(wasRawRecordEmitted(unexpected));
    }

    /**
     * Registers a group-all COUNT query filtered on timestamp in ("1", "2"), sends ten records
     * with timestamp "1" and five empty records, then two ticks. Asserts that a single tuple was
     * emitted and that its payload deserializes to group data with a "cnt" of 10.
     */
    @Test
    public void testGroupAllCount() {
        bolt = ComponentUtils.prepare(new DonableFilterBolt(15, new BulletStormConfig()), collector);
        Query countQuery = QueryUtils.makeGroupAllFieldFilterQuery("timestamp", Arrays.asList("1", "2"), Operation.EQUALS_ANY, Collections.singletonList(new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, "cnt")));
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", countQuery)));

        Tuple matching = makeRecordTuple(RecordBox.get().add("timestamp", "1").getRecord());
        for (int sent = 0; sent < 10; sent++) {
            bolt.execute(matching);
        }
        Tuple nonMatching = makeRecordTuple(RecordBox.get().getRecord());
        for (int sent = 0; sent < 5; sent++) {
            bolt.execute(nonMatching);
        }

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        bolt.execute(tick);
        bolt.execute(tick);

        Assert.assertEquals(collector.getEmittedCount(), 1);
        GroupData actual = (GroupData) SerializerDeserializer.fromBytes(getRawPayloadOfNthTuple(1));
        BulletRecord expected = RecordBox.get().add("cnt", 10L).getRecord();
        Assert.assertEquals(actual.getMetricsAsBulletRecord(provider), expected);
        Assert.assertTrue(isEqual(actual, expected));
    }

    /**
     * Registers a count distinct query on "field" and sends 256 records with distinct values.
     * Asserts that nothing is emitted before the ticks, that one tuple is emitted after two
     * ticks, and that combining its payload into a fresh strategy yields a "count" of 256.
     */
    @Test
    public void testCountDistinct() {
        BulletStormConfig sketchConfig = new BulletStormConfig(ThetaSketchingStrategyTest.makeConfiguration(8, 512));
        bolt = ComponentUtils.prepare(new DonableFilterBolt(256, sketchConfig), collector);
        Query distinctQuery = QueryUtils.makeCountDistinctQuery(Collections.singletonList("field"), "count");
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", distinctQuery)));

        IntStream.range(0, 256)
                 .mapToObj(i -> RecordBox.get().add("field", Integer.valueOf(i)).getRecord())
                 .map(FilterBoltTest::makeRecordTuple)
                 .forEach(bolt::execute);
        Assert.assertEquals(collector.getEmittedCount(), 0);

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        bolt.execute(tick);
        bolt.execute(tick);
        Assert.assertEquals(collector.getEmittedCount(), 1);

        byte[] payload = getRawPayloadOfNthTuple(1);
        Assert.assertNotNull(payload);

        ThetaSketchingStrategy strategy = ThetaSketchingStrategyTest.makeCountDistinct(sketchConfig, Collections.singletonList("field"), "count", new Map.Entry[0]);
        strategy.combine(payload);
        BulletRecord expected = RecordBox.get().add("count", 256L).getRecord();
        Assert.assertEquals((BulletRecord) strategy.getRecords().get(0), expected);
    }

    /**
     * Sends the same query and three matching records first to the default bolt and then to a
     * DonableFilterBolt(2, 1, ...) with a fresh collector. Asserts that the default bolt emitted
     * three sliding tuples and that the donable bolt emitted only two.
     */
    @Test
    public void testNoConsumptionAfterDone() {
        Query windowed = QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1);
        Tuple queryTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", windowed));
        bolt.execute(queryTuple);

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        Tuple recordTuple = makeRecordTuple(matching);
        for (int sent = 0; sent < 3; sent++) {
            bolt.execute(recordTuple);
        }
        Tuple expected = makeSlidingTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching);
        Assert.assertTrue(wasRawRecordEmittedTo("default", 3, expected));

        // Repeat against a bolt whose queriers eventually report being done, with a clean collector.
        collector = new CustomCollector();
        bolt = ComponentUtils.prepare(new DonableFilterBolt(2, 1, new BulletStormConfig()), collector);
        bolt.execute(queryTuple);
        for (int sent = 0; sent < 3; sent++) {
            bolt.execute(recordTuple);
        }
        Assert.assertTrue(wasRawRecordEmittedTo("default", 2, expected));
    }

    /**
     * Registers a PMF distribution query on "field" and sends 101 records with values 0 to 100.
     * Asserts that nothing is emitted before the ticks, that one tuple is emitted after two
     * ticks, and that combining its payload into a fresh strategy yields the four expected
     * range records.
     */
    @Test
    public void testDistribution() {
        BulletStormConfig sketchConfig = new BulletStormConfig(QuantileSketchingStrategyTest.makeConfiguration(20, 128));
        bolt = ComponentUtils.prepare(new DonableFilterBolt(101, sketchConfig), collector);
        Query distributionQuery = QueryUtils.makeDistributionQuery(10, DistributionType.PMF, "field", 3);
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", distributionQuery)));

        IntStream.range(0, 101)
                 .mapToObj(i -> RecordBox.get().add("field", Integer.valueOf(i)).getRecord())
                 .map(FilterBoltTest::makeRecordTuple)
                 .forEach(bolt::execute);
        Assert.assertEquals(collector.getEmittedCount(), 0);

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        bolt.execute(tick);
        bolt.execute(tick);
        Assert.assertEquals(collector.getEmittedCount(), 1);

        byte[] payload = getRawPayloadOfNthTuple(1);
        Assert.assertNotNull(payload);

        QuantileSketchingStrategy strategy = QuantileSketchingStrategyTest.makeDistribution(sketchConfig, 10, "field", DistributionType.PMF, 3);
        strategy.combine(payload);
        List results = strategy.getRecords();

        BulletRecord belowRange = RecordBox.get().add("Range", "(-∞ to 0.0)").add("Count", Double.valueOf(0.0d)).add("Probability", Double.valueOf(0.0d)).getRecord();
        BulletRecord lowerHalf = RecordBox.get().add("Range", "[0.0 to 50.0)").add("Count", Double.valueOf(50.0d)).add("Probability", Double.valueOf(0.49504950495049505d)).getRecord();
        BulletRecord upperHalf = RecordBox.get().add("Range", "[50.0 to 100.0)").add("Count", Double.valueOf(50.0d)).add("Probability", Double.valueOf(0.49504950495049505d)).getRecord();
        BulletRecord aboveRange = RecordBox.get().add("Range", "[100.0 to +∞)").add("Count", Double.valueOf(1.0d)).add("Probability", Double.valueOf(0.009900990099009901d)).getRecord();

        // The raw list and Iterable casts are kept so the same assertEquals overload is selected.
        Assert.assertEquals((Iterable) results.get(0), belowRange);
        Assert.assertEquals((Iterable) results.get(1), lowerHalf);
        Assert.assertEquals((Iterable) results.get(2), upperHalf);
        Assert.assertEquals((Iterable) results.get(3), aboveRange);
    }

    /**
     * Registers a top K query over fields "A" and "B" (renamed "foo") and sends eight records
     * with "A" from 0 to 7, six more with "A" = 0 and two more with "A" = 3. After two ticks,
     * asserts that one tuple was emitted and that combining its payload into a fresh strategy
     * yields two records: "A" = "0" with count 7, then "A" = "3" with count 3.
     */
    @Test
    public void testTopK() {
        BulletStormConfig sketchConfig = new BulletStormConfig(FrequentItemsSketchingStrategyTest.makeConfiguration(ErrorType.NO_FALSE_NEGATIVES, 32));
        bolt = ComponentUtils.prepare(new DonableFilterBolt(16, sketchConfig), collector);

        // Kept as a raw map because the signatures consuming it are not visible here.
        HashMap fields = new HashMap();
        fields.put("A", "");
        fields.put("B", "foo");
        Query topKQuery = QueryUtils.makeTopKQuery(5, (Long) null, "cnt", fields);
        bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", topKQuery)));

        IntStream.range(0, 8)
                 .mapToObj(i -> RecordBox.get().add("A", Integer.valueOf(i)).getRecord())
                 .map(FilterBoltTest::makeRecordTuple)
                 .forEach(bolt::execute);
        IntStream.range(0, 6)
                 .mapToObj(i -> RecordBox.get().add("A", 0).getRecord())
                 .map(FilterBoltTest::makeRecordTuple)
                 .forEach(bolt::execute);
        IntStream.range(0, 2)
                 .mapToObj(i -> RecordBox.get().add("A", 3).getRecord())
                 .map(FilterBoltTest::makeRecordTuple)
                 .forEach(bolt::execute);

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        bolt.execute(tick);
        bolt.execute(tick);
        Assert.assertEquals(collector.getEmittedCount(), 1);

        byte[] payload = getRawPayloadOfNthTuple(1);
        Assert.assertNotNull(payload);

        FrequentItemsSketchingStrategy strategy = FrequentItemsSketchingStrategyTest.makeTopK(sketchConfig, fields, 2, "cnt", (Long) null, (List) null);
        strategy.combine(payload);
        List results = strategy.getRecords();
        Assert.assertEquals(results.size(), 2);

        BulletRecord mostFrequent = RecordBox.get().add("A", "0").add("foo", "null").add("cnt", 7L).getRecord();
        BulletRecord secondMostFrequent = RecordBox.get().add("A", "3").add("foo", "null").add("cnt", 3L).getRecord();
        // The raw list and Iterable casts are kept so the same assertEquals overload is selected.
        Assert.assertEquals((Iterable) results.get(0), mostFrequent);
        Assert.assertEquals((Iterable) results.get(1), secondMostFrequent);
    }

    /**
     * Checks the latency metric reported by the bolt when built-in metrics are enabled.
     *
     * <p>Ten record tuples are executed, each built with the current time as the second argument to
     * {@code makeRecordTuple} (presumably the record's timestamp). The "bullet_filter_latency" metric
     * must then not exceed the wall-clock time elapsed since just before the tuples were created.
     */
    @Test
    public void testFilteringLatency() {
        this.config = new BulletStormConfig();
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.collector = new CustomCollector();
        CustomTopologyContext context = new CustomTopologyContext();
        this.bolt = new FilterBolt("DataSource", this.config);
        ComponentUtils.prepare(new HashMap<>(), this.bolt, context, this.collector);

        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", makeQueryPubSubMessage("42", QueryUtils.makeFieldFilterQuery("bar"))));

        BulletRecord record = RecordBox.get().add("field", "foo").getRecord();
        long startMillis = System.currentTimeMillis();
        // The stream is lazy, so each tuple is created right before the bolt executes it.
        IntStream.range(0, 10)
                 .mapToObj(i -> makeRecordTuple(record, System.currentTimeMillis()))
                 .forEach(this.bolt::execute);

        // Read the metric first, then the clock, so the elapsed time covers the whole run.
        double reportedLatency = context.getDoubleMetric("bullet_filter_latency").doubleValue();
        double elapsedMillis = (double) (System.currentTimeMillis() - startMillis);
        Assert.assertTrue(reportedLatency <= elapsedMillis);
    }

    /**
     * Verifies that a {@code RateLimitedFilterBolt} configured with a rate limit error emits that
     * error for the query.
     *
     * <p>Two matching records are executed and the second emission is checked to be the data tuple for
     * query "42". After a third record, the third emission must be an error tuple carrying the
     * configured {@link RateLimitError}.
     */
    @Test
    public void testRateLimiting() {
        this.config = new BulletStormConfig();
        RateLimitError expectedError = new RateLimitError(42.0d, 5.0d);
        this.bolt = new RateLimitedFilterBolt(2, expectedError, this.config);
        this.bolt = ComponentUtils.prepare(new HashMap<>(), this.bolt, this.collector);

        Tuple queryTuple = TupleUtils.makeIDTuple(
                TupleClassifier.Type.QUERY_TUPLE,
                "42",
                makeQueryPubSubMessage("42", QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 100, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)));
        this.bolt.execute(queryTuple);

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        Tuple dataTuple = makeRecordTuple(matching);

        // First two records: the second emission on the "default" stream is the data for query "42".
        this.bolt.execute(dataTuple);
        this.bolt.execute(dataTuple);
        boolean dataEmitted = wasRawRecordEmittedTo("default", 2, makeSlidingTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching));
        Assert.assertTrue(dataEmitted);

        // Third record: the third emission must be the rate limit error for query "42".
        this.bolt.execute(dataTuple);
        Tuple errorTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.ERROR_TUPLE, "42", expectedError);
        Assert.assertTrue(this.collector.wasNthEmitted(errorTuple, 3));
    }

    /**
     * Verifies the behaviour of a {@code RateLimitedFilterBolt} constructed with a {@code null} rate
     * limit error.
     *
     * <p>Two matching records produce two emissions, the second being the data tuple for query "42".
     * A third record must not produce any further emission.
     */
    @Test
    public void testMissingRateLimit() {
        this.config = new BulletStormConfig();
        this.bolt = new RateLimitedFilterBolt(2, null, this.config);
        this.bolt = ComponentUtils.prepare(new HashMap<>(), this.bolt, this.collector);

        Tuple queryTuple = TupleUtils.makeIDTuple(
                TupleClassifier.Type.QUERY_TUPLE,
                "42",
                makeQueryPubSubMessage("42", QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 100, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)));
        this.bolt.execute(queryTuple);

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        Tuple dataTuple = makeRecordTuple(matching);

        this.bolt.execute(dataTuple);
        this.bolt.execute(dataTuple);
        boolean dataEmitted = wasRawRecordEmittedTo("default", 2, makeSlidingTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching));
        Assert.assertTrue(dataEmitted);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);

        // With no rate limit error available, the third record adds nothing to the emitted count.
        this.bolt.execute(dataTuple);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
    }

    /**
     * Verifies that a KILL metadata signal for a query stops further emissions for it.
     *
     * <p>Two matching records yield two emissions. After a metadata tuple with
     * {@code Metadata.Signal.KILL} is executed for query "42", two more records must leave the
     * emitted count at 2.
     */
    @Test
    public void testKillSignal() {
        Tuple queryTuple = TupleUtils.makeIDTuple(
                TupleClassifier.Type.QUERY_TUPLE,
                "42",
                makeQueryPubSubMessage("42", QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)));
        this.bolt.execute(queryTuple);

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        Tuple dataTuple = makeRecordTuple(matching);

        // Before the signal: both records are emitted.
        this.bolt.execute(dataTuple);
        this.bolt.execute(dataTuple);
        boolean dataEmitted = wasRawRecordEmittedTo("default", 2, makeSlidingTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching));
        Assert.assertTrue(dataEmitted);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);

        Metadata killSignal = new Metadata(Metadata.Signal.KILL, (Serializable) null);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "42", killSignal));

        // After the signal: the same records no longer change the emitted count.
        this.bolt.execute(dataTuple);
        this.bolt.execute(dataTuple);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
    }

    /**
     * Verifies that a COMPLETE metadata signal for a query stops further emissions for it.
     *
     * <p>Two matching records yield two emissions. After a metadata tuple with
     * {@code Metadata.Signal.COMPLETE} is executed for query "42", two more records must leave the
     * emitted count at 2.
     */
    @Test
    public void testCompleteSignal() {
        Tuple queryTuple = TupleUtils.makeIDTuple(
                TupleClassifier.Type.QUERY_TUPLE,
                "42",
                makeQueryPubSubMessage("42", QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)));
        this.bolt.execute(queryTuple);

        BulletRecord matching = RecordBox.get().add("field", "b235gf23b").getRecord();
        Tuple dataTuple = makeRecordTuple(matching);

        // Before the signal: both records are emitted.
        this.bolt.execute(dataTuple);
        this.bolt.execute(dataTuple);
        boolean dataEmitted = wasRawRecordEmittedTo("default", 2, makeSlidingTuple(TupleClassifier.Type.DATA_TUPLE, "42", matching));
        Assert.assertTrue(dataEmitted);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);

        Metadata completeSignal = new Metadata(Metadata.Signal.COMPLETE, (Serializable) null);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "42", completeSignal));

        // After the signal: the same records no longer change the emitted count.
        this.bolt.execute(dataTuple);
        this.bolt.execute(dataTuple);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
    }

    /**
     * Verifies the bolt's statistics tick counter against a report interval of 10 ticks.
     *
     * <p>Before each of ten tick tuples the counter must equal the number of ticks already delivered.
     * After the tenth tick it must read 0 again.
     */
    @Test
    public void testStatisticsReporting() {
        this.config.set("bullet.topology.filter.bolt.stats.report.ticks", 10);
        this.config.validate();
        FilterBolt statsBolt = new FilterBolt("DataSource", this.config);
        this.bolt = ComponentUtils.prepare(new HashMap<>(), statsBolt, this.collector);

        Tuple tick = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        int ticksDelivered = 0;
        while (ticksDelivered < 10) {
            // The counter is checked before the tick is delivered.
            Assert.assertEquals(this.bolt.getStatsTickCount(), ticksDelivered);
            this.bolt.execute(tick);
            ticksDelivered++;
        }
        // Ten ticks in, the counter is expected to be back at zero.
        Assert.assertEquals(this.bolt.getStatsTickCount(), 0);
    }

    /**
     * Verifies that a batch tuple is acked once when the bolt's replay is already marked completed.
     */
    @Test
    public void testBatchTuple() {
        BulletStormConfig testConfig = new BulletStormConfig("test_config.yaml");
        this.bolt = ComponentUtils.prepare(new FilterBolt("DataSource", testConfig), this.collector);
        // Mark the replay as finished before the batch tuple arrives.
        this.bolt.replayCompleted = true;

        Tuple batchTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18");
        this.bolt.execute(batchTuple);

        Assert.assertEquals(this.collector.getAckedCount(), 1);
    }

    /**
     * Exercises how query removals interact with replayed batches delivered through {@code onBatch}.
     *
     * <p>The sequence asserted is:
     * <ol>
     *   <li>A freshly prepared bolt has no replayed queries, no managed queries and no removed ids.</li>
     *   <li>Removing "42" before any batch records one removed id.</li>
     *   <li>A batch containing "42" and "43" leaves one replayed query, one managed query and no
     *       removed ids.</li>
     *   <li>Removing "43" records one removed id and empties the manager.</li>
     *   <li>A batch whose value at index 3 is {@code null} clears the removed ids and leaves the
     *       manager empty.</li>
     * </ol>
     */
    @Test
    public void testBatchInitializeAndRemoveQuery() {
        BulletStormConfig testConfig = new BulletStormConfig("test_config.yaml");
        this.bolt = ComponentUtils.prepare(new FilterBolt("DataSource", testConfig), this.collector);

        Assert.assertEquals(this.bolt.replayedQueriesCount, 0);
        Assert.assertEquals(this.bolt.getManager().size(), 0);
        Assert.assertEquals(this.bolt.removedIds.size(), 0);

        // Remove a query that has not been seen yet.
        this.bolt.removeQuery("42");
        Assert.assertEquals(this.bolt.removedIds.size(), 1);

        HashMap<String, Object> batchQueries = new HashMap<>();
        batchQueries.put("42", makeQueryPubSubMessage("42", QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)));
        batchQueries.put("43", makeQueryPubSubMessage("43", QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", 5, Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)));

        // Stub the batch tuple: index 1 is the bolt's start timestamp, index 2 is 0 and index 3 is
        // the map of queries.
        Tuple populatedBatch = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18");
        Mockito.when(populatedBatch.getLong(1)).thenReturn(Long.valueOf(this.bolt.startTimestamp));
        Mockito.when(populatedBatch.getInteger(2)).thenReturn(0);
        Mockito.when(populatedBatch.getValue(3)).thenReturn(batchQueries);
        this.bolt.onBatch(populatedBatch);

        Assert.assertEquals(this.bolt.replayedQueriesCount, 1);
        Assert.assertEquals(this.bolt.removedIds.size(), 0);
        Assert.assertEquals(this.bolt.getManager().size(), 1);

        this.bolt.removeQuery("43");
        Assert.assertEquals(this.bolt.removedIds.size(), 1);
        Assert.assertEquals(this.bolt.getManager().size(), 0);

        // Same stubbing as above, except that the value at index 3 is null.
        Tuple emptyBatch = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "FilterBolt-18");
        Mockito.when(emptyBatch.getLong(1)).thenReturn(Long.valueOf(this.bolt.startTimestamp));
        Mockito.when(emptyBatch.getInteger(2)).thenReturn(0);
        Mockito.when(emptyBatch.getValue(3)).thenReturn((Object) null);
        this.bolt.onBatch(emptyBatch);

        Assert.assertEquals(this.bolt.removedIds.size(), 0);
        Assert.assertEquals(this.bolt.getManager().size(), 0);
    }
}
