package com.yahoo.bullet.storm;

import com.yahoo.bullet.storm.grouping.IDGrouping;
import com.yahoo.bullet.storm.grouping.TaskIndexCaptureGrouping;
import com.yahoo.bullet.storm.testing.CallCountingCredentialsSpout;
import com.yahoo.bullet.storm.testing.CallCountingSpoutConnector;
import com.yahoo.bullet.storm.testing.CustomBoltDeclarer;
import com.yahoo.bullet.storm.testing.CustomIMetricsConsumer;
import com.yahoo.bullet.storm.testing.CustomIRichSpout;
import com.yahoo.bullet.storm.testing.CustomSpoutDeclarer;
import com.yahoo.bullet.storm.testing.CustomTopologyBuilder;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.tuple.Fields;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/yahoo/bullet/storm/StormUtilsTest.class */
public class StormUtilsTest {
    // Presumably the iteration count for randomized tests elsewhere in this class — TODO confirm.
    private static final int NUM_TESTS = 10000;
    // Used by the tests below as the expected parallelism of the record source / DSL components.
    // NOTE(review): the name suggests a hashing-related constant; the usages here may be a
    // decompiler constant-substitution artifact for the literal 10 — confirm.
    private static final int HASH_COUNT = 10;
    // Recording topology builder; re-created before every test method in setup().
    private CustomTopologyBuilder builder;
    // Bullet Storm configuration under test; re-created before every test method in setup().
    private BulletStormConfig config;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yahoo/bullet/storm/StormUtilsTest$Unserializable.class */
    /**
     * Empty marker type that does not implement {@link java.io.Serializable}. An instance is
     * stored in a {@link BulletStormConfig} by {@code makeInvalidConfig} to render that
     * config invalid.
     */
    public static class Unserializable {
        // Private so that only the enclosing test class creates instances.
        private Unserializable() {
        }
    }

    /**
     * Stores an {@link Unserializable} instance in the given config under a custom topology key
     * and hands the same config instance back.
     *
     * @param bulletStormConfig the config to modify in place.
     * @return the same (now modified) config instance.
     */
    private BulletStormConfig makeInvalidConfig(BulletStormConfig bulletStormConfig) {
        final Unserializable poison = new Unserializable();
        bulletStormConfig.set("bullet.topology.custom.unserializable", poison);
        return bulletStormConfig;
    }

    /**
     * Submits the shared builder with the test's own config (made invalid first) and the given
     * record source component name. Any exception raised along the way is discarded.
     *
     * @param str the name of the existing record source component to hook into.
     */
    private void submitWithTopology(String str) {
        try {
            BulletStormConfig invalid = makeInvalidConfig(this.config);
            StormUtils.submit(invalid, str, this.builder);
        } catch (Exception ignored) {
            // Deliberately swallowed. The config was made invalid on purpose, so a failure here is
            // presumably expected once the topology has been wired — TODO confirm.
        }
    }

    /**
     * Submits the shared builder with the given config (made invalid first), without naming an
     * existing record source. Any exception raised along the way is discarded.
     *
     * @param bulletStormConfig the config to invalidate and submit with.
     */
    private void submitWithConfig(BulletStormConfig bulletStormConfig) {
        try {
            BulletStormConfig invalid = makeInvalidConfig(bulletStormConfig);
            StormUtils.submit(invalid, this.builder);
        } catch (Exception ignored) {
            // Deliberately swallowed. The config was made invalid on purpose, so a failure here is
            // presumably expected once the topology has been wired — TODO confirm.
        }
    }

    /**
     * Submits the shared builder with the given config (made invalid first) and the given record
     * source component name. Any exception raised along the way is discarded.
     *
     * @param str the name of the existing record source component to hook into.
     * @param bulletStormConfig the config to invalidate and submit with.
     */
    private void submitWithTopologyAndConfig(String str, BulletStormConfig bulletStormConfig) {
        try {
            BulletStormConfig invalid = makeInvalidConfig(bulletStormConfig);
            StormUtils.submit(invalid, str, this.builder);
        } catch (Exception ignored) {
            // Deliberately swallowed. The config was made invalid on purpose, so a failure here is
            // presumably expected once the topology has been wired — TODO confirm.
        }
    }

    /**
     * Looks up a bolt recorded by the builder by its component id.
     *
     * @param str the component id to search for.
     * @return the first recorded bolt declarer whose id equals {@code str}, or null if none does.
     */
    private CustomBoltDeclarer getBolt(String str) {
        for (CustomBoltDeclarer declarer : this.builder.getCreatedBolts()) {
            if (str.equals(declarer.getId())) {
                return declarer;
            }
        }
        return null;
    }

    /**
     * Looks up a spout recorded by the builder by its component id.
     *
     * @param str the component id to search for.
     * @return the first recorded spout declarer whose id equals {@code str}, or null if none does.
     */
    private CustomSpoutDeclarer getSpout(String str) {
        for (CustomSpoutDeclarer declarer : this.builder.getCreatedSpouts()) {
            if (str.equals(declarer.getId())) {
                return declarer;
            }
        }
        return null;
    }

    /**
     * Asserts that the given list of pairs contains the pair made from the two strings.
     *
     * @param list the pairs to search.
     * @param str the left element of the expected pair.
     * @param str2 the right element of the expected pair.
     */
    private void assertContains(List<Pair<String, String>> list, String str, String str2) {
        // Typed local instead of a raw Pair; the bound method reference replaces the decompiled
        // lambda that referred to an undefined variable and did not compile.
        Pair<String, String> expected = Pair.of(str, str2);
        Assert.assertTrue(list.stream().anyMatch(expected::equals));
    }

    /**
     * Asserts that the map holds, under the pair made from the two strings, a list of
     * {@link Fields} that matches the expected ones in size, order and content.
     *
     * @param map the fields groupings keyed by pair.
     * @param str the left element of the key pair.
     * @param str2 the right element of the key pair.
     * @param fieldsArr the expected fields, in order.
     */
    private void assertContains(Map<Pair<String, String>, List<Fields>> map, String str, String str2, Fields... fieldsArr) {
        List<Fields> actual = map.get(Pair.of(str, str2));
        Assert.assertNotNull(actual);
        Assert.assertEquals(actual.size(), fieldsArr.length);
        int index = 0;
        for (Fields found : actual) {
            // Fields are compared through their list view of field names.
            Assert.assertEquals(found.toList(), fieldsArr[index].toList());
            index++;
        }
    }

    /**
     * Asserts that the map holds, under the pair made from the two strings, a grouping that is an
     * instance of the given class. A missing entry fails the assertion.
     *
     * @param map the custom groupings keyed by pair.
     * @param str the left element of the key pair.
     * @param str2 the right element of the key pair.
     * @param cls the expected grouping type.
     */
    private void assertContains(Map<Pair<String, String>, CustomStreamGrouping> map, String str, String str2, Class<? extends CustomStreamGrouping> cls) {
        CustomStreamGrouping grouping = map.get(Pair.of(str, str2));
        Assert.assertTrue(cls.isInstance(grouping));
    }

    /**
     * Gives every test method a fresh topology builder, configured not to throw on create, and a
     * fresh default config.
     */
    @BeforeMethod
    public void setup() {
        builder = new CustomTopologyBuilder();
        builder.setThrowExceptionOnCreate(false);
        config = new BulletStormConfig();
    }

