package com.yahoo.bullet.storm;

import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.storm.QuerySpout;
import com.yahoo.bullet.storm.TupleClassifier;
import com.yahoo.bullet.storm.testing.ComponentUtils;
import com.yahoo.bullet.storm.testing.CustomEmitter;
import com.yahoo.bullet.storm.testing.CustomOutputFieldsDeclarer;
import com.yahoo.bullet.storm.testing.CustomSubscriber;
import com.yahoo.bullet.storm.testing.CustomTopologyContext;
import com.yahoo.bullet.storm.testing.TupleUtils;
import java.io.Serializable;
import java.util.HashMap;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/yahoo/bullet/storm/QuerySpoutTest.class */
public class QuerySpoutTest {
    private CustomEmitter emitter;
    private QuerySpout spout;
    private CustomSubscriber subscriber;
    private CustomTopologyContext context;

    @BeforeMethod
    public void setup() {
        this.emitter = new CustomEmitter();
        BulletStormConfig bulletStormConfig = new BulletStormConfig("test_config.yaml");
        bulletStormConfig.set("bullet.topology.metrics.built.in.enable", true);
        bulletStormConfig.validate();
        this.context = new CustomTopologyContext();
        this.spout = ComponentUtils.open(new HashMap(), new QuerySpout(bulletStormConfig), this.context, this.emitter);
        this.spout.activate();
        this.subscriber = (CustomSubscriber) this.spout.getSubscriber();
    }

