package com.yahoo.bullet.storm;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.query.QueryUtils;
import com.yahoo.bullet.query.Window;
import com.yahoo.bullet.query.aggregations.DistributionType;
import com.yahoo.bullet.query.expressions.BinaryExpression;
import com.yahoo.bullet.query.expressions.FieldExpression;
import com.yahoo.bullet.query.expressions.Operation;
import com.yahoo.bullet.query.expressions.ValueExpression;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.RateLimitError;
import com.yahoo.bullet.querying.aggregations.FrequentItemsSketchingStrategy;
import com.yahoo.bullet.querying.aggregations.FrequentItemsSketchingStrategyTest;
import com.yahoo.bullet.querying.aggregations.QuantileSketchingStrategy;
import com.yahoo.bullet.querying.aggregations.QuantileSketchingStrategyTest;
import com.yahoo.bullet.querying.aggregations.ThetaSketchingStrategy;
import com.yahoo.bullet.querying.aggregations.ThetaSketchingStrategyTest;
import com.yahoo.bullet.querying.aggregations.TupleSketchingStrategy;
import com.yahoo.bullet.querying.aggregations.TupleSketchingStrategyTest;
import com.yahoo.bullet.querying.aggregations.grouping.GroupData;
import com.yahoo.bullet.querying.aggregations.grouping.GroupOperation;
import com.yahoo.bullet.record.BulletRecord;
import com.yahoo.bullet.result.Clip;
import com.yahoo.bullet.result.Meta;
import com.yahoo.bullet.result.RecordBox;
import com.yahoo.bullet.storm.TupleClassifier;
import com.yahoo.bullet.storm.testing.ComponentUtils;
import com.yahoo.bullet.storm.testing.CustomCollector;
import com.yahoo.bullet.storm.testing.CustomOutputFieldsDeclarer;
import com.yahoo.bullet.storm.testing.CustomTopologyContext;
import com.yahoo.bullet.storm.testing.TestHelpers;
import com.yahoo.bullet.storm.testing.TupleUtils;
import com.yahoo.bullet.windowing.SlidingRecord;
import com.yahoo.sketches.frequencies.ErrorType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/yahoo/bullet/storm/JoinBoltTest.class */
public class JoinBoltTest {
    private static final Metadata EMPTY = new Metadata();
    private static final Metadata COMPLETED = new Metadata(Metadata.Signal.COMPLETE, (Serializable) null);
    private static final Metadata FAILED = new Metadata(Metadata.Signal.FAIL, (Serializable) null);
    private static final int RAW_MAX_SIZE = 5;
    private BulletStormConfig config;
    private CustomCollector collector;
    private CustomTopologyContext context;
    private JoinBolt bolt;

    /* loaded from: input_file:com/yahoo/bullet/storm/JoinBoltTest$ClosableJoinBolt.class */
    private static class ClosableJoinBolt extends JoinBolt {
        private final int closeAfter;
        private boolean shouldBuffer;

        public ClosableJoinBolt(BulletStormConfig bulletStormConfig, int i, boolean z) {
            super(bulletStormConfig);
            this.closeAfter = i;
            this.shouldBuffer = z;
        }

        protected Querier createQuerier(Querier.Mode mode, String str, Query query, Metadata metadata, BulletConfig bulletConfig) {
            Querier querier = (Querier) Mockito.spy(super.createQuerier(mode, str, query, metadata, bulletConfig));
            List list = (List) IntStream.range(0, this.closeAfter).mapToObj(i -> {
                return false;
            }).collect(ArrayList::new, (v0, v1) -> {
                v0.add(v1);
            }, (v0, v1) -> {
                v0.addAll(v1);
            });
            list.add(true);
            ((Querier) Mockito.doAnswer(AdditionalAnswers.returnsElementsOf(list)).when(querier)).isClosed();
            ((Querier) Mockito.doReturn(Boolean.valueOf(this.shouldBuffer)).when(querier)).shouldBuffer();
            return querier;
        }

        public void setShouldBuffer(boolean z) {
            this.shouldBuffer = z;
        }
    }

    /* loaded from: input_file:com/yahoo/bullet/storm/JoinBoltTest$DonableJoinBolt.class */
    private static class DonableJoinBolt extends JoinBolt {
        private final int doneAfter;
        private boolean shouldBuffer;

        public DonableJoinBolt(BulletStormConfig bulletStormConfig, int i, boolean z) {
            super(bulletStormConfig);
            this.doneAfter = i;
            this.shouldBuffer = z;
        }

        protected Querier createQuerier(Querier.Mode mode, String str, Query query, Metadata metadata, BulletConfig bulletConfig) {
            Querier querier = (Querier) Mockito.spy(super.createQuerier(mode, str, query, metadata, bulletConfig));
            List list = (List) IntStream.range(0, this.doneAfter).mapToObj(i -> {
                return false;
            }).collect(ArrayList::new, (v0, v1) -> {
                v0.add(v1);
            }, (v0, v1) -> {
                v0.addAll(v1);
            });
            list.add(true);
            ((Querier) Mockito.doAnswer(AdditionalAnswers.returnsElementsOf(list)).when(querier)).isDone();
            ((Querier) Mockito.doReturn(Boolean.valueOf(this.shouldBuffer)).when(querier)).shouldBuffer();
            return querier;
        }

        public void setShouldBuffer(boolean z) {
            this.shouldBuffer = z;
        }
    }