    /**
     * Registers a custom record source spout named "source" on the builder, submits with the
     * default config, and checks every spout and bolt recorded by the builder: component class,
     * parallelism, CPU / on-heap / off-heap settings and the stream groupings each bolt
     * subscribes with. Also checks that no replay component is created.
     */
    @Test
    public void testHookingIntoExistingRecordSource() {
        this.builder.setSpout("source", new CustomIRichSpout(), Integer.valueOf(HASH_COUNT));
        Assert.assertFalse(this.builder.isTopologyCreated());
        submitWithTopology("source");
        Assert.assertTrue(this.builder.isTopologyCreated());
        // Three spouts (source, tick, query) and four bolts (filter, join, result, loop).
        Assert.assertEquals(this.builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(this.builder.getCreatedBolts().size(), 4);
        // Record source spout: registered by the test itself, so no resource settings are expected on it.
        CustomSpoutDeclarer spout = getSpout("source");
        Assert.assertNotNull(spout);
        Assert.assertEquals(spout.getSpout().getClass(), CustomIRichSpout.class);
        Assert.assertEquals(spout.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertNull(spout.getCpuLoad());
        Assert.assertNull(spout.getOnHeap());
        Assert.assertNull(spout.getOffHeap());
        // Tick spout.
        CustomSpoutDeclarer spout2 = getSpout(TopologyConstants.TICK_COMPONENT);
        Assert.assertNotNull(spout2);
        Assert.assertEquals(spout2.getSpout().getClass(), TickSpout.class);
        Assert.assertEquals(spout2.getParallelism(), 1);
        Assert.assertEquals(spout2.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout2.getOnHeap(), Double.valueOf(128.0d));
        Assert.assertEquals(spout2.getOffHeap(), Double.valueOf(160.0d));
        // Query spout.
        CustomSpoutDeclarer spout3 = getSpout(TopologyConstants.QUERY_COMPONENT);
        Assert.assertNotNull(spout3);
        Assert.assertEquals(spout3.getSpout().getClass(), QuerySpout.class);
        Assert.assertEquals(spout3.getParallelism(), 2);
        Assert.assertEquals(spout3.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout3.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(spout3.getOffHeap(), Double.valueOf(160.0d));
        // Filter bolt: all-grouped on tick and query streams, shuffle-grouped on the record source.
        CustomBoltDeclarer bolt = getBolt(TopologyConstants.FILTER_COMPONENT);
        Assert.assertNotNull(bolt);
        Assert.assertEquals(bolt.getBolt().getClass(), FilterBolt.class);
        Assert.assertEquals(bolt.getParallelism(), BulletStormConfig.DEFAULT_FILTER_BOLT_PARALLELISM);
        Assert.assertEquals(bolt.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings = bolt.getAllGroupings();
        Assert.assertEquals(allGroupings.size(), 3);
        assertContains(allGroupings, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> shuffleGroupings = bolt.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings.size(), 1);
        assertContains(shuffleGroupings, "source", "default");
        Assert.assertTrue(bolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt.getCustomGroupings().isEmpty());
        // Join bolt: all-grouped on tick, fields-grouped by "id" on the query and filter streams.
        CustomBoltDeclarer bolt2 = getBolt(TopologyConstants.JOIN_COMPONENT);
        Assert.assertNotNull(bolt2);
        Assert.assertEquals(bolt2.getBolt().getClass(), JoinBolt.class);
        Assert.assertEquals(bolt2.getParallelism(), 2);
        Assert.assertEquals(bolt2.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt2.getOnHeap(), Double.valueOf(512.0d));
        Assert.assertEquals(bolt2.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings2 = bolt2.getAllGroupings();
        Assert.assertEquals(allGroupings2.size(), 1);
        assertContains(allGroupings2, TopologyConstants.TICK_COMPONENT, "default");
        Map<Pair<String, String>, List<Fields>> fieldsGroupings = bolt2.getFieldsGroupings();
        Assert.assertEquals(fieldsGroupings.size(), 4);
        assertContains(fieldsGroupings, TopologyConstants.QUERY_COMPONENT, "default", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.QUERY_COMPONENT, "metadata", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.FILTER_COMPONENT, "default", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.FILTER_COMPONENT, "error", new Fields(new String[]{"id"}));
        Assert.assertTrue(bolt2.getShuffleGroupings().isEmpty());
        Assert.assertTrue(bolt2.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt2.getCustomGroupings().isEmpty());
        // Result bolt: shuffle-grouped on the join bolt's default stream only.
        CustomBoltDeclarer bolt3 = getBolt(TopologyConstants.RESULT_COMPONENT);
        Assert.assertNotNull(bolt3);
        Assert.assertEquals(bolt3.getBolt().getClass(), ResultBolt.class);
        Assert.assertEquals(bolt3.getParallelism(), 2);
        Assert.assertEquals(bolt3.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt3.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt3.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings2 = bolt3.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings2.size(), 1);
        assertContains(shuffleGroupings2, TopologyConstants.JOIN_COMPONENT, "default");
        Assert.assertTrue(bolt3.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt3.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt3.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt3.getCustomGroupings().isEmpty());
        // Loop bolt: shuffle-grouped on the "feedback" streams of the filter and join bolts.
        CustomBoltDeclarer bolt4 = getBolt(TopologyConstants.LOOP_COMPONENT);
        Assert.assertNotNull(bolt4);
        Assert.assertEquals(bolt4.getBolt().getClass(), LoopBolt.class);
        Assert.assertEquals(bolt4.getParallelism(), 2);
        Assert.assertEquals(bolt4.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt4.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt4.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings3 = bolt4.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings3.size(), 2);
        assertContains(shuffleGroupings3, TopologyConstants.FILTER_COMPONENT, "feedback");
        assertContains(shuffleGroupings3, TopologyConstants.JOIN_COMPONENT, "feedback");
        Assert.assertTrue(bolt4.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt4.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt4.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt4.getCustomGroupings().isEmpty());
        // Replay was not enabled in the config, so no replay bolt should have been recorded.
        Assert.assertNull(getBolt(TopologyConstants.REPLAY_COMPONENT));
    }

    /**
     * Same scenario as hooking into an existing "source" spout, but with
     * "bullet.topology.replay.enable" set to true. Checks that a fifth bolt (the replay bolt) is
     * recorded, that the filter and join bolts gain direct groupings on the replay component's
     * "replay" stream, and that the join bolt uses custom groupings where the non-replay test
     * expects fields groupings.
     */
    @Test
    public void testHookingWithReplay() {
        this.config.set("bullet.topology.replay.enable", true);
        this.config.validate();
        this.builder.setSpout("source", new CustomIRichSpout(), Integer.valueOf(HASH_COUNT));
        Assert.assertFalse(this.builder.isTopologyCreated());
        submitWithTopologyAndConfig("source", this.config);
        Assert.assertTrue(this.builder.isTopologyCreated());
        // Three spouts (source, tick, query) and five bolts (filter, join, result, loop, replay).
        Assert.assertEquals(this.builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(this.builder.getCreatedBolts().size(), 5);
        // Record source spout: registered by the test itself, so no resource settings are expected on it.
        CustomSpoutDeclarer spout = getSpout("source");
        Assert.assertNotNull(spout);
        Assert.assertEquals(spout.getSpout().getClass(), CustomIRichSpout.class);
        Assert.assertEquals(spout.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertNull(spout.getCpuLoad());
        Assert.assertNull(spout.getOnHeap());
        Assert.assertNull(spout.getOffHeap());
        // Tick spout.
        CustomSpoutDeclarer spout2 = getSpout(TopologyConstants.TICK_COMPONENT);
        Assert.assertNotNull(spout2);
        Assert.assertEquals(spout2.getSpout().getClass(), TickSpout.class);
        Assert.assertEquals(spout2.getParallelism(), 1);
        Assert.assertEquals(spout2.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout2.getOnHeap(), Double.valueOf(128.0d));
        Assert.assertEquals(spout2.getOffHeap(), Double.valueOf(160.0d));
        // Query spout.
        CustomSpoutDeclarer spout3 = getSpout(TopologyConstants.QUERY_COMPONENT);
        Assert.assertNotNull(spout3);
        Assert.assertEquals(spout3.getSpout().getClass(), QuerySpout.class);
        Assert.assertEquals(spout3.getParallelism(), 2);
        Assert.assertEquals(spout3.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout3.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(spout3.getOffHeap(), Double.valueOf(160.0d));
        // Filter bolt: as without replay, plus a direct grouping on the replay component's "replay" stream.
        CustomBoltDeclarer bolt = getBolt(TopologyConstants.FILTER_COMPONENT);
        Assert.assertNotNull(bolt);
        Assert.assertEquals(bolt.getBolt().getClass(), FilterBolt.class);
        Assert.assertEquals(bolt.getParallelism(), BulletStormConfig.DEFAULT_FILTER_BOLT_PARALLELISM);
        Assert.assertEquals(bolt.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings = bolt.getAllGroupings();
        Assert.assertEquals(allGroupings.size(), 3);
        assertContains(allGroupings, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> shuffleGroupings = bolt.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings.size(), 1);
        assertContains(shuffleGroupings, "source", "default");
        List<Pair<String, String>> directGroupings = bolt.getDirectGroupings();
        Assert.assertEquals(directGroupings.size(), 1);
        assertContains(directGroupings, TopologyConstants.REPLAY_COMPONENT, "replay");
        Assert.assertTrue(bolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt.getCustomGroupings().isEmpty());
        // Join bolt: all-grouped on tick and query metadata, direct-grouped on replay, and
        // custom-grouped (IDGrouping / TaskIndexCaptureGrouping) instead of fields-grouped.
        CustomBoltDeclarer bolt2 = getBolt(TopologyConstants.JOIN_COMPONENT);
        Assert.assertNotNull(bolt2);
        Assert.assertEquals(bolt2.getBolt().getClass(), JoinBolt.class);
        Assert.assertEquals(bolt2.getParallelism(), 2);
        Assert.assertEquals(bolt2.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt2.getOnHeap(), Double.valueOf(512.0d));
        Assert.assertEquals(bolt2.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings2 = bolt2.getAllGroupings();
        Assert.assertEquals(allGroupings2.size(), 2);
        assertContains(allGroupings2, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(allGroupings2, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> directGroupings2 = bolt2.getDirectGroupings();
        Assert.assertEquals(directGroupings2.size(), 1);
        assertContains(directGroupings2, TopologyConstants.REPLAY_COMPONENT, "replay");
        Map<Pair<String, String>, CustomStreamGrouping> customGroupings = bolt2.getCustomGroupings();
        Assert.assertEquals(customGroupings.size(), 4);
        assertContains(customGroupings, TopologyConstants.QUERY_COMPONENT, "default", IDGrouping.class);
        assertContains(customGroupings, TopologyConstants.FILTER_COMPONENT, "default", IDGrouping.class);
        assertContains(customGroupings, TopologyConstants.FILTER_COMPONENT, "error", IDGrouping.class);
        assertContains(customGroupings, TopologyConstants.REPLAY_COMPONENT, "capture", TaskIndexCaptureGrouping.class);
        Assert.assertTrue(bolt2.getShuffleGroupings().isEmpty());
        Assert.assertTrue(bolt2.getFieldsGroupings().isEmpty());
        // Result bolt: shuffle-grouped on the join bolt's default stream only.
        CustomBoltDeclarer bolt3 = getBolt(TopologyConstants.RESULT_COMPONENT);
        Assert.assertNotNull(bolt3);
        Assert.assertEquals(bolt3.getBolt().getClass(), ResultBolt.class);
        Assert.assertEquals(bolt3.getParallelism(), 2);
        Assert.assertEquals(bolt3.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt3.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt3.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings2 = bolt3.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings2.size(), 1);
        assertContains(shuffleGroupings2, TopologyConstants.JOIN_COMPONENT, "default");
        Assert.assertTrue(bolt3.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt3.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt3.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt3.getCustomGroupings().isEmpty());
        // Loop bolt: shuffle-grouped on the "feedback" streams of the filter and join bolts.
        CustomBoltDeclarer bolt4 = getBolt(TopologyConstants.LOOP_COMPONENT);
        Assert.assertNotNull(bolt4);
        Assert.assertEquals(bolt4.getBolt().getClass(), LoopBolt.class);
        Assert.assertEquals(bolt4.getParallelism(), 2);
        Assert.assertEquals(bolt4.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt4.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt4.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings3 = bolt4.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings3.size(), 2);
        assertContains(shuffleGroupings3, TopologyConstants.FILTER_COMPONENT, "feedback");
        assertContains(shuffleGroupings3, TopologyConstants.JOIN_COMPONENT, "feedback");
        Assert.assertTrue(bolt4.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt4.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt4.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt4.getCustomGroupings().isEmpty());
        // Replay bolt: all-grouped on the query spout's default and metadata streams and
        // fields-grouped by "id" on its "replay" stream.
        CustomBoltDeclarer bolt5 = getBolt(TopologyConstants.REPLAY_COMPONENT);
        Assert.assertNotNull(bolt5);
        Assert.assertEquals(bolt5.getBolt().getClass(), ReplayBolt.class);
        Assert.assertEquals(bolt5.getParallelism(), 2);
        Assert.assertEquals(bolt5.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt5.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt5.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings3 = bolt5.getAllGroupings();
        Assert.assertEquals(allGroupings3.size(), 2);
        assertContains(allGroupings3, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(allGroupings3, TopologyConstants.QUERY_COMPONENT, "metadata");
        Map<Pair<String, String>, List<Fields>> fieldsGroupings = bolt5.getFieldsGroupings();
        Assert.assertEquals(fieldsGroupings.size(), 1);
        assertContains(fieldsGroupings, TopologyConstants.QUERY_COMPONENT, "replay", new Fields(new String[]{"id"}));
        Assert.assertTrue(bolt5.getShuffleGroupings().isEmpty());
        Assert.assertTrue(bolt5.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt5.getCustomGroupings().isEmpty());
    }

    /**
     * Loads the config from "test_dsl_config.yaml", enables the DSL spout and submits without an
     * existing record source. Checks that a {@link DSLSpout} is recorded under the id
     * "DataSource" with its own resource settings, that the filter bolt shuffle-groups on it, and
     * that the remaining components match the expectations of the non-DSL test.
     */
    @Test
    public void testHookingInDSLSpout() {
        this.config = new BulletStormConfig("test_dsl_config.yaml");
        this.config.set("bullet.topology.dsl.spout.enable", true);
        Assert.assertFalse(this.builder.isTopologyCreated());
        submitWithConfig(this.config);
        Assert.assertTrue(this.builder.isTopologyCreated());
        // Three spouts (DSL, tick, query) and four bolts (filter, join, result, loop).
        Assert.assertEquals(this.builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(this.builder.getCreatedBolts().size(), 4);
        // DSL spout acting as the record source; unlike a test-registered spout it carries resource settings.
        CustomSpoutDeclarer spout = getSpout("DataSource");
        Assert.assertNotNull(spout);
        Assert.assertEquals(spout.getSpout().getClass(), DSLSpout.class);
        Assert.assertEquals(spout.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertEquals(spout.getCpuLoad(), Double.valueOf(50.0d));
        Assert.assertEquals(spout.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(spout.getOffHeap(), Double.valueOf(160.0d));
        // Tick spout.
        CustomSpoutDeclarer spout2 = getSpout(TopologyConstants.TICK_COMPONENT);
        Assert.assertNotNull(spout2);
        Assert.assertEquals(spout2.getSpout().getClass(), TickSpout.class);
        Assert.assertEquals(spout2.getParallelism(), 1);
        Assert.assertEquals(spout2.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout2.getOnHeap(), Double.valueOf(128.0d));
        Assert.assertEquals(spout2.getOffHeap(), Double.valueOf(160.0d));
        // Query spout.
        CustomSpoutDeclarer spout3 = getSpout(TopologyConstants.QUERY_COMPONENT);
        Assert.assertNotNull(spout3);
        Assert.assertEquals(spout3.getSpout().getClass(), QuerySpout.class);
        Assert.assertEquals(spout3.getParallelism(), 2);
        Assert.assertEquals(spout3.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout3.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(spout3.getOffHeap(), Double.valueOf(160.0d));
        // Filter bolt: all-grouped on tick and query streams, shuffle-grouped on the DSL spout.
        CustomBoltDeclarer bolt = getBolt(TopologyConstants.FILTER_COMPONENT);
        Assert.assertNotNull(bolt);
        Assert.assertEquals(bolt.getBolt().getClass(), FilterBolt.class);
        Assert.assertEquals(bolt.getParallelism(), BulletStormConfig.DEFAULT_FILTER_BOLT_PARALLELISM);
        Assert.assertEquals(bolt.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings = bolt.getAllGroupings();
        Assert.assertEquals(allGroupings.size(), 3);
        assertContains(allGroupings, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> shuffleGroupings = bolt.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings.size(), 1);
        assertContains(shuffleGroupings, "DataSource", "default");
        Assert.assertTrue(bolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt.getCustomGroupings().isEmpty());
        // Join bolt: all-grouped on tick, fields-grouped by "id" on the query and filter streams.
        CustomBoltDeclarer bolt2 = getBolt(TopologyConstants.JOIN_COMPONENT);
        Assert.assertNotNull(bolt2);
        Assert.assertEquals(bolt2.getBolt().getClass(), JoinBolt.class);
        Assert.assertEquals(bolt2.getParallelism(), 2);
        Assert.assertEquals(bolt2.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt2.getOnHeap(), Double.valueOf(512.0d));
        Assert.assertEquals(bolt2.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings2 = bolt2.getAllGroupings();
        Assert.assertEquals(allGroupings2.size(), 1);
        assertContains(allGroupings2, TopologyConstants.TICK_COMPONENT, "default");
        Map<Pair<String, String>, List<Fields>> fieldsGroupings = bolt2.getFieldsGroupings();
        Assert.assertEquals(fieldsGroupings.size(), 4);
        assertContains(fieldsGroupings, TopologyConstants.QUERY_COMPONENT, "default", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.QUERY_COMPONENT, "metadata", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.FILTER_COMPONENT, "default", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.FILTER_COMPONENT, "error", new Fields(new String[]{"id"}));
        Assert.assertTrue(bolt2.getShuffleGroupings().isEmpty());
        Assert.assertTrue(bolt2.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt2.getCustomGroupings().isEmpty());
        // Result bolt: shuffle-grouped on the join bolt's default stream only.
        CustomBoltDeclarer bolt3 = getBolt(TopologyConstants.RESULT_COMPONENT);
        Assert.assertNotNull(bolt3);
        Assert.assertEquals(bolt3.getBolt().getClass(), ResultBolt.class);
        Assert.assertEquals(bolt3.getParallelism(), 2);
        Assert.assertEquals(bolt3.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt3.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt3.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings2 = bolt3.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings2.size(), 1);
        assertContains(shuffleGroupings2, TopologyConstants.JOIN_COMPONENT, "default");
        Assert.assertTrue(bolt3.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt3.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt3.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt3.getCustomGroupings().isEmpty());
        // Loop bolt: shuffle-grouped on the "feedback" streams of the filter and join bolts.
        CustomBoltDeclarer bolt4 = getBolt(TopologyConstants.LOOP_COMPONENT);
        Assert.assertNotNull(bolt4);
        Assert.assertEquals(bolt4.getBolt().getClass(), LoopBolt.class);
        Assert.assertEquals(bolt4.getParallelism(), 2);
        Assert.assertEquals(bolt4.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt4.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt4.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings3 = bolt4.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings3.size(), 2);
        assertContains(shuffleGroupings3, TopologyConstants.FILTER_COMPONENT, "feedback");
        assertContains(shuffleGroupings3, TopologyConstants.JOIN_COMPONENT, "feedback");
        Assert.assertTrue(bolt4.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt4.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt4.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt4.getCustomGroupings().isEmpty());
        // This test does not enable replay, and no replay bolt is expected.
        Assert.assertNull(getBolt(TopologyConstants.REPLAY_COMPONENT));
    }

    /**
     * Loads the config from "test_dsl_config.yaml", enables both the DSL spout and the DSL bolt,
     * and submits without an existing record source. Checks that the {@link DSLSpout} is recorded
     * under the id "DataSpout", that a {@link DSLBolt} is recorded under the id "DataSource" and
     * shuffle-groups on that spout, and that the filter bolt shuffle-groups on "DataSource". The
     * remaining components are checked against the same expectations as in the non-DSL test.
     */
    @Test
    public void testHookingInDSLSpoutAndBolt() {
        this.config = new BulletStormConfig("test_dsl_config.yaml");
        this.config.set("bullet.topology.dsl.spout.enable", true);
        this.config.set("bullet.topology.dsl.bolt.enable", true);
        Assert.assertFalse(this.builder.isTopologyCreated());
        submitWithConfig(this.config);
        Assert.assertTrue(this.builder.isTopologyCreated());
        // Three spouts (DSL, tick, query) and five bolts (DSL, filter, join, result, loop).
        Assert.assertEquals(this.builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(this.builder.getCreatedBolts().size(), 5);
        // DSL spout: with the DSL bolt enabled it is expected under the id "DataSpout".
        CustomSpoutDeclarer spout = getSpout("DataSpout");
        Assert.assertNotNull(spout);
        Assert.assertEquals(spout.getSpout().getClass(), DSLSpout.class);
        Assert.assertEquals(spout.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertEquals(spout.getCpuLoad(), Double.valueOf(50.0d));
        Assert.assertEquals(spout.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(spout.getOffHeap(), Double.valueOf(160.0d));
        // DSL bolt: expected under the id "DataSource", shuffle-grouped on the DSL spout.
        CustomBoltDeclarer bolt = getBolt("DataSource");
        Assert.assertNotNull(bolt);
        Assert.assertEquals(bolt.getBolt().getClass(), DSLBolt.class);
        Assert.assertEquals(bolt.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertEquals(bolt.getCpuLoad(), Double.valueOf(50.0d));
        Assert.assertEquals(bolt.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings = bolt.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings.size(), 1);
        assertContains(shuffleGroupings, "DataSpout", "default");
        Assert.assertTrue(bolt.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt.getCustomGroupings().isEmpty());
        // Tick spout.
        CustomSpoutDeclarer spout2 = getSpout(TopologyConstants.TICK_COMPONENT);
        Assert.assertNotNull(spout2);
        Assert.assertEquals(spout2.getSpout().getClass(), TickSpout.class);
        Assert.assertEquals(spout2.getParallelism(), 1);
        Assert.assertEquals(spout2.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout2.getOnHeap(), Double.valueOf(128.0d));
        Assert.assertEquals(spout2.getOffHeap(), Double.valueOf(160.0d));
        // Query spout.
        CustomSpoutDeclarer spout3 = getSpout(TopologyConstants.QUERY_COMPONENT);
        Assert.assertNotNull(spout3);
        Assert.assertEquals(spout3.getSpout().getClass(), QuerySpout.class);
        Assert.assertEquals(spout3.getParallelism(), 2);
        Assert.assertEquals(spout3.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(spout3.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(spout3.getOffHeap(), Double.valueOf(160.0d));
        // Filter bolt: all-grouped on tick and query streams, shuffle-grouped on the DSL bolt.
        CustomBoltDeclarer bolt2 = getBolt(TopologyConstants.FILTER_COMPONENT);
        Assert.assertNotNull(bolt2);
        Assert.assertEquals(bolt2.getBolt().getClass(), FilterBolt.class);
        Assert.assertEquals(bolt2.getParallelism(), BulletStormConfig.DEFAULT_FILTER_BOLT_PARALLELISM);
        Assert.assertEquals(bolt2.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt2.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt2.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings = bolt2.getAllGroupings();
        Assert.assertEquals(allGroupings.size(), 3);
        assertContains(allGroupings, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(allGroupings, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> shuffleGroupings2 = bolt2.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings2.size(), 1);
        assertContains(shuffleGroupings2, "DataSource", "default");
        Assert.assertTrue(bolt2.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt2.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt2.getCustomGroupings().isEmpty());
        // Join bolt: all-grouped on tick, fields-grouped by "id" on the query and filter streams.
        CustomBoltDeclarer bolt3 = getBolt(TopologyConstants.JOIN_COMPONENT);
        Assert.assertNotNull(bolt3);
        Assert.assertEquals(bolt3.getBolt().getClass(), JoinBolt.class);
        Assert.assertEquals(bolt3.getParallelism(), 2);
        Assert.assertEquals(bolt3.getCpuLoad(), Double.valueOf(100.0d));
        Assert.assertEquals(bolt3.getOnHeap(), Double.valueOf(512.0d));
        Assert.assertEquals(bolt3.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> allGroupings2 = bolt3.getAllGroupings();
        Assert.assertEquals(allGroupings2.size(), 1);
        assertContains(allGroupings2, TopologyConstants.TICK_COMPONENT, "default");
        Map<Pair<String, String>, List<Fields>> fieldsGroupings = bolt3.getFieldsGroupings();
        Assert.assertEquals(fieldsGroupings.size(), 4);
        assertContains(fieldsGroupings, TopologyConstants.QUERY_COMPONENT, "default", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.QUERY_COMPONENT, "metadata", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.FILTER_COMPONENT, "default", new Fields(new String[]{"id"}));
        assertContains(fieldsGroupings, TopologyConstants.FILTER_COMPONENT, "error", new Fields(new String[]{"id"}));
        Assert.assertTrue(bolt3.getShuffleGroupings().isEmpty());
        Assert.assertTrue(bolt3.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt3.getCustomGroupings().isEmpty());
        // Result bolt: shuffle-grouped on the join bolt's default stream only.
        CustomBoltDeclarer bolt4 = getBolt(TopologyConstants.RESULT_COMPONENT);
        Assert.assertNotNull(bolt4);
        Assert.assertEquals(bolt4.getBolt().getClass(), ResultBolt.class);
        Assert.assertEquals(bolt4.getParallelism(), 2);
        Assert.assertEquals(bolt4.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt4.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt4.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings3 = bolt4.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings3.size(), 1);
        assertContains(shuffleGroupings3, TopologyConstants.JOIN_COMPONENT, "default");
        Assert.assertTrue(bolt4.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt4.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt4.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt4.getCustomGroupings().isEmpty());
        // Loop bolt: shuffle-grouped on the "feedback" streams of the filter and join bolts.
        CustomBoltDeclarer bolt5 = getBolt(TopologyConstants.LOOP_COMPONENT);
        Assert.assertNotNull(bolt5);
        Assert.assertEquals(bolt5.getBolt().getClass(), LoopBolt.class);
        Assert.assertEquals(bolt5.getParallelism(), 2);
        Assert.assertEquals(bolt5.getCpuLoad(), Double.valueOf(20.0d));
        Assert.assertEquals(bolt5.getOnHeap(), Double.valueOf(256.0d));
        Assert.assertEquals(bolt5.getOffHeap(), Double.valueOf(160.0d));
        List<Pair<String, String>> shuffleGroupings4 = bolt5.getShuffleGroupings();
        Assert.assertEquals(shuffleGroupings4.size(), 2);
        assertContains(shuffleGroupings4, TopologyConstants.FILTER_COMPONENT, "feedback");
        assertContains(shuffleGroupings4, TopologyConstants.JOIN_COMPONENT, "feedback");
        Assert.assertTrue(bolt5.getAllGroupings().isEmpty());
        Assert.assertTrue(bolt5.getDirectGroupings().isEmpty());
        Assert.assertTrue(bolt5.getFieldsGroupings().isEmpty());
        Assert.assertTrue(bolt5.getCustomGroupings().isEmpty());
        // This test does not enable replay, and no replay bolt is expected.
        Assert.assertNull(getBolt(TopologyConstants.REPLAY_COMPONENT));
    }

    /**
     * Submits a topology built from {@code test_dsl_config.yaml} with the DSL spout enabled (connector used as a
     * spout) and the DSL bolt disabled, then checks every created spout and bolt: its implementation class,
     * parallelism, resource settings and stream groupings.
     */
    @Test
    public void testHookingInDSLConnectorSpout() {
        config = new BulletStormConfig("test_dsl_config.yaml");
        config.set("bullet.topology.dsl.spout.enable", true);
        config.set("bullet.topology.dsl.spout.connector.as.spout.enable", true);
        config.set("bullet.topology.dsl.bolt.enable", false);
        config.set("bullet.dsl.connector.class.name", CallCountingSpoutConnector.class.getName());
        config.set("bullet.topology.dsl.spout.connector.class.name", CallCountingCredentialsSpout.class.getName());

        Assert.assertFalse(builder.isTopologyCreated());
        submitWithConfig(config);
        Assert.assertTrue(builder.isTopologyCreated());
        Assert.assertEquals(builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(builder.getCreatedBolts().size(), 4);

        // Data source: the DSL connector spout.
        CustomSpoutDeclarer dataSpout = getSpout("DataSource");
        Assert.assertNotNull(dataSpout);
        Assert.assertEquals(dataSpout.getSpout().getClass(), DSLConnectorSpout.class);
        Assert.assertEquals(dataSpout.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertEquals(dataSpout.getCpuLoad(), Double.valueOf(50.0));
        Assert.assertEquals(dataSpout.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(dataSpout.getOffHeap(), Double.valueOf(160.0));

        // Tick spout.
        CustomSpoutDeclarer tickSpout = getSpout(TopologyConstants.TICK_COMPONENT);
        Assert.assertNotNull(tickSpout);
        Assert.assertEquals(tickSpout.getSpout().getClass(), TickSpout.class);
        Assert.assertEquals(tickSpout.getParallelism(), 1);
        Assert.assertEquals(tickSpout.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(tickSpout.getOnHeap(), Double.valueOf(128.0));
        Assert.assertEquals(tickSpout.getOffHeap(), Double.valueOf(160.0));

        // Query spout.
        CustomSpoutDeclarer querySpout = getSpout(TopologyConstants.QUERY_COMPONENT);
        Assert.assertNotNull(querySpout);
        Assert.assertEquals(querySpout.getSpout().getClass(), QuerySpout.class);
        Assert.assertEquals(querySpout.getParallelism(), 2);
        Assert.assertEquals(querySpout.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(querySpout.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(querySpout.getOffHeap(), Double.valueOf(160.0));

        // Filter bolt: all-grouped on the tick and query streams, shuffle-grouped on the data source.
        CustomBoltDeclarer filterBolt = getBolt(TopologyConstants.FILTER_COMPONENT);
        Assert.assertNotNull(filterBolt);
        Assert.assertEquals(filterBolt.getBolt().getClass(), FilterBolt.class);
        Assert.assertEquals(filterBolt.getParallelism(), BulletStormConfig.DEFAULT_FILTER_BOLT_PARALLELISM);
        Assert.assertEquals(filterBolt.getCpuLoad(), Double.valueOf(100.0));
        Assert.assertEquals(filterBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(filterBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> filterAll = filterBolt.getAllGroupings();
        Assert.assertEquals(filterAll.size(), 3);
        assertContains(filterAll, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(filterAll, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(filterAll, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> filterShuffle = filterBolt.getShuffleGroupings();
        Assert.assertEquals(filterShuffle.size(), 1);
        assertContains(filterShuffle, "DataSource", "default");
        Assert.assertTrue(filterBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(filterBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(filterBolt.getCustomGroupings().isEmpty());

        // Join bolt: all-grouped on ticks, fields-grouped by "id" on the query and filter streams.
        CustomBoltDeclarer joinBolt = getBolt(TopologyConstants.JOIN_COMPONENT);
        Assert.assertNotNull(joinBolt);
        Assert.assertEquals(joinBolt.getBolt().getClass(), JoinBolt.class);
        Assert.assertEquals(joinBolt.getParallelism(), 2);
        Assert.assertEquals(joinBolt.getCpuLoad(), Double.valueOf(100.0));
        Assert.assertEquals(joinBolt.getOnHeap(), Double.valueOf(512.0));
        Assert.assertEquals(joinBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> joinAll = joinBolt.getAllGroupings();
        Assert.assertEquals(joinAll.size(), 1);
        assertContains(joinAll, TopologyConstants.TICK_COMPONENT, "default");
        Map<Pair<String, String>, List<Fields>> joinFields = joinBolt.getFieldsGroupings();
        Assert.assertEquals(joinFields.size(), 4);
        assertContains(joinFields, TopologyConstants.QUERY_COMPONENT, "default", new Fields("id"));
        assertContains(joinFields, TopologyConstants.QUERY_COMPONENT, "metadata", new Fields("id"));
        assertContains(joinFields, TopologyConstants.FILTER_COMPONENT, "default", new Fields("id"));
        assertContains(joinFields, TopologyConstants.FILTER_COMPONENT, "error", new Fields("id"));
        Assert.assertTrue(joinBolt.getShuffleGroupings().isEmpty());
        Assert.assertTrue(joinBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(joinBolt.getCustomGroupings().isEmpty());

        // Result bolt: shuffle-grouped on the join bolt's default stream only.
        CustomBoltDeclarer resultBolt = getBolt(TopologyConstants.RESULT_COMPONENT);
        Assert.assertNotNull(resultBolt);
        Assert.assertEquals(resultBolt.getBolt().getClass(), ResultBolt.class);
        Assert.assertEquals(resultBolt.getParallelism(), 2);
        Assert.assertEquals(resultBolt.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(resultBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(resultBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> resultShuffle = resultBolt.getShuffleGroupings();
        Assert.assertEquals(resultShuffle.size(), 1);
        assertContains(resultShuffle, TopologyConstants.JOIN_COMPONENT, "default");
        Assert.assertTrue(resultBolt.getAllGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getCustomGroupings().isEmpty());

        // Loop bolt: shuffle-grouped on the feedback streams of the filter and join bolts.
        CustomBoltDeclarer loopBolt = getBolt(TopologyConstants.LOOP_COMPONENT);
        Assert.assertNotNull(loopBolt);
        Assert.assertEquals(loopBolt.getBolt().getClass(), LoopBolt.class);
        Assert.assertEquals(loopBolt.getParallelism(), 2);
        Assert.assertEquals(loopBolt.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(loopBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(loopBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> loopShuffle = loopBolt.getShuffleGroupings();
        Assert.assertEquals(loopShuffle.size(), 2);
        assertContains(loopShuffle, TopologyConstants.FILTER_COMPONENT, "feedback");
        assertContains(loopShuffle, TopologyConstants.JOIN_COMPONENT, "feedback");
        Assert.assertTrue(loopBolt.getAllGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getCustomGroupings().isEmpty());

        // No replay bolt is expected in this configuration.
        Assert.assertNull(getBolt(TopologyConstants.REPLAY_COMPONENT));
    }

    /**
     * Submits a topology built from {@code test_dsl_config.yaml} with both the DSL spout and the DSL bolt enabled,
     * then checks every created spout and bolt: its implementation class, parallelism, resource settings and
     * stream groupings. Here the spout is registered as "DataSpout" and the DSL bolt as "DataSource".
     */
    @Test
    public void testHookingInDSLConnectorSpoutAndBolt() {
        config = new BulletStormConfig("test_dsl_config.yaml");
        config.set("bullet.topology.dsl.spout.enable", true);
        config.set("bullet.topology.dsl.spout.connector.as.spout.enable", true);
        config.set("bullet.topology.dsl.bolt.enable", true);
        config.set("bullet.dsl.connector.class.name", CallCountingSpoutConnector.class.getName());
        config.set("bullet.topology.dsl.spout.connector.class.name", CallCountingCredentialsSpout.class.getName());

        Assert.assertFalse(builder.isTopologyCreated());
        submitWithConfig(config);
        Assert.assertTrue(builder.isTopologyCreated());
        Assert.assertEquals(builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(builder.getCreatedBolts().size(), 5);

        // Data spout: the DSL connector spout feeding the DSL bolt.
        CustomSpoutDeclarer dataSpout = getSpout("DataSpout");
        Assert.assertNotNull(dataSpout);
        Assert.assertEquals(dataSpout.getSpout().getClass(), DSLConnectorSpout.class);
        Assert.assertEquals(dataSpout.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertEquals(dataSpout.getCpuLoad(), Double.valueOf(50.0));
        Assert.assertEquals(dataSpout.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(dataSpout.getOffHeap(), Double.valueOf(160.0));

        // DSL bolt: shuffle-grouped on the data spout's default stream only.
        CustomBoltDeclarer dslBolt = getBolt("DataSource");
        Assert.assertNotNull(dslBolt);
        Assert.assertEquals(dslBolt.getBolt().getClass(), DSLBolt.class);
        Assert.assertEquals(dslBolt.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertEquals(dslBolt.getCpuLoad(), Double.valueOf(50.0));
        Assert.assertEquals(dslBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(dslBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> dslShuffle = dslBolt.getShuffleGroupings();
        Assert.assertEquals(dslShuffle.size(), 1);
        assertContains(dslShuffle, "DataSpout", "default");
        Assert.assertTrue(dslBolt.getAllGroupings().isEmpty());
        Assert.assertTrue(dslBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(dslBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(dslBolt.getCustomGroupings().isEmpty());

        // Tick spout.
        CustomSpoutDeclarer tickSpout = getSpout(TopologyConstants.TICK_COMPONENT);
        Assert.assertNotNull(tickSpout);
        Assert.assertEquals(tickSpout.getSpout().getClass(), TickSpout.class);
        Assert.assertEquals(tickSpout.getParallelism(), 1);
        Assert.assertEquals(tickSpout.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(tickSpout.getOnHeap(), Double.valueOf(128.0));
        Assert.assertEquals(tickSpout.getOffHeap(), Double.valueOf(160.0));

        // Query spout.
        CustomSpoutDeclarer querySpout = getSpout(TopologyConstants.QUERY_COMPONENT);
        Assert.assertNotNull(querySpout);
        Assert.assertEquals(querySpout.getSpout().getClass(), QuerySpout.class);
        Assert.assertEquals(querySpout.getParallelism(), 2);
        Assert.assertEquals(querySpout.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(querySpout.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(querySpout.getOffHeap(), Double.valueOf(160.0));

        // Filter bolt: all-grouped on the tick and query streams, shuffle-grouped on the DSL bolt.
        CustomBoltDeclarer filterBolt = getBolt(TopologyConstants.FILTER_COMPONENT);
        Assert.assertNotNull(filterBolt);
        Assert.assertEquals(filterBolt.getBolt().getClass(), FilterBolt.class);
        Assert.assertEquals(filterBolt.getParallelism(), BulletStormConfig.DEFAULT_FILTER_BOLT_PARALLELISM);
        Assert.assertEquals(filterBolt.getCpuLoad(), Double.valueOf(100.0));
        Assert.assertEquals(filterBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(filterBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> filterAll = filterBolt.getAllGroupings();
        Assert.assertEquals(filterAll.size(), 3);
        assertContains(filterAll, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(filterAll, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(filterAll, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> filterShuffle = filterBolt.getShuffleGroupings();
        Assert.assertEquals(filterShuffle.size(), 1);
        assertContains(filterShuffle, "DataSource", "default");
        Assert.assertTrue(filterBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(filterBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(filterBolt.getCustomGroupings().isEmpty());

        // Join bolt: all-grouped on ticks, fields-grouped by "id" on the query and filter streams.
        CustomBoltDeclarer joinBolt = getBolt(TopologyConstants.JOIN_COMPONENT);
        Assert.assertNotNull(joinBolt);
        Assert.assertEquals(joinBolt.getBolt().getClass(), JoinBolt.class);
        Assert.assertEquals(joinBolt.getParallelism(), 2);
        Assert.assertEquals(joinBolt.getCpuLoad(), Double.valueOf(100.0));
        Assert.assertEquals(joinBolt.getOnHeap(), Double.valueOf(512.0));
        Assert.assertEquals(joinBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> joinAll = joinBolt.getAllGroupings();
        Assert.assertEquals(joinAll.size(), 1);
        assertContains(joinAll, TopologyConstants.TICK_COMPONENT, "default");
        Map<Pair<String, String>, List<Fields>> joinFields = joinBolt.getFieldsGroupings();
        Assert.assertEquals(joinFields.size(), 4);
        assertContains(joinFields, TopologyConstants.QUERY_COMPONENT, "default", new Fields("id"));
        assertContains(joinFields, TopologyConstants.QUERY_COMPONENT, "metadata", new Fields("id"));
        assertContains(joinFields, TopologyConstants.FILTER_COMPONENT, "default", new Fields("id"));
        assertContains(joinFields, TopologyConstants.FILTER_COMPONENT, "error", new Fields("id"));
        Assert.assertTrue(joinBolt.getShuffleGroupings().isEmpty());
        Assert.assertTrue(joinBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(joinBolt.getCustomGroupings().isEmpty());

        // Result bolt: shuffle-grouped on the join bolt's default stream only.
        CustomBoltDeclarer resultBolt = getBolt(TopologyConstants.RESULT_COMPONENT);
        Assert.assertNotNull(resultBolt);
        Assert.assertEquals(resultBolt.getBolt().getClass(), ResultBolt.class);
        Assert.assertEquals(resultBolt.getParallelism(), 2);
        Assert.assertEquals(resultBolt.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(resultBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(resultBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> resultShuffle = resultBolt.getShuffleGroupings();
        Assert.assertEquals(resultShuffle.size(), 1);
        assertContains(resultShuffle, TopologyConstants.JOIN_COMPONENT, "default");
        Assert.assertTrue(resultBolt.getAllGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getCustomGroupings().isEmpty());

        // Loop bolt: shuffle-grouped on the feedback streams of the filter and join bolts.
        CustomBoltDeclarer loopBolt = getBolt(TopologyConstants.LOOP_COMPONENT);
        Assert.assertNotNull(loopBolt);
        Assert.assertEquals(loopBolt.getBolt().getClass(), LoopBolt.class);
        Assert.assertEquals(loopBolt.getParallelism(), 2);
        Assert.assertEquals(loopBolt.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(loopBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(loopBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> loopShuffle = loopBolt.getShuffleGroupings();
        Assert.assertEquals(loopShuffle.size(), 2);
        assertContains(loopShuffle, TopologyConstants.FILTER_COMPONENT, "feedback");
        assertContains(loopShuffle, TopologyConstants.JOIN_COMPONENT, "feedback");
        Assert.assertTrue(loopBolt.getAllGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getCustomGroupings().isEmpty());

        // No replay bolt is expected in this configuration.
        Assert.assertNull(getBolt(TopologyConstants.REPLAY_COMPONENT));
    }

    /**
     * Points the Bullet spout class name at {@link CustomIRichSpout}, submits the topology and checks every created
     * spout and bolt: its implementation class, parallelism, resource settings and stream groupings.
     */
    @Test
    public void testHookingInBulletSpout() {
        config.set("bullet.topology.bullet.spout.class.name", CustomIRichSpout.class.getName());

        Assert.assertFalse(builder.isTopologyCreated());
        submitWithConfig(config);
        Assert.assertTrue(builder.isTopologyCreated());
        Assert.assertEquals(builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(builder.getCreatedBolts().size(), 4);

        // Data source: the custom spout named in the config.
        CustomSpoutDeclarer dataSpout = getSpout("DataSource");
        Assert.assertNotNull(dataSpout);
        Assert.assertEquals(dataSpout.getSpout().getClass(), CustomIRichSpout.class);
        Assert.assertEquals(dataSpout.getParallelism(), Integer.valueOf(HASH_COUNT));
        Assert.assertEquals(dataSpout.getCpuLoad(), Double.valueOf(50.0));
        Assert.assertEquals(dataSpout.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(dataSpout.getOffHeap(), Double.valueOf(160.0));

        // Tick spout.
        CustomSpoutDeclarer tickSpout = getSpout(TopologyConstants.TICK_COMPONENT);
        Assert.assertNotNull(tickSpout);
        Assert.assertEquals(tickSpout.getSpout().getClass(), TickSpout.class);
        Assert.assertEquals(tickSpout.getParallelism(), 1);
        Assert.assertEquals(tickSpout.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(tickSpout.getOnHeap(), Double.valueOf(128.0));
        Assert.assertEquals(tickSpout.getOffHeap(), Double.valueOf(160.0));

        // Query spout.
        CustomSpoutDeclarer querySpout = getSpout(TopologyConstants.QUERY_COMPONENT);
        Assert.assertNotNull(querySpout);
        Assert.assertEquals(querySpout.getSpout().getClass(), QuerySpout.class);
        Assert.assertEquals(querySpout.getParallelism(), 2);
        Assert.assertEquals(querySpout.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(querySpout.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(querySpout.getOffHeap(), Double.valueOf(160.0));

        // Filter bolt: all-grouped on the tick and query streams, shuffle-grouped on the data source.
        CustomBoltDeclarer filterBolt = getBolt(TopologyConstants.FILTER_COMPONENT);
        Assert.assertNotNull(filterBolt);
        Assert.assertEquals(filterBolt.getBolt().getClass(), FilterBolt.class);
        Assert.assertEquals(filterBolt.getParallelism(), BulletStormConfig.DEFAULT_FILTER_BOLT_PARALLELISM);
        Assert.assertEquals(filterBolt.getCpuLoad(), Double.valueOf(100.0));
        Assert.assertEquals(filterBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(filterBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> filterAll = filterBolt.getAllGroupings();
        Assert.assertEquals(filterAll.size(), 3);
        assertContains(filterAll, TopologyConstants.TICK_COMPONENT, "default");
        assertContains(filterAll, TopologyConstants.QUERY_COMPONENT, "default");
        assertContains(filterAll, TopologyConstants.QUERY_COMPONENT, "metadata");
        List<Pair<String, String>> filterShuffle = filterBolt.getShuffleGroupings();
        Assert.assertEquals(filterShuffle.size(), 1);
        assertContains(filterShuffle, "DataSource", "default");
        Assert.assertTrue(filterBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(filterBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(filterBolt.getCustomGroupings().isEmpty());

        // Join bolt: all-grouped on ticks, fields-grouped by "id" on the query and filter streams.
        CustomBoltDeclarer joinBolt = getBolt(TopologyConstants.JOIN_COMPONENT);
        Assert.assertNotNull(joinBolt);
        Assert.assertEquals(joinBolt.getBolt().getClass(), JoinBolt.class);
        Assert.assertEquals(joinBolt.getParallelism(), 2);
        Assert.assertEquals(joinBolt.getCpuLoad(), Double.valueOf(100.0));
        Assert.assertEquals(joinBolt.getOnHeap(), Double.valueOf(512.0));
        Assert.assertEquals(joinBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> joinAll = joinBolt.getAllGroupings();
        Assert.assertEquals(joinAll.size(), 1);
        assertContains(joinAll, TopologyConstants.TICK_COMPONENT, "default");
        Map<Pair<String, String>, List<Fields>> joinFields = joinBolt.getFieldsGroupings();
        Assert.assertEquals(joinFields.size(), 4);
        assertContains(joinFields, TopologyConstants.QUERY_COMPONENT, "default", new Fields("id"));
        assertContains(joinFields, TopologyConstants.QUERY_COMPONENT, "metadata", new Fields("id"));
        assertContains(joinFields, TopologyConstants.FILTER_COMPONENT, "default", new Fields("id"));
        assertContains(joinFields, TopologyConstants.FILTER_COMPONENT, "error", new Fields("id"));
        Assert.assertTrue(joinBolt.getShuffleGroupings().isEmpty());
        Assert.assertTrue(joinBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(joinBolt.getCustomGroupings().isEmpty());

        // Result bolt: shuffle-grouped on the join bolt's default stream only.
        CustomBoltDeclarer resultBolt = getBolt(TopologyConstants.RESULT_COMPONENT);
        Assert.assertNotNull(resultBolt);
        Assert.assertEquals(resultBolt.getBolt().getClass(), ResultBolt.class);
        Assert.assertEquals(resultBolt.getParallelism(), 2);
        Assert.assertEquals(resultBolt.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(resultBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(resultBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> resultShuffle = resultBolt.getShuffleGroupings();
        Assert.assertEquals(resultShuffle.size(), 1);
        assertContains(resultShuffle, TopologyConstants.JOIN_COMPONENT, "default");
        Assert.assertTrue(resultBolt.getAllGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(resultBolt.getCustomGroupings().isEmpty());

        // Loop bolt: shuffle-grouped on the feedback streams of the filter and join bolts.
        CustomBoltDeclarer loopBolt = getBolt(TopologyConstants.LOOP_COMPONENT);
        Assert.assertNotNull(loopBolt);
        Assert.assertEquals(loopBolt.getBolt().getClass(), LoopBolt.class);
        Assert.assertEquals(loopBolt.getParallelism(), 2);
        Assert.assertEquals(loopBolt.getCpuLoad(), Double.valueOf(20.0));
        Assert.assertEquals(loopBolt.getOnHeap(), Double.valueOf(256.0));
        Assert.assertEquals(loopBolt.getOffHeap(), Double.valueOf(160.0));
        List<Pair<String, String>> loopShuffle = loopBolt.getShuffleGroupings();
        Assert.assertEquals(loopShuffle.size(), 2);
        assertContains(loopShuffle, TopologyConstants.FILTER_COMPONENT, "feedback");
        assertContains(loopShuffle, TopologyConstants.JOIN_COMPONENT, "feedback");
        Assert.assertTrue(loopBolt.getAllGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getDirectGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getFieldsGroupings().isEmpty());
        Assert.assertTrue(loopBolt.getCustomGroupings().isEmpty());

        // No replay bolt is expected in this configuration.
        Assert.assertNull(getBolt(TopologyConstants.REPLAY_COMPONENT));
    }

    /**
     * With {@code bullet.query.window.disable} set, the submitted topology is expected to contain the three spouts
     * and the filter, join and result bolts, but no loop bolt.
     */
    @Test
    public void testDisabledLoopBoltOnNoWindowing() {
        builder.setSpout("source", new CustomIRichSpout(), Integer.valueOf(HASH_COUNT));
        config.set("bullet.query.window.disable", true);

        Assert.assertFalse(builder.isTopologyCreated());
        submitWithTopology("source");
        Assert.assertTrue(builder.isTopologyCreated());

        Assert.assertEquals(builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(builder.getCreatedBolts().size(), 3);

        // Components that must be present, checked in the same order as before.
        for (String spoutName : Arrays.asList("source", TopologyConstants.TICK_COMPONENT, TopologyConstants.QUERY_COMPONENT)) {
            Assert.assertNotNull(getSpout(spoutName));
        }
        for (String boltName : Arrays.asList(TopologyConstants.FILTER_COMPONENT, TopologyConstants.JOIN_COMPONENT, TopologyConstants.RESULT_COMPONENT)) {
            Assert.assertNotNull(getBolt(boltName));
        }

        // Windowing is disabled, so no loop bolt should have been wired in.
        Assert.assertNull(getBolt(TopologyConstants.LOOP_COMPONENT));
    }

    /**
     * With windowing disabled but {@code bullet.topology.replay.enable} set, the submitted topology is expected to
     * contain the three spouts and five bolts, including both the loop bolt and the replay bolt.
     */
    @Test
    public void testEnabledLoopBoltOnNoWindowingButReplay() {
        builder.setSpout("source", new CustomIRichSpout(), Integer.valueOf(HASH_COUNT));
        config.set("bullet.query.window.disable", true);
        config.set("bullet.topology.replay.enable", true);

        Assert.assertFalse(builder.isTopologyCreated());
        submitWithTopology("source");
        Assert.assertTrue(builder.isTopologyCreated());

        Assert.assertEquals(builder.getCreatedSpouts().size(), 3);
        Assert.assertEquals(builder.getCreatedBolts().size(), 5);

        // Every expected component must be present, checked in the same order as before.
        for (String spoutName : Arrays.asList("source", TopologyConstants.TICK_COMPONENT, TopologyConstants.QUERY_COMPONENT)) {
            Assert.assertNotNull(getSpout(spoutName));
        }
        for (String boltName : Arrays.asList(
                TopologyConstants.FILTER_COMPONENT,
                TopologyConstants.JOIN_COMPONENT,
                TopologyConstants.RESULT_COMPONENT,
                TopologyConstants.LOOP_COMPONENT,
                TopologyConstants.REPLAY_COMPONENT)) {
            Assert.assertNotNull(getBolt(boltName));
        }
    }

    /**
     * Enables metrics with {@link CustomIMetricsConsumer} as the only metrics class and checks that the two
     * {@code CustomIMetricsConsumer} marker keys are absent from the config before submission and {@code true}
     * afterwards.
     */
    @Test
    public void testHookingInCustomMetricsConsumer() {
        builder.setSpout("source", new CustomIRichSpout(), Integer.valueOf(HASH_COUNT));
        config.set("bullet.topology.metrics.enable", true);
        List<String> metricsClasses = Collections.singletonList(CustomIMetricsConsumer.class.getName());
        config.set("bullet.topology.metrics.classes", metricsClasses);

        // Neither marker key may be set before the topology is submitted.
        Assert.assertNull(config.get(CustomIMetricsConsumer.CUSTOM_METRICS_REGISTERED));
        Assert.assertNull(config.get(CustomIMetricsConsumer.CUSTOM_METRICS_V2_ENABLED));
        Assert.assertFalse(builder.isTopologyCreated());

        submitWithTopology("source");

        Assert.assertTrue(builder.isTopologyCreated());
        Boolean v2Enabled = (Boolean) config.get(CustomIMetricsConsumer.CUSTOM_METRICS_V2_ENABLED);
        Assert.assertTrue(v2Enabled.booleanValue());
        Boolean registered = (Boolean) config.get(CustomIMetricsConsumer.CUSTOM_METRICS_REGISTERED);
        Assert.assertTrue(registered.booleanValue());
    }

    /**
     * Checks that {@code StormUtils.getHashIndex} maps {@code NUM_TESTS} random integer keys into the range
     * {@code [0, HASH_COUNT)}.
     */
    @Test
    public void testGetHashIndex() {
        Random random = new Random();
        for (int i = 0; i < NUM_TESTS; i++) {
            // The Random is unseeded, so keep the key and report it on failure; otherwise a failing
            // run could not be reproduced.
            Integer key = Integer.valueOf(random.nextInt());
            int hashIndex = StormUtils.getHashIndex(key, HASH_COUNT);
            Assert.assertTrue(0 <= hashIndex && hashIndex < HASH_COUNT,
                    "Hash index " + hashIndex + " for key " + key + " is outside [0, " + HASH_COUNT + ")");
        }
    }
}