    @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = ".*Cannot create PubSub.*")
    public void testFailingToCreatePubSub() {
        BulletStormConfig bulletStormConfig = new BulletStormConfig("test_config.yaml");
        bulletStormConfig.set("bullet.pubsub.class.name", "fake.class");
        QuerySpout querySpout = new QuerySpout(bulletStormConfig);
        ComponentUtils.open(querySpout, this.emitter);
        querySpout.activate();
    }

    @Test
    public void testNextTupleMessagesAreReceivedAndTupleIsEmitted() {
        PubSubMessage pubSubMessage = new PubSubMessage("42", "This is a PubSubMessage", new Metadata());
        PubSubMessage pubSubMessage2 = new PubSubMessage("43", "This is also a PubSubMessage", new Metadata());
        this.subscriber.addMessages(pubSubMessage, pubSubMessage2);
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 1);
        Assert.assertEquals(this.subscriber.getReceived().get(0), pubSubMessage);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.QUERY_TUPLE, pubSubMessage.getId(), pubSubMessage.getContent(), pubSubMessage.getMetadata());
        Assert.assertEquals(this.emitter.getEmitted().size(), 1);
        Assert.assertTrue(this.emitter.wasNthEmitted(makeTuple, 1));
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 2);
        Assert.assertEquals(this.subscriber.getReceived().get(0), pubSubMessage);
        Assert.assertEquals(this.subscriber.getReceived().get(1), pubSubMessage2);
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.QUERY_TUPLE, pubSubMessage2.getId(), pubSubMessage2.getContent(), pubSubMessage2.getMetadata());
        Assert.assertEquals(this.emitter.getEmitted().size(), 2);
        Assert.assertTrue(this.emitter.wasNthEmitted(makeTuple, 1));
        Assert.assertTrue(this.emitter.wasNthEmitted(makeTuple2, 2));
    }

    @Test
    public void testNextTupleDoesNothingWhenSubscriberReceivesNull() {
        this.subscriber.addMessages(null, null);
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 1);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 2);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
    }

    @Test
    public void testNextTupleDoesNothingWhenSubscriberThrows() {
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
    }

    @Test
    public void testNextTupleCommitsWhenMetadataIsNull() {
        this.subscriber.addMessages(new PubSubMessage("", "", (Metadata) null));
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 0);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 1);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 1);
    }

    @Test
    public void testSignalOnlyMessagesAreSentOnTheMetadataStream() {
        PubSubMessage pubSubMessage = new PubSubMessage("42", Metadata.Signal.KILL);
        PubSubMessage pubSubMessage2 = new PubSubMessage("43", Metadata.Signal.COMPLETE);
        PubSubMessage pubSubMessage3 = new PubSubMessage("44", (byte[]) null, new Metadata());
        this.subscriber.addMessages(pubSubMessage, pubSubMessage2, pubSubMessage3);
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        this.spout.nextTuple();
        this.spout.nextTuple();
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 3);
        Assert.assertEquals(this.subscriber.getReceived().get(0), pubSubMessage);
        Assert.assertEquals(this.subscriber.getReceived().get(1), pubSubMessage2);
        Assert.assertEquals(this.subscriber.getReceived().get(2), pubSubMessage3);
        Assert.assertEquals(this.emitter.getEmitted().size(), 3);
        Tuple makeTuple = TupleUtils.makeTuple(TupleClassifier.Type.METADATA_TUPLE, pubSubMessage.getId(), pubSubMessage.getMetadata());
        Tuple makeTuple2 = TupleUtils.makeTuple(TupleClassifier.Type.METADATA_TUPLE, pubSubMessage2.getId(), pubSubMessage2.getMetadata());
        Tuple makeTuple3 = TupleUtils.makeTuple(TupleClassifier.Type.METADATA_TUPLE, pubSubMessage3.getId(), pubSubMessage3.getMetadata());
        Assert.assertTrue(this.emitter.wasTupleEmittedTo(makeTuple, "metadata"));
        Assert.assertTrue(this.emitter.wasTupleEmittedTo(makeTuple2, "metadata"));
        Assert.assertTrue(this.emitter.wasTupleEmittedTo(makeTuple3, "metadata"));
        Assert.assertTrue(this.emitter.wasNthEmitted(makeTuple, 1));
        Assert.assertTrue(this.emitter.wasNthEmitted(makeTuple2, 2));
        Assert.assertTrue(this.emitter.wasNthEmitted(makeTuple3, 3));
    }

    @Test
    public void testDeclaredOutputFields() {
        CustomOutputFieldsDeclarer customOutputFieldsDeclarer = new CustomOutputFieldsDeclarer();
        this.spout.declareOutputFields(customOutputFieldsDeclarer);
        Fields fields = new Fields(new String[]{"id", "query", "metadata"});
        Fields fields2 = new Fields(new String[]{"id", "metadata"});
        Assert.assertTrue(customOutputFieldsDeclarer.areFieldsPresent("default", false, fields));
        Assert.assertTrue(customOutputFieldsDeclarer.areFieldsPresent("metadata", false, fields2));
    }

    @Test
    public void testAckCallsSubscriberCommit() {
        this.spout.ack("42");
        this.spout.ack("43");
        this.spout.ack("44");
        Assert.assertEquals(this.subscriber.getCommitted().size(), 3);
        Assert.assertEquals(this.subscriber.getCommitted().get(0), "42");
        Assert.assertEquals(this.subscriber.getCommitted().get(1), "43");
        Assert.assertEquals(this.subscriber.getCommitted().get(2), "44");
    }

    @Test
    public void testFailCallsSubscriberFail() {
        this.spout.fail("42");
        this.spout.fail("43");
        this.spout.fail("44");
        Assert.assertEquals(this.subscriber.getFailed().size(), 3);
        Assert.assertEquals(this.subscriber.getFailed().get(0), "42");
        Assert.assertEquals(this.subscriber.getFailed().get(1), "43");
        Assert.assertEquals(this.subscriber.getFailed().get(2), "44");
    }

    @Test
    public void testCloseDoesNotCallSubscriberClose() {
        this.spout.close();
        Assert.assertFalse(this.subscriber.isClosed());
        Assert.assertFalse(this.subscriber.isThrown());
    }

    @Test
    public void testDeactivateCallsSubscriberClose() {
        this.spout.deactivate();
        Assert.assertTrue(this.subscriber.isClosed());
        Assert.assertFalse(this.subscriber.isThrown());
        this.spout.deactivate();
        Assert.assertTrue(this.subscriber.isClosed());
        Assert.assertTrue(this.subscriber.isThrown());
    }

    @Test
    public void testForcedReplaySignal() {
        this.subscriber.addMessages(new PubSubMessage("123", (byte[]) null, new Metadata(Metadata.Signal.REPLAY, (Serializable) null)));
        this.spout.nextTuple();
        Assert.assertEquals(this.emitter.getEmitted().size(), 1);
        Assert.assertEquals(this.emitter.getEmitted().get(0).getTuple().get(0), "123");
        Assert.assertEquals(this.subscriber.getCommitted().size(), 1);
        Assert.assertEquals(this.subscriber.getCommitted().get(0), "123");
    }

    @Test
    public void testHandleReplayRequest() {
        long currentTimeMillis = System.currentTimeMillis();
        PubSubMessage pubSubMessage = new PubSubMessage("FilterBolt-18", (byte[]) null, new Metadata(Metadata.Signal.REPLAY, Long.valueOf(currentTimeMillis)));
        this.subscriber.addMessages(pubSubMessage, pubSubMessage, pubSubMessage);
        Assert.assertEquals(this.subscriber.getReceived().size(), 0);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 0);
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        Assert.assertEquals(this.spout.getReplays().size(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 0L);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 1);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 1);
        Assert.assertEquals(this.emitter.getEmitted().size(), 1);
        Assert.assertEquals(this.spout.getReplays().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
        QuerySpout.Replay replay = (QuerySpout.Replay) this.spout.getReplays().get("FilterBolt-18");
        Assert.assertEquals(replay.getId(), "FilterBolt-18");
        Assert.assertEquals(replay.getTimestamp(), currentTimeMillis);
        Assert.assertFalse(replay.isStopped());
        Assert.assertEquals(this.emitter.getEmitted().get(0).getMessageId(), "FilterBolt-18");
        Assert.assertEquals(this.emitter.getEmitted().get(0).getTuple().size(), 3);
        Assert.assertEquals(this.emitter.getEmitted().get(0).getTuple().get(0), "FilterBolt-18");
        Assert.assertEquals(this.emitter.getEmitted().get(0).getTuple().get(1), Long.valueOf(currentTimeMillis));
        Assert.assertEquals(this.emitter.getEmitted().get(0).getTuple().get(2), false);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 2);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 2);
        Assert.assertEquals(this.emitter.getEmitted().size(), 1);
        Assert.assertEquals(this.spout.getReplays().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
        this.spout.ack("FilterBolt-18");
        Assert.assertFalse(replay.isStopped());
        Assert.assertEquals(this.emitter.getEmitted().size(), 2);
        Assert.assertEquals(this.emitter.getEmitted().get(1).getMessageId(), "FilterBolt-18");
        Assert.assertEquals(this.emitter.getEmitted().get(1).getTuple().get(2), true);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 2);
        this.spout.fail("FilterBolt-18");
        Assert.assertTrue(replay.isStopped());
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 0L);
        this.spout.nextTuple();
        Assert.assertFalse(replay.isStopped());
        Assert.assertEquals(this.emitter.getEmitted().size(), 3);
        Assert.assertEquals(this.emitter.getEmitted().get(2).getMessageId(), "FilterBolt-18");
        Assert.assertEquals(this.emitter.getEmitted().get(2).getTuple().get(2), false);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
    }

    @Test
    public void testHandleReplayRequestDifferentTimestamp() {
        long currentTimeMillis = System.currentTimeMillis();
        this.subscriber.addMessages(new PubSubMessage("FilterBolt-18", (byte[]) null, new Metadata(Metadata.Signal.REPLAY, Long.valueOf(currentTimeMillis))), new PubSubMessage("FilterBolt-18", (byte[]) null, new Metadata(Metadata.Signal.REPLAY, Long.valueOf(currentTimeMillis - 1))), new PubSubMessage("FilterBolt-18", (byte[]) null, new Metadata(Metadata.Signal.REPLAY, Long.valueOf(currentTimeMillis + 1))), new PubSubMessage("FilterBolt-18", (byte[]) null, new Metadata(Metadata.Signal.REPLAY, Long.valueOf(currentTimeMillis + 2))));
        Assert.assertEquals(this.emitter.getEmitted().size(), 0);
        Assert.assertEquals(this.spout.getReplays().size(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 0L);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 1);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 1);
        Assert.assertEquals(this.emitter.getEmitted().size(), 1);
        Assert.assertEquals(this.spout.getReplays().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
        QuerySpout.Replay replay = (QuerySpout.Replay) this.spout.getReplays().get("FilterBolt-18");
        Assert.assertEquals(replay.getId(), "FilterBolt-18");
        Assert.assertEquals(replay.getTimestamp(), currentTimeMillis);
        Assert.assertFalse(replay.isStopped());
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 2);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 2);
        Assert.assertEquals(this.emitter.getEmitted().size(), 1);
        Assert.assertEquals(this.spout.getReplays().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 3);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 3);
        Assert.assertEquals(this.emitter.getEmitted().size(), 1);
        Assert.assertEquals(this.spout.getReplays().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
        Assert.assertEquals(replay.getTimestamp(), currentTimeMillis + 1);
        Assert.assertFalse(replay.isStopped());
        this.spout.fail("FilterBolt-18");
        Assert.assertTrue(replay.isStopped());
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 0L);
        this.spout.nextTuple();
        Assert.assertEquals(this.subscriber.getReceived().size(), 4);
        Assert.assertEquals(this.subscriber.getCommitted().size(), 4);
        Assert.assertEquals(this.emitter.getEmitted().size(), 2);
        Assert.assertEquals(this.spout.getReplays().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
        Assert.assertEquals(replay.getTimestamp(), currentTimeMillis + 2);
        Assert.assertFalse(replay.isStopped());
    }

    @Test
    public void testDeactivateClearsReplays() {
        this.subscriber.addMessages(new PubSubMessage("FilterBolt-18", (byte[]) null, new Metadata(Metadata.Signal.REPLAY, Long.valueOf(System.currentTimeMillis()))));
        this.spout.nextTuple();
        Assert.assertEquals(this.spout.getReplays().size(), 1);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 1L);
        this.spout.deactivate();
        Assert.assertEquals(this.spout.getReplays().size(), 0);
        Assert.assertEquals(this.context.getLongMetric("bullet_active_replays").longValue(), 0L);
    }
}