    /* loaded from: input_file:com/yahoo/bullet/storm/JoinBoltTest$RateLimitedJoinBolt.class */
    private static class RateLimitedJoinBolt extends JoinBolt {
        private final int limitedAfter;
        private final RateLimitError error;

        private RateLimitedJoinBolt(int i, RateLimitError rateLimitError, BulletStormConfig bulletStormConfig) {
            super(bulletStormConfig);
            this.limitedAfter = i;
            this.error = rateLimitError;
        }

        protected Querier createQuerier(Querier.Mode mode, String str, Query query, Metadata metadata, BulletConfig bulletConfig) {
            Querier querier = (Querier) Mockito.spy(super.createQuerier(mode, str, query, metadata, bulletConfig));
            List list = (List) IntStream.range(0, this.limitedAfter).mapToObj(i -> {
                return false;
            }).collect(ArrayList::new, (v0, v1) -> {
                v0.add(v1);
            }, (v0, v1) -> {
                v0.addAll(v1);
            });
            list.add(true);
            ((Querier) Mockito.doAnswer(AdditionalAnswers.returnsElementsOf(list)).when(querier)).isExceedingRateLimit();
            ((Querier) Mockito.doReturn(this.error).when(querier)).getRateLimitError();
            return querier;
        }
    }

    private static List<BulletRecord> sendRawRecordTuplesTo(IRichBolt iRichBolt, String str, int i, int i2) {
        ArrayList arrayList = new ArrayList();
        int i3 = 0;
        while (true) {
            int i4 = i3;
            if (i4 >= i) {
                return arrayList;
            }
            BulletRecord[] bulletRecordArr = new BulletRecord[i2];
            for (int i5 = 0; i5 < i2; i5++) {
                bulletRecordArr[i5] = RecordBox.get().add("field", String.valueOf(i4 + i5)).getRecord();
            }
            iRichBolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.DATA_TUPLE, str, TestHelpers.getListBytes(bulletRecordArr)));
            arrayList.addAll(Arrays.asList(bulletRecordArr));
            i3 = i4 + i2;
        }
    }

    private static List<BulletRecord> sendRawRecordTuplesTo(IRichBolt iRichBolt, String str, int i) {
        return sendRawRecordTuplesTo(iRichBolt, str, i, 1);
    }

    private static List<BulletRecord> sendRawRecordTuplesTo(IRichBolt iRichBolt, String str) {
        return sendRawRecordTuplesTo(iRichBolt, str, RAW_MAX_SIZE);
    }

    private static void sendRawByteTuplesTo(IRichBolt iRichBolt, String str, List<byte[]> list) {
        Iterator<byte[]> it = list.iterator();
        while (it.hasNext()) {
            iRichBolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.DATA_TUPLE, str, it.next()));
        }
    }

    private static List<BulletRecord> sendSlidingWindowWithRawRecordTuplesTo(IRichBolt iRichBolt, String str, int i) {
        BulletRecord[] bulletRecordArr = new BulletRecord[i];
        for (int i2 = 0; i2 < i; i2++) {
            bulletRecordArr[i2] = RecordBox.get().add("field", String.valueOf(i2)).getRecord();
        }
        iRichBolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.DATA_TUPLE, str, SerializerDeserializer.toBytes(new SlidingRecord.Data(bulletRecordArr.length, TestHelpers.getListBytes(bulletRecordArr)))));
        return Arrays.asList(bulletRecordArr);
    }

    private static byte[] getGroupDataWithCount(String str, int i) {
        GroupData groupData = new GroupData(new HashSet(Collections.singletonList(new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, str))));
        IntStream.range(0, i).forEach(i2 -> {
            groupData.consume(RecordBox.get().getRecord());
        });
        return SerializerDeserializer.toBytes(groupData);
    }

    private static void enableMetadataInConfig(BulletStormConfig bulletStormConfig, String str, String str2) {
        Map map = (Map) bulletStormConfig.getOrDefault("bullet.result.metadata.metrics", new HashMap());
        map.put(str, str2);
        bulletStormConfig.set("bullet.result.metadata.enable", true);
        bulletStormConfig.set("bullet.result.metadata.metrics", map);
    }

    private static BulletStormConfig configWithRawMaxAndNoMeta() {
        BulletStormConfig bulletStormConfig = new BulletStormConfig();
        bulletStormConfig.set("bullet.query.aggregation.raw.max.size", Integer.valueOf(RAW_MAX_SIZE));
        bulletStormConfig.set("bullet.result.metadata.enable", false);
        bulletStormConfig.validate();
        return bulletStormConfig;
    }

    private static BulletStormConfig configWithRawMaxAndEmptyMeta() {
        BulletStormConfig bulletStormConfig = new BulletStormConfig();
        bulletStormConfig.set("bullet.query.aggregation.raw.max.size", Integer.valueOf(RAW_MAX_SIZE));
        bulletStormConfig.set("bullet.result.metadata.enable", true);
        bulletStormConfig.validate();
        bulletStormConfig.set("bullet.result.metadata.metrics", new HashMap());
        return bulletStormConfig;
    }

    private static boolean isSameResult(List<Object> list, List<Object> list2) {
        return (list.size() == 3) & (list.size() == list2.size()) & list.get(0).equals(list2.get(0)) & list.get(1).equals(list2.get(1));
    }

    private static boolean isSameMetadata(Object obj, Object obj2) {
        Metadata metadata = (Metadata) obj;
        Metadata metadata2 = (Metadata) obj2;
        if (metadata.getSignal() != metadata2.getSignal()) {
            return false;
        }
        Serializable content = metadata.getContent();
        Serializable content2 = metadata2.getContent();
        return content == content2 || content.equals(content2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean resultTupleEquals(List<Object> list, Tuple tuple) {
        List values = tuple.getValues();
        return isSameResult(list, values) && isSameMetadata(list.get(2), values.get(2));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean metadataTupleEquals(List<Object> list, Tuple tuple) {
        List values = tuple.getValues();
        return (list.size() == 2) & (list.size() == values.size()) & list.get(0).equals(values.get(0));
    }

    private boolean wasResultEmittedTo(String str, Tuple tuple) {
        return this.collector.getAllEmittedTo(str).anyMatch(triplet -> {
            return resultTupleEquals(triplet.getTuple(), tuple);
        });
    }

    private boolean wasResultEmitted(Tuple tuple) {
        return this.collector.getTuplesEmittedTo("default").anyMatch(list -> {
            return resultTupleEquals(list, tuple);
        });
    }

    private boolean wasMetadataEmittedTo(String str, Tuple tuple) {
        return this.collector.getAllEmittedTo(str).anyMatch(triplet -> {
            return metadataTupleEquals(triplet.getTuple(), tuple);
        });
    }

    @BeforeMethod
    public void setup() {
        this.config = configWithRawMaxAndNoMeta();
        setup(new JoinBolt(this.config));
    }

    public void setup(JoinBolt joinBolt) {
        this.collector = new CustomCollector();
        this.context = new CustomTopologyContext();
        this.bolt = ComponentUtils.prepare(new HashMap(), joinBolt, this.context, this.collector);
    }

    @Test
    public void testOutputFields() {
        CustomOutputFieldsDeclarer customOutputFieldsDeclarer = new CustomOutputFieldsDeclarer();
        this.bolt.declareOutputFields(customOutputFieldsDeclarer);
        Fields fields = new Fields(new String[]{"id", "result", "metadata"});
        Fields fields2 = new Fields(new String[]{"id", "metadata"});
        Assert.assertTrue(customOutputFieldsDeclarer.areFieldsPresent("default", false, fields));
        Assert.assertTrue(customOutputFieldsDeclarer.areFieldsPresent("feedback", false, fields2));
    }

    @Test
    public void testUnknownTuple() {
        Tuple makeRawTuple = TupleUtils.makeRawTuple("DataSource", "default", RecordBox.get().add("a", "b").getRecord());
        this.bolt.execute(makeRawTuple);
        Assert.assertFalse(this.collector.wasAcked(makeRawTuple));
    }

    @Test
    public void testJoining() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo(this.bolt, "42")).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testFailJoiningForNoQuery() {
        Assert.assertFalse(wasResultEmitted(TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo(this.bolt, "42")).asJSON(), COMPLETED)));
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
    }

    @Test
    public void testQueryNotDoneButIsDurationBased() {
        this.bolt = new DonableJoinBolt(this.config, 4, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 4);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED);
        for (int i = 0; i < 2; i++) {
            this.bolt.execute(makeTuple);
            Assert.assertFalse(wasResultEmittedTo("default", makeTuple2));
        }
        this.bolt.execute(makeTuple);
        Assert.assertTrue(wasResultEmittedTo("default", makeTuple2));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testJoiningAfterLateArrivalMakingQueryFinishBeforeTickout() {
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(3)), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 2);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED);
        this.bolt.execute(TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]));
        Assert.assertFalse(wasResultEmittedTo("default", makeTuple));
        sendRawRecordTuplesTo.addAll(sendRawRecordTuplesTo(this.bolt, "42", 1));
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testJoiningAfterLateArrivalWithoutFinishingQueryBeforeTickout() {
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 2);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple);
        sendRawRecordTuplesTo.addAll(sendRawRecordTuplesTo(this.bolt, "42", 2));
        for (int i = 0; i < 2; i++) {
            this.bolt.execute(makeTuple);
            Assert.assertEquals(this.collector.getEmittedCount(), 0);
        }
        this.bolt.execute(makeTuple);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testMultiJoining() {
        DonableJoinBolt donableJoinBolt = new DonableJoinBolt(this.config, 2, true);
        this.bolt = donableJoinBolt;
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(3)), EMPTY));
        donableJoinBolt.shouldBuffer = false;
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "43", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(3)), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 2);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "43", Clip.of(sendRawRecordTuplesTo(this.bolt, "43", 3)).asJSON(), COMPLETED);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED);
        Assert.assertTrue(wasResultEmitted(makeTuple));
        Assert.assertFalse(wasResultEmitted(makeTuple2));
        Tuple makeTuple3 = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple3);
        for (int i = 0; i < 2; i++) {
            this.bolt.execute(makeTuple3);
            Assert.assertFalse(wasResultEmitted(makeTuple2));
        }
        this.bolt.execute(makeTuple3);
        Assert.assertTrue(wasResultEmitted(makeTuple2));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "43", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 2L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 2L);
    }

    @Test
    public void testErrorEmittedProperly() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", new byte[0], EMPTY));
        Assert.assertEquals(this.collector.getEmittedCount(), 1);
        Assert.assertTrue(isSameResult(this.collector.getNthTupleEmittedTo("default", 1).get(), TupleUtils.makeTuple("42", Clip.of(Meta.of(new BulletError[]{BulletError.makeError("java.lang.NullPointerException", "Error initializing query")})).asJSON(), FAILED).getValues()));
    }

    @Test
    public void testQueryIdentifierMetadata() {
        this.config = configWithRawMaxAndEmptyMeta();
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_METADATA.getName(), "meta");
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_ID.getName(), "id");
        setup(new JoinBolt(this.config));
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42");
        Meta meta = new Meta();
        meta.add("meta", Collections.singletonMap("id", "42"));
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).add(meta).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testUnknownConceptMetadata() {
        this.config = configWithRawMaxAndEmptyMeta();
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_METADATA.getName(), "meta");
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_ID.getName(), "id");
        enableMetadataInConfig(this.config, "foo", "bar");
        setup(new JoinBolt(this.config));
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42");
        Meta meta = new Meta();
        meta.add("meta", Collections.singletonMap("id", "42"));
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).add(meta).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testMultipleMeta() {
        this.config = configWithRawMaxAndEmptyMeta();
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_METADATA.getName(), "meta");
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_ID.getName(), "id");
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_OBJECT.getName(), "query object");
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_STRING.getName(), "query string");
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_RECEIVE_TIME.getName(), "created");
        enableMetadataInConfig(this.config, Meta.Concept.QUERY_FINISH_TIME.getName(), "finished");
        setup(new JoinBolt(this.config));
        long currentTimeMillis = System.currentTimeMillis();
        Query makeRawQuery = QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE));
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(makeRawQuery), new Metadata(Metadata.Signal.COMPLETE, "foo")));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42");
        long currentTimeMillis2 = System.currentTimeMillis();
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
        String str = (String) this.collector.getMthElementFromNthTupleEmittedTo("default", 1, 1).get();
        JsonParser jsonParser = new JsonParser();
        JsonObject asJsonObject = jsonParser.parse(str).getAsJsonObject();
        TestHelpers.assertJSONEquals(asJsonObject.get("records").toString(), jsonParser.parse(Clip.of(sendRawRecordTuplesTo).asJSON()).getAsJsonObject().get("records").toString());
        JsonObject asJsonObject2 = asJsonObject.get("meta").getAsJsonObject().get("meta").getAsJsonObject();
        String asString = asJsonObject2.get("id").getAsString();
        String asString2 = asJsonObject2.get("query object").getAsString();
        String asString3 = asJsonObject2.get("query string").getAsString();
        long asLong = asJsonObject2.get("created").getAsLong();
        long asLong2 = asJsonObject2.get("finished").getAsLong();
        Assert.assertEquals(asString, "42");
        Assert.assertEquals(asString2, makeRawQuery.toString());
        Assert.assertEquals(asString3, "foo");
        Assert.assertTrue(asLong <= asLong2);
        Assert.assertTrue(asLong >= currentTimeMillis && asLong <= currentTimeMillis2);
    }

    @Test
    public void testBadBytes() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", new byte[0], EMPTY));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 0L);
    }

    @Test
    public void testErrorInQueryWithoutMetadata() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE)))));
        Assert.assertEquals(this.collector.getEmittedCount(), 1);
        Assert.assertTrue(isSameResult(this.collector.getNthTupleEmittedTo("default", 1).get(), TupleUtils.makeTuple("42", Clip.of(Meta.of(new BulletError[]{BulletError.makeError("java.lang.NullPointerException", "Error initializing query")})).asJSON(), FAILED).getValues()));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getEmittedCount(), 1);
    }

    @Test
    public void testRawQueryDoneButNotTimedOutWithExcessRecords() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo(this.bolt, "42", RAW_MAX_SIZE, 3).subList(0, RAW_MAX_SIZE)).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testCounting() {
        this.bolt = new DonableJoinBolt(this.config, RAW_MAX_SIZE, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeGroupAllFieldFilterQuery("timestamp", Arrays.asList("1", "2"), Operation.EQUALS_ANY, Collections.singletonList(new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, "cnt")))), EMPTY));
        IntStream.range(1, 6).forEach(i -> {
            sendRawByteTuplesTo(this.bolt, "42", Collections.singletonList(getGroupDataWithCount("cnt", i)));
        });
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(Collections.singletonList(RecordBox.get().add("cnt", 15L).getRecord())).asJSON(), COMPLETED);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple2);
        for (int i2 = 0; i2 < 2; i2++) {
            this.bolt.execute(makeTuple2);
            Assert.assertFalse(wasResultEmittedTo("default", makeTuple));
        }
        this.bolt.execute(makeTuple2);
        Assert.assertTrue(wasResultEmittedTo("default", makeTuple));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    /* JADX WARN: Type inference failed for: r2v11, types: [byte[], java.lang.Object[]] */
    @Test
    public void testCountDistinct() {
        BulletConfig makeConfiguration = ThetaSketchingStrategyTest.makeConfiguration(8, 512);
        ThetaSketchingStrategy makeCountDistinct = ThetaSketchingStrategyTest.makeCountDistinct(makeConfiguration, Collections.singletonList("field"), "count", new Map.Entry[0]);
        Stream mapToObj = IntStream.range(0, 256).mapToObj(i -> {
            return RecordBox.get().add("field", Integer.valueOf(i)).getRecord();
        });
        makeCountDistinct.getClass();
        mapToObj.forEach(makeCountDistinct::consume);
        byte[] data = makeCountDistinct.getData();
        ThetaSketchingStrategy makeCountDistinct2 = ThetaSketchingStrategyTest.makeCountDistinct(makeConfiguration, Collections.singletonList("field"), "count", new Map.Entry[0]);
        Stream mapToObj2 = IntStream.range(128, 256).mapToObj(i2 -> {
            return RecordBox.get().add("field", Integer.valueOf(i2)).getRecord();
        });
        makeCountDistinct2.getClass();
        mapToObj2.forEach(makeCountDistinct2::consume);
        byte[] data2 = makeCountDistinct2.getData();
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeCountDistinctQuery(Collections.singletonList("field"), "count")), EMPTY));
        sendRawByteTuplesTo(this.bolt, "42", Arrays.asList(new byte[]{data, data2}));
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(Collections.singletonList(RecordBox.get().add("count", 256L).getRecord())).asJSON(), COMPLETED);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple2);
        for (int i3 = 0; i3 < 2; i3++) {
            this.bolt.execute(makeTuple2);
            Assert.assertFalse(wasResultEmittedTo("default", makeTuple));
        }
        this.bolt.execute(makeTuple2);
        Assert.assertTrue(wasResultEmittedTo("default", makeTuple));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    /* JADX WARN: Type inference failed for: r2v18, types: [byte[], java.lang.Object[]] */
    @Test
    public void testGroupBy() {
        BulletConfig makeConfiguration = TupleSketchingStrategyTest.makeConfiguration(16);
        TupleSketchingStrategy makeGroupBy = TupleSketchingStrategyTest.makeGroupBy(makeConfiguration, Collections.singletonMap("fieldA", "A"), 16, new GroupOperation[]{new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, "cnt"), new GroupOperation(GroupOperation.GroupOperationType.SUM, "fieldB", "sumB")});
        Stream mapToObj = IntStream.range(0, 256).mapToObj(i -> {
            return RecordBox.get().add("fieldA", Integer.valueOf(i % 16)).add("fieldB", Integer.valueOf(i / 16)).getRecord();
        });
        makeGroupBy.getClass();
        mapToObj.forEach(makeGroupBy::consume);
        byte[] data = makeGroupBy.getData();
        TupleSketchingStrategy makeGroupBy2 = TupleSketchingStrategyTest.makeGroupBy(makeConfiguration, Collections.singletonMap("fieldA", "A"), 16, new GroupOperation[]{new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, "cnt"), new GroupOperation(GroupOperation.GroupOperationType.SUM, "fieldB", "sumB")});
        Stream mapToObj2 = IntStream.range(256, 1024).mapToObj(i2 -> {
            return RecordBox.get().add("fieldA", Integer.valueOf(i2 % 16)).add("fieldB", Integer.valueOf(i2 / 16)).getRecord();
        });
        makeGroupBy2.getClass();
        mapToObj2.forEach(makeGroupBy2::consume);
        byte[] data2 = makeGroupBy2.getData();
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeGroupByFilterQuery(new BinaryExpression(new FieldExpression("ts"), new ValueExpression("1"), Operation.EQUALS), 16, Collections.singletonMap("fieldA", "A"), Arrays.asList(new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, "cnt"), new GroupOperation(GroupOperation.GroupOperationType.SUM, "fieldB", "sumB")))), EMPTY));
        sendRawByteTuplesTo(this.bolt, "42", Arrays.asList(new byte[]{data, data2}));
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple);
        for (int i3 = 0; i3 < 2; i3++) {
            this.bolt.execute(makeTuple);
            Assert.assertEquals(this.collector.getEmittedCount(), 0);
        }
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertEquals(new JsonParser().parse((String) this.collector.getMthElementFromNthTupleEmittedTo("default", 1, 1).get()).getAsJsonObject().get("records").getAsJsonArray().size(), 16);
    }

    /* JADX WARN: Type inference failed for: r2v11, types: [byte[], java.lang.Object[]] */
    @Test
    public void testDistribution() {
        BulletConfig makeConfiguration = QuantileSketchingStrategyTest.makeConfiguration(10, 128);
        QuantileSketchingStrategy makeDistribution = QuantileSketchingStrategyTest.makeDistribution(makeConfiguration, 10, "field", DistributionType.PMF, 3);
        Stream mapToObj = IntStream.range(0, 50).mapToObj(i -> {
            return RecordBox.get().add("field", Integer.valueOf(i)).getRecord();
        });
        makeDistribution.getClass();
        mapToObj.forEach(makeDistribution::consume);
        byte[] data = makeDistribution.getData();
        QuantileSketchingStrategy makeDistribution2 = QuantileSketchingStrategyTest.makeDistribution(makeConfiguration, 10, "field", DistributionType.PMF, 3);
        Stream mapToObj2 = IntStream.range(50, 101).mapToObj(i2 -> {
            return RecordBox.get().add("field", Integer.valueOf(i2)).getRecord();
        });
        makeDistribution2.getClass();
        mapToObj2.forEach(makeDistribution2::consume);
        byte[] data2 = makeDistribution2.getData();
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeDistributionQuery(10, DistributionType.PMF, "field", 3)), EMPTY));
        sendRawByteTuplesTo(this.bolt, "42", Arrays.asList(new byte[]{data, data2}));
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(Arrays.asList(RecordBox.get().add("Range", "(-∞ to 0.0)").add("Count", Double.valueOf(0.0d)).add("Probability", Double.valueOf(0.0d)).getRecord(), RecordBox.get().add("Range", "[0.0 to 50.0)").add("Count", Double.valueOf(50.0d)).add("Probability", Double.valueOf(0.49504950495049505d)).getRecord(), RecordBox.get().add("Range", "[50.0 to 100.0)").add("Count", Double.valueOf(50.0d)).add("Probability", Double.valueOf(0.49504950495049505d)).getRecord(), RecordBox.get().add("Range", "[100.0 to +∞)").add("Count", Double.valueOf(1.0d)).add("Probability", Double.valueOf(0.009900990099009901d)).getRecord())).asJSON(), COMPLETED);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple2);
        for (int i3 = 0; i3 < 2; i3++) {
            this.bolt.execute(makeTuple2);
            Assert.assertFalse(wasResultEmittedTo("default", makeTuple));
        }
        this.bolt.execute(makeTuple2);
        Assert.assertTrue(wasResultEmittedTo("default", makeTuple));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    /* JADX WARN: Type inference failed for: r2v14, types: [byte[], java.lang.Object[]] */
    @Test
    public void testTopK() {
        BulletConfig makeConfiguration = FrequentItemsSketchingStrategyTest.makeConfiguration(ErrorType.NO_FALSE_NEGATIVES, 16);
        HashMap hashMap = new HashMap();
        hashMap.put("A", "");
        hashMap.put("B", "foo");
        FrequentItemsSketchingStrategy makeTopK = FrequentItemsSketchingStrategyTest.makeTopK(makeConfiguration, hashMap, 2, "cnt", 5L, (List) null);
        Stream mapToObj = IntStream.range(0, 32).mapToObj(i -> {
            return RecordBox.get().add("A", Integer.valueOf(i % 8)).getRecord();
        });
        makeTopK.getClass();
        mapToObj.forEach(makeTopK::consume);
        byte[] data = makeTopK.getData();
        FrequentItemsSketchingStrategy makeTopK2 = FrequentItemsSketchingStrategyTest.makeTopK(makeConfiguration, hashMap, 2, "cnt", 5L, (List) null);
        Stream mapToObj2 = IntStream.range(0, 8).mapToObj(i2 -> {
            return RecordBox.get().add("A", Integer.valueOf(i2 % 2)).getRecord();
        });
        makeTopK2.getClass();
        mapToObj2.forEach(makeTopK2::consume);
        byte[] data2 = makeTopK2.getData();
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeTopKQuery(2, 5L, "cnt", hashMap)), EMPTY));
        sendRawByteTuplesTo(this.bolt, "42", Arrays.asList(new byte[]{data, data2}));
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(Arrays.asList(RecordBox.get().add("A", "0").add("foo", "null").add("cnt", 8L).getRecord(), RecordBox.get().add("A", "1").add("foo", "null").add("cnt", 8L).getRecord())).asJSON(), COMPLETED);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple2);
        for (int i3 = 0; i3 < 2; i3++) {
            this.bolt.execute(makeTuple2);
            Assert.assertFalse(wasResultEmittedTo("default", makeTuple));
        }
        this.bolt.execute(makeTuple2);
        Assert.assertTrue(wasResultEmittedTo("default", makeTuple));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testQueryCountingMetrics() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeGroupAllFieldFilterQuery("timestamp", Arrays.asList("1", "2"), Operation.EQUALS_ANY, Collections.singletonList(new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, "cnt")))), EMPTY);
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_improper_queries"), 0L);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        sendRawByteTuplesTo(this.bolt, "42", Collections.singletonList(getGroupDataWithCount("cnt", 21)));
        sendRawByteTuplesTo(this.bolt, "42", Collections.singletonList(getGroupDataWithCount("cnt", 21)));
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(Collections.singletonList(RecordBox.get().add("cnt", 42L).getRecord())).asJSON(), COMPLETED);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple2);
        for (int i = 0; i < 2; i++) {
            this.bolt.execute(makeTuple2);
        }
        Assert.assertFalse(wasResultEmittedTo("default", makeTuple));
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        this.bolt.execute(makeTuple2);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertTrue(wasResultEmittedTo("default", makeTuple));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 2L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric("bullet_improper_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_duplicated_queries"), 0L);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 2L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric("bullet_improper_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_duplicated_queries"), 1L);
    }

    @Test
    public void testImproperQueryCountingMetrics() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        setup(new JoinBolt(this.config));
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_improper_queries"), 0L);
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", new byte[0], EMPTY);
        this.bolt.execute(makeIDTuple);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric("bullet_improper_queries"), 2L);
    }

    @Test
    public void testCustomMetricEmitInterval() {
        HashMap hashMap = new HashMap();
        hashMap.put("bullet_active_queries", 1);
        hashMap.put("default", 10);
        this.config.set("bullet.topology.metrics.built.in.emit.interval.mapping", hashMap);
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.bolt = new DonableJoinBolt(this.config, 3, true);
        setup(this.bolt);
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeGroupAllFieldFilterQuery("timestamp", Arrays.asList("1", "2"), Operation.EQUALS_ANY, Collections.singletonList(new GroupOperation(GroupOperation.GroupOperationType.COUNT, (String) null, "cnt")))), EMPTY);
        Assert.assertEquals(this.context.getLongMetric(10, "bullet_created_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric(1, "bullet_active_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric(10, "bullet_improper_queries"), 0L);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.context.getLongMetric(10, "bullet_created_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric(1, "bullet_active_queries"), 1L);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        for (int i = 0; i <= 6; i++) {
            Assert.assertEquals(this.context.getLongMetric(1, "bullet_active_queries"), 1L);
            this.bolt.execute(makeTuple);
        }
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(Collections.singletonList(RecordBox.get().add("cnt", 0L).getRecord())).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.context.getLongMetric(10, "bullet_created_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric(1, "bullet_active_queries"), 0L);
        Assert.assertEquals(this.context.getLongMetric(10, "bullet_improper_queries"), 0L);
    }

    @Test
    public void testKillSignal() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        sendRawRecordTuplesTo(this.bolt, "42", 4);
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "42", new Metadata(Metadata.Signal.KILL, (Serializable) null)));
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
    }

    @Test
    public void testCompleteSignal() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        sendRawRecordTuplesTo(this.bolt, "42", 4);
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null)));
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
    }

    @Test
    public void testRateLimitErrorFromUpstream() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 4);
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        RateLimitError rateLimitError = new RateLimitError(2000.0d, 1000.0d);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.ERROR_TUPLE, "42", rateLimitError));
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).add(rateLimitError.makeMeta()).asJSON(), new Metadata(Metadata.Signal.FAIL, (Serializable) null))));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.KILL, (Serializable) null))));
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testRateLimitErrorFromUpstreamWithoutQuery() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.ERROR_TUPLE, "42", new RateLimitError(2000.0d, 1000.0d)));
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
    }

    @Test
    public void testRateLimitingWithTicks() {
        RateLimitError rateLimitError = new RateLimitError(42.0d, 5.0d);
        this.bolt = new RateLimitedJoinBolt(2, rateLimitError, this.config);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(10)), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 2);
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        this.bolt.execute(TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]));
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).add(rateLimitError.makeMeta()).asJSON(), new Metadata(Metadata.Signal.FAIL, (Serializable) null))));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.KILL, (Serializable) null))));
    }

    @Test
    public void testRateLimitingOnCombine() {
        RateLimitError rateLimitError = new RateLimitError(42.0d, 5.0d);
        this.bolt = new RateLimitedJoinBolt(2, rateLimitError, this.config);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(10)), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 4);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo.subList(0, 3)).add(rateLimitError.makeMeta()).asJSON(), new Metadata(Metadata.Signal.FAIL, (Serializable) null))));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.KILL, (Serializable) null))));
    }

    @Test
    public void testDataWithoutQuery() {
        sendRawRecordTuplesTo(this.bolt, "42", 3);
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", RAW_MAX_SIZE);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
    }

    @Test
    public void testMissingMetadataIsEmitted() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", RAW_MAX_SIZE);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
    }

    @Test
    public void testMetadataIsNotReplaced() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), new Metadata((Metadata.Signal) null, "foo")));
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", RAW_MAX_SIZE);
        Metadata metadata = new Metadata(Metadata.Signal.COMPLETE, "foo");
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), metadata)));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, "foo"))));
    }

    @Test
    public void testQueryBeingDelayed() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        this.bolt = new DonableJoinBolt(this.config, RAW_MAX_SIZE, false);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationQuery(Integer.valueOf(RAW_MAX_SIZE), Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), EMPTY));
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        for (int i = 0; i < 1; i++) {
            this.bolt.execute(makeTuple);
            Assert.assertEquals(this.collector.getEmittedCount(), 0);
            Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        }
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        List<BulletRecord> sendSlidingWindowWithRawRecordTuplesTo = sendSlidingWindowWithRawRecordTuplesTo(this.bolt, "42", 1);
        Assert.assertEquals(this.collector.getEmittedCount(), 1);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendSlidingWindowWithRawRecordTuplesTo).asJSON(), EMPTY)));
        List<BulletRecord> sendSlidingWindowWithRawRecordTuplesTo2 = sendSlidingWindowWithRawRecordTuplesTo(this.bolt, "42", 2);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendSlidingWindowWithRawRecordTuplesTo2).asJSON(), EMPTY)));
        List<BulletRecord> sendSlidingWindowWithRawRecordTuplesTo3 = sendSlidingWindowWithRawRecordTuplesTo(this.bolt, "42", 3);
        Assert.assertEquals(this.collector.getEmittedCount(), 3);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendSlidingWindowWithRawRecordTuplesTo3).asJSON(), EMPTY)));
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
    }

    @Test
    public void testQueryClosedWhileFinishedTerminatesTheQuery() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        this.bolt = new DonableJoinBolt(this.config, 2, true);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationQuery(Integer.valueOf(RAW_MAX_SIZE), Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), EMPTY));
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple);
        this.bolt.execute(makeTuple);
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        List<BulletRecord> sendSlidingWindowWithRawRecordTuplesTo = sendSlidingWindowWithRawRecordTuplesTo(this.bolt, "42", 2);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendSlidingWindowWithRawRecordTuplesTo).asJSON(), new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertTrue(wasMetadataEmittedTo("feedback", TupleUtils.makeTuple(TupleClassifier.Type.FEEDBACK_TUPLE, "42", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null))));
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertEquals(this.collector.getAllEmittedTo("default").count(), 1L);
        Assert.assertEquals(this.collector.getAllEmittedTo("feedback").count(), 1L);
    }

    @Test
    public void testWindowClosedOnTickIsImmediatelyEmitted() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        this.bolt = new ClosableJoinBolt(this.config, 3, false);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationQuery(Integer.valueOf(RAW_MAX_SIZE), Window.Unit.TIME, 100000, Window.Unit.TIME, 100000)), EMPTY));
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        for (int i = 0; i < 1; i++) {
            this.bolt.execute(makeTuple);
            Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        }
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 1);
        List<BulletRecord> sendRawRecordTuplesTo2 = sendRawRecordTuplesTo(this.bolt, "42", 1);
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.collector.getEmittedCount(), 1);
        ArrayList arrayList = new ArrayList(sendRawRecordTuplesTo);
        arrayList.addAll(sendRawRecordTuplesTo2);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(arrayList).asJSON(), EMPTY);
        this.bolt.execute(makeTuple);
        Assert.assertTrue(wasResultEmittedTo("default", makeTuple2));
    }

    @Test
    public void testQueryFinishedWhileBeingDelayed() {
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        this.bolt = new DonableJoinBolt(this.config, 2, false);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        for (int i = 0; i < 1; i++) {
            this.bolt.execute(makeTuple);
            Assert.assertEquals(this.collector.getEmittedCount(), 0);
            Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        }
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        List<BulletRecord> sendRawRecordTuplesTo2 = sendRawRecordTuplesTo(this.bolt, "42", 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        List<BulletRecord> sendRawRecordTuplesTo3 = sendRawRecordTuplesTo(this.bolt, "42", 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        ArrayList arrayList = new ArrayList(sendRawRecordTuplesTo);
        arrayList.addAll(sendRawRecordTuplesTo2);
        arrayList.addAll(sendRawRecordTuplesTo3);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(arrayList).asJSON(), COMPLETED)));
    }

    @Test
    public void testQueryGettingDataWhileBeingDelayed() {
        this.config.set("bullet.topology.join.bolt.query.pre.start.delay.ticks", 3);
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.validate();
        this.bolt = new DonableJoinBolt(this.config, 2, false);
        setup(this.bolt);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "42", SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(Integer.valueOf(RAW_MAX_SIZE))), EMPTY));
        Assert.assertEquals(this.context.getLongMetric("bullet_created_queries"), 1L);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.TICK_TUPLE, new Object[0]);
        this.bolt.execute(makeTuple);
        List<BulletRecord> sendRawRecordTuplesTo = sendRawRecordTuplesTo(this.bolt, "42", 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 1L);
        Assert.assertEquals(this.collector.getEmittedCount(), 0);
        this.bolt.execute(makeTuple);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_queries"), 0L);
        Assert.assertEquals(this.collector.getEmittedCount(), 2);
        Assert.assertTrue(wasResultEmittedTo("default", TupleUtils.makeTuple(TupleClassifier.Type.RESULT_TUPLE, "42", Clip.of(sendRawRecordTuplesTo).asJSON(), COMPLETED)));
    }

    @Test
    public void testBatchTuple() {
        this.bolt = ComponentUtils.prepare(new JoinBolt(new BulletStormConfig("test_config.yaml")), this.collector);
        this.bolt.replayCompleted = true;
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "JoinBolt-18"));
        Assert.assertEquals(this.collector.getAckedCount(), 1);
    }

    @Test
    public void testBatchInitializeAndRemoveQuery() {
        this.bolt = ComponentUtils.prepare(new JoinBolt(new BulletStormConfig("test_config.yaml")), this.collector);
        Assert.assertEquals(this.bolt.replayedQueriesCount, 0);
        Assert.assertEquals(this.bolt.removedIds.size(), 0);
        Assert.assertEquals(this.bolt.getQueries().size(), 0);
        this.bolt.removeQuery("42");
        Assert.assertEquals(this.bolt.removedIds.size(), 1);
        HashMap hashMap = new HashMap();
        hashMap.put("42", new PubSubMessage("42", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", Integer.valueOf(RAW_MAX_SIZE), Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), new Metadata()));
        hashMap.put("43", new PubSubMessage("43", SerializerDeserializer.toBytes(QueryUtils.makeSimpleAggregationFieldFilterQuery("b235gf23b", Integer.valueOf(RAW_MAX_SIZE), Window.Unit.RECORD, 1, Window.Unit.RECORD, 1)), new Metadata()));
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "JoinBolt-18");
        Mockito.when(makeIDTuple.getLong(1)).thenReturn(Long.valueOf(this.bolt.startTimestamp));
        Mockito.when(makeIDTuple.getInteger(2)).thenReturn(0);
        Mockito.when(makeIDTuple.getValue(3)).thenReturn(hashMap);
        this.bolt.onBatch(makeIDTuple);
        Assert.assertEquals(this.bolt.replayedQueriesCount, 1);
        Assert.assertEquals(this.bolt.removedIds.size(), 0);
        Assert.assertEquals(this.bolt.getQueries().size(), 1);
        this.bolt.removeQuery("43");
        Assert.assertEquals(this.bolt.removedIds.size(), 1);
        Assert.assertEquals(this.bolt.getQueries().size(), 0);
        Tuple makeIDTuple2 = TupleUtils.makeIDTuple(TupleClassifier.Type.BATCH_TUPLE, "JoinBolt-18");
        Mockito.when(makeIDTuple2.getLong(1)).thenReturn(Long.valueOf(this.bolt.startTimestamp));
        Mockito.when(makeIDTuple2.getInteger(2)).thenReturn(0);
        Mockito.when(makeIDTuple2.getValue(3)).thenReturn((Object) null);
        this.bolt.onBatch(makeIDTuple2);
        Assert.assertEquals(this.bolt.removedIds.size(), 0);
        Assert.assertEquals(this.bolt.getQueries().size(), 0);
    }
}
