package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.QueryUtils;
import com.yahoo.bullet.storage.MemoryStorageManager;
import com.yahoo.bullet.storage.StorageManager;
import com.yahoo.bullet.storm.ReplayBolt;
import com.yahoo.bullet.storm.TupleClassifier;
import com.yahoo.bullet.storm.grouping.TaskIndexCaptureGrouping;
import com.yahoo.bullet.storm.testing.ComponentUtils;
import com.yahoo.bullet.storm.testing.CustomCollector;
import com.yahoo.bullet.storm.testing.CustomOutputFieldsDeclarer;
import com.yahoo.bullet.storm.testing.CustomTopologyContext;
import com.yahoo.bullet.storm.testing.TupleUtils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/yahoo/bullet/storm/ReplayBoltTest.class */
public class ReplayBoltTest {
    private static final int NUM_PARTITIONS = 4;
    private ReplayBolt bolt;
    private CustomCollector collector;
    private CustomTopologyContext context;
    private BulletStormConfig config;
    private StorageManager<PubSubMessage> storageManager;

    /* loaded from: input_file:com/yahoo/bullet/storm/ReplayBoltTest$TestStorageManager.class */
    public static class TestStorageManager extends MemoryStorageManager<PubSubMessage> {
        /* JADX WARN: Type inference failed for: r5v11, types: [byte[], java.io.Serializable] */
        /* JADX WARN: Type inference failed for: r5v3, types: [byte[], java.io.Serializable] */
        /* JADX WARN: Type inference failed for: r5v7, types: [byte[], java.io.Serializable] */
        public TestStorageManager(BulletConfig bulletConfig) {
            super(bulletConfig);
            put("0", new PubSubMessage("0", (Serializable) SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(1)), new Metadata()));
            put("1", new PubSubMessage("1", (Serializable) SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(1)), new Metadata()));
            put("2", new PubSubMessage("2", (Serializable) SerializerDeserializer.toBytes(QueryUtils.makeRawQuery(1)), new Metadata()));
        }
    }

    /* loaded from: input_file:com/yahoo/bullet/storm/ReplayBoltTest$ThrowingStorageManager.class */
    public static class ThrowingStorageManager extends MemoryStorageManager<PubSubMessage> {
        public ThrowingStorageManager(BulletConfig bulletConfig) {
            super(bulletConfig);
        }

        public CompletableFuture<Map<String, PubSubMessage>> getAll() {
            throw new RuntimeException();
        }
    }

    @BeforeMethod
    public void setup() {
        this.collector = new CustomCollector();
        this.context = new CustomTopologyContext();
        this.config = new BulletStormConfig("test_config.yaml");
        this.config.set("bullet.topology.metrics.built.in.enable", true);
        this.config.set("bullet.storage.class.name", "com.yahoo.bullet.storm.ReplayBoltTest$TestStorageManager");
        this.config.validate();
        this.bolt = ComponentUtils.prepare(new HashMap(), new ReplayBolt(this.config), this.context, this.collector);
        Assert.assertFalse(this.bolt.isReplayBatchCompressEnable());
        this.storageManager = this.bolt.getStorageManager();
    }

    @Test
    public void testPrepare() {
        Assert.assertEquals(this.collector.getAckedCount(), 0);
        this.bolt.getCollector().ack((Tuple) null);
        Assert.assertEquals(this.collector.getAckedCount(), 1);
        Assert.assertNotNull(this.bolt.getClassifier());
        Assert.assertTrue(this.bolt.getMetrics().isEnabled());
        Assert.assertNotNull(this.bolt.getBatchedQueriesCount());
        Assert.assertNotNull(this.bolt.getActiveReplaysCount());
        Assert.assertNotNull(this.bolt.getCreatedReplaysCount());
        Assert.assertEquals(this.context.getRegisteredMetricByName("bullet_batched_queries"), this.bolt.getBatchedQueriesCount());
        Assert.assertEquals(this.context.getRegisteredMetricByName("bullet_active_replays"), this.bolt.getActiveReplaysCount());
        Assert.assertEquals(this.context.getRegisteredMetricByName("bullet_created_replays"), this.bolt.getCreatedReplaysCount());
        Assert.assertEquals(this.bolt.getBatchManager().size(), 3L);
        Assert.assertEquals(this.bolt.getBatchedQueriesCount().getValueAndReset(), 3L);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 3L);
        Assert.assertTrue(this.bolt.getReplays().isEmpty());
    }

    @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "Could not create StorageManager\\.")
    public void testPrepareCouldNotCreateStorageManager() {
        this.config = new BulletStormConfig("test_config.yaml");
        this.config.set("bullet.storage.class.name", "");
        this.config.validate();
        this.bolt = ComponentUtils.prepare(new HashMap(), new ReplayBolt(this.config), new CustomTopologyContext(), new CustomCollector());
    }

    @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "Failed to get queries from storage\\.")
    public void testPrepareCouldNotGetStoredQueries() {
        this.config = new BulletStormConfig("test_config.yaml");
        this.config.set("bullet.storage.class.name", "com.yahoo.bullet.storm.ReplayBoltTest$ThrowingStorageManager");
        this.config.validate();
        this.bolt = ComponentUtils.prepare(new HashMap(), new ReplayBolt(this.config), new CustomTopologyContext(), new CustomCollector());
    }

    @Test
    public void testDeclareOutputFields() {
        CustomOutputFieldsDeclarer customOutputFieldsDeclarer = new CustomOutputFieldsDeclarer();
        this.bolt.declareOutputFields(customOutputFieldsDeclarer);
        Fields fields = new Fields(new String[]{"id", "timestamp", "index", "batch"});
        Fields fields2 = new Fields(new String[0]);
        Assert.assertTrue(customOutputFieldsDeclarer.areFieldsPresent("replay", false, fields));
        Assert.assertTrue(customOutputFieldsDeclarer.areFieldsPresent("capture", false, fields2));
    }

    @Test
    public void testUnknownTuple() {
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.UNKNOWN_TUPLE, "", "");
        this.bolt.execute(makeTuple);
        Assert.assertFalse(this.collector.wasAcked(makeTuple));
    }

    @Test
    public void testCleanup() {
        this.bolt.cleanup();
    }

    @Test
    public void testOnQuery() {
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.QUERY_TUPLE, "123", null);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.bolt.getBatchManager().size(), NUM_PARTITIONS);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 4L);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.bolt.getBatchManager().size(), NUM_PARTITIONS);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 4L);
    }

    @Test
    public void testOnMeta() {
        Assert.assertEquals(this.bolt.getBatchManager().size(), 3);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 3L);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "0", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null)));
        Assert.assertEquals(this.bolt.getBatchManager().size(), 2);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 2L);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "1", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null)));
        Assert.assertEquals(this.bolt.getBatchManager().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 1L);
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "2", new Metadata(Metadata.Signal.COMPLETE, (Serializable) null)));
        Assert.assertEquals(this.bolt.getBatchManager().size(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 0L);
    }

    @Test
    public void testOnMetaIgnoreTuple() {
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "", null));
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "", new Metadata(Metadata.Signal.CUSTOM, (Serializable) null)));
    }

    @Test
    public void testHandleReplaySignal() {
        this.bolt.getReplays().put("FilterBolt-18", new ReplayBolt.Replay(18, 0L, (List) null));
        Assert.assertEquals(this.bolt.getReplays().size(), 1);
        Assert.assertEquals(this.bolt.getBatchManager().size(), 3);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 3L);
        this.storageManager.remove("0");
        this.bolt.execute(TupleUtils.makeIDTuple(TupleClassifier.Type.METADATA_TUPLE, "123", new Metadata(Metadata.Signal.REPLAY, (Serializable) null)));
        Assert.assertEquals(this.bolt.getReplays().size(), 0);
        Assert.assertEquals(this.bolt.getBatchManager().size(), 2);
        Assert.assertEquals(this.context.getLongMetric("bullet_batched_queries").longValue(), 2L);
    }

    @Test
    public void testReplayFilterBolt() {
        long currentTimeMillis = System.currentTimeMillis();
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "FilterBolt-18", Long.valueOf(currentTimeMillis), false);
        Tuple makeIDTuple2 = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "FilterBolt-18", Long.valueOf(currentTimeMillis), true);
        ((Tuple) Mockito.doReturn(Integer.valueOf(NUM_PARTITIONS)).when(makeIDTuple)).getSourceTask();
        ((Tuple) Mockito.doReturn(Integer.valueOf(NUM_PARTITIONS)).when(makeIDTuple2)).getSourceTask();
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis)).when(makeIDTuple)).getLong(1);
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis)).when(makeIDTuple2)).getLong(1);
        ((Tuple) Mockito.doReturn(false).when(makeIDTuple)).getBoolean(2);
        ((Tuple) Mockito.doReturn(true).when(makeIDTuple2)).getBoolean(2);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.bolt.getReplays().size(), 1);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_active_replays", "FilterBolt-18").longValue(), 1L);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_created_replays", "FilterBolt-18").longValue(), 1L);
        ReplayBolt.Replay replay = (ReplayBolt.Replay) this.bolt.getReplays().get("FilterBolt-18");
        Assert.assertEquals(replay.getBatches().size(), NUM_PARTITIONS);
        Assert.assertTrue(replay.getBatches().get(0) instanceof Map);
        Assert.assertEquals(replay.getAnchor(), NUM_PARTITIONS);
        Assert.assertEquals(replay.getIndex(), 0);
        Assert.assertEquals(replay.getTaskID(), 18);
        Assert.assertEquals(replay.getTimestamp(), currentTimeMillis);
        for (int i = 1; i <= NUM_PARTITIONS; i++) {
            this.bolt.execute(makeIDTuple2);
            Assert.assertEquals(replay.getAnchor(), NUM_PARTITIONS);
            Assert.assertEquals(replay.getIndex(), i);
        }
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(replay.getAnchor(), NUM_PARTITIONS);
        Assert.assertEquals(replay.getIndex(), NUM_PARTITIONS);
        this.bolt.execute(makeIDTuple2);
        Assert.assertNull(replay.getBatches());
        Assert.assertEquals(replay.getAnchor(), -1);
        Assert.assertEquals(this.collector.getEmittedCount(), 6);
        Assert.assertEquals(this.collector.getAckedCount(), 6);
        Assert.assertEquals(this.collector.getFailedCount(), 1);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_active_replays", "FilterBolt-18").longValue(), 0L);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_created_replays", "FilterBolt-18").longValue(), 1L);
        this.bolt.execute(makeIDTuple2);
        Assert.assertEquals(this.collector.getFailedCount(), 2);
    }

    @Test
    public void testReplayJoinBolt() {
        long currentTimeMillis = System.currentTimeMillis();
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "JoinBolt-21", Long.valueOf(currentTimeMillis), false);
        Tuple makeIDTuple2 = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "JoinBolt-21", Long.valueOf(currentTimeMillis + 1), true);
        ((Tuple) Mockito.doReturn(Integer.valueOf(NUM_PARTITIONS)).when(makeIDTuple)).getSourceTask();
        ((Tuple) Mockito.doReturn(5).when(makeIDTuple2)).getSourceTask();
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis)).when(makeIDTuple)).getLong(1);
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis + 1)).when(makeIDTuple2)).getLong(1);
        ((Tuple) Mockito.doReturn(false).when(makeIDTuple)).getBoolean(2);
        ((Tuple) Mockito.doReturn(true).when(makeIDTuple2)).getBoolean(2);
        TaskIndexCaptureGrouping.TASK_INDEX_MAP.put(21, 0);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.bolt.getReplays().size(), 1);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_active_replays", "JoinBolt-21").longValue(), 1L);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_created_replays", "JoinBolt-21").longValue(), 1L);
        ReplayBolt.Replay replay = (ReplayBolt.Replay) this.bolt.getReplays().get("JoinBolt-21");
        Assert.assertEquals(replay.getBatches().size(), 1);
        Assert.assertTrue(replay.getBatches().get(0) instanceof Map);
        Assert.assertEquals(replay.getAnchor(), NUM_PARTITIONS);
        Assert.assertEquals(replay.getIndex(), 0);
        Assert.assertEquals(replay.getTaskID(), 21);
        Assert.assertEquals(replay.getTimestamp(), currentTimeMillis);
        this.bolt.execute(makeIDTuple2);
        Assert.assertNotEquals(this.bolt.getReplays().get("JoinBolt-21"), replay);
        Assert.assertEquals(this.bolt.getReplays().size(), 1);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_active_replays", "JoinBolt-21").longValue(), 1L);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_created_replays", "JoinBolt-21").longValue(), 2L);
        ReplayBolt.Replay replay2 = (ReplayBolt.Replay) this.bolt.getReplays().get("JoinBolt-21");
        Assert.assertEquals(replay2.getBatches().size(), 1);
        Assert.assertEquals(replay2.getAnchor(), 5);
        Assert.assertEquals(replay2.getIndex(), 0);
        Assert.assertEquals(replay2.getTaskID(), 21);
        Assert.assertEquals(replay2.getTimestamp(), currentTimeMillis + 1);
        this.bolt.execute(makeIDTuple2);
        Assert.assertEquals(replay2.getAnchor(), 5);
        Assert.assertEquals(replay2.getIndex(), 1);
        this.bolt.execute(makeIDTuple2);
        Assert.assertNull(replay2.getBatches());
        Assert.assertEquals(replay2.getAnchor(), -1);
        Assert.assertEquals(this.collector.getEmittedCount(), 3);
        Assert.assertEquals(this.collector.getAckedCount(), 3);
        Assert.assertEquals(this.collector.getFailedCount(), 1);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_active_replays", "JoinBolt-21").longValue(), 0L);
        Assert.assertEquals(this.context.getDimensionLongMetric("bullet_created_replays", "JoinBolt-21").longValue(), 2L);
        this.bolt.execute(makeIDTuple);
        this.bolt.execute(makeIDTuple2);
        Assert.assertEquals(this.collector.getFailedCount(), 3);
        TaskIndexCaptureGrouping.TASK_INDEX_MAP.clear();
    }

    @Test
    public void testReplayFilterBoltWithCompression() {
        this.config = new BulletStormConfig("test_config.yaml");
        this.config.set("bullet.topology.replay.batch.compress.enable", true);
        this.config.validate();
        this.bolt = ComponentUtils.prepare(new HashMap(), new ReplayBolt(this.config), this.context, this.collector);
        Assert.assertTrue(this.bolt.isReplayBatchCompressEnable());
        long currentTimeMillis = System.currentTimeMillis();
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "FilterBolt-18", Long.valueOf(currentTimeMillis), false);
        ((Tuple) Mockito.doReturn(Integer.valueOf(NUM_PARTITIONS)).when(makeIDTuple)).getSourceTask();
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis)).when(makeIDTuple)).getLong(1);
        ((Tuple) Mockito.doReturn(false).when(makeIDTuple)).getBoolean(2);
        this.bolt.execute(makeIDTuple);
        ReplayBolt.Replay replay = (ReplayBolt.Replay) this.bolt.getReplays().get("FilterBolt-18");
        Assert.assertEquals(replay.getBatches().size(), NUM_PARTITIONS);
        Assert.assertTrue(replay.getBatches().get(0) instanceof byte[]);
    }

    @Test
    public void testReplayJoinBoltWithCompression() {
        this.config = new BulletStormConfig("test_config.yaml");
        this.config.set("bullet.topology.replay.batch.compress.enable", true);
        this.config.validate();
        this.bolt = ComponentUtils.prepare(new HashMap(), new ReplayBolt(this.config), this.context, this.collector);
        Assert.assertTrue(this.bolt.isReplayBatchCompressEnable());
        long currentTimeMillis = System.currentTimeMillis();
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "JoinBolt-21", Long.valueOf(currentTimeMillis), false);
        ((Tuple) Mockito.doReturn(Integer.valueOf(NUM_PARTITIONS)).when(makeIDTuple)).getSourceTask();
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis)).when(makeIDTuple)).getLong(1);
        ((Tuple) Mockito.doReturn(false).when(makeIDTuple)).getBoolean(2);
        TaskIndexCaptureGrouping.TASK_INDEX_MAP.put(21, 0);
        this.bolt.execute(makeIDTuple);
        ReplayBolt.Replay replay = (ReplayBolt.Replay) this.bolt.getReplays().get("JoinBolt-21");
        Assert.assertEquals(replay.getBatches().size(), 1);
        Assert.assertTrue(replay.getBatches().get(0) instanceof byte[]);
    }

    @Test
    public void testDoubleReplay() {
        long currentTimeMillis = System.currentTimeMillis();
        Tuple makeIDTuple = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "FilterBolt-18", Long.valueOf(currentTimeMillis), false);
        Tuple makeIDTuple2 = TupleUtils.makeIDTuple(TupleClassifier.Type.REPLAY_TUPLE, "FilterBolt-18", Long.valueOf(currentTimeMillis), true);
        ((Tuple) Mockito.doReturn(Integer.valueOf(NUM_PARTITIONS)).when(makeIDTuple)).getSourceTask();
        ((Tuple) Mockito.doReturn(5).when(makeIDTuple2)).getSourceTask();
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis)).when(makeIDTuple)).getLong(1);
        ((Tuple) Mockito.doReturn(Long.valueOf(currentTimeMillis)).when(makeIDTuple2)).getLong(1);
        ((Tuple) Mockito.doReturn(false).when(makeIDTuple)).getBoolean(2);
        ((Tuple) Mockito.doReturn(true).when(makeIDTuple2)).getBoolean(2);
        this.bolt.execute(makeIDTuple);
        Assert.assertEquals(this.bolt.getReplays().size(), 1);
        ReplayBolt.Replay replay = (ReplayBolt.Replay) this.bolt.getReplays().get("FilterBolt-18");
        Assert.assertEquals(replay.getAnchor(), NUM_PARTITIONS);
        Assert.assertEquals(replay.getIndex(), 0);
        this.bolt.execute(makeIDTuple2);
        Assert.assertEquals(this.bolt.getReplays().size(), 1);
        Assert.assertEquals(replay.getAnchor(), NUM_PARTITIONS);
        Assert.assertEquals(replay.getIndex(), 0);
        ((Tuple) Mockito.doReturn(false).when(makeIDTuple2)).getBoolean(2);
        this.bolt.execute(makeIDTuple2);
        Assert.assertEquals(this.bolt.getReplays().size(), 1);
        Assert.assertEquals(replay.getAnchor(), 5);
        Assert.assertEquals(replay.getIndex(), 0);
    }
}
