package com.yahoo.bullet.storm.drpc;

import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.storm.drpc.utils.DRPCError;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/yahoo/bullet/storm/drpc/DRPCQueryResultPubscriberTest.class */
/**
 * Tests for {@link DRPCQueryResultPubscriber} that replace its {@link AsyncHttpClient} with Mockito mocks
 * and then poll {@code receive()} for the message produced in response to a {@code send(...)}.
 */
public class DRPCQueryResultPubscriberTest {
    /** Config key used to point the pubscriber at a placeholder DRPC server. */
    private static final String DRPC_SERVERS_KEY = "bullet.pubsub.storm.drpc.servers";
    private static final String FAKE_DRPC_SERVER = "foo.bar.bullet.drpc.com";
    /** Pause between two empty polls of the pubscriber, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 1L;

    private DRPCQueryResultPubscriber pubscriber;

    /**
     * Builds a mocked {@link Response} with the given status code, status text and body.
     *
     * @param statusCode the value returned by {@code getStatusCode()}
     * @param statusText the value returned by {@code getStatusText()}
     * @param body the value returned by {@code getResponseBody()}; may be null
     * @return the stubbed response mock
     */
    private Response getResponse(int statusCode, String statusText, String body) {
        Response response = Mockito.mock(Response.class);
        Mockito.doReturn(statusCode).when(response).getStatusCode();
        Mockito.doReturn(statusText).when(response).getStatusText();
        Mockito.doReturn(body).when(response).getResponseBody();
        return response;
    }

    /** Builds a mocked response with the given status code, the text "Error" and no body. */
    private Response getNotOkResponse(int statusCode) {
        return getResponse(statusCode, "Error", null);
    }

    /** Builds a mocked 200 response with the text "Ok" and the given body. */
    private Response getOkResponse(String body) {
        return getResponse(200, "Ok", body);
    }

    /**
     * Builds a mocked future standing in for a request that finished without an exception.
     *
     * @param response the response the finished request should yield; may be null
     * @return a mock whose {@code exceptionally(...)} hands back an already completed future holding
     *         {@code response} and whose single argument {@code thenAcceptAsync(...)} throws
     */
    private CompletableFuture<Response> getOkFuture(Response response) {
        CompletableFuture<Response> finished = CompletableFuture.completedFuture(response);
        // Mockito can only hand back the raw type for a generic class.
        @SuppressWarnings("unchecked")
        CompletableFuture<Response> future = Mockito.mock(CompletableFuture.class);
        // Anything chained after exceptionally(...) runs against the real, completed future.
        // NOTE(review): presumably the pubscriber calls exceptionally(...) before thenAcceptAsync(...) - confirm.
        Mockito.doReturn(finished).when(future).exceptionally(Matchers.any());
        // Consuming directly from the mock (i.e. skipping exceptionally) fails loudly.
        Mockito.doThrow(new RuntimeException("Good futures don't throw")).when(future).thenAcceptAsync(Matchers.any());
        return future;
    }

    /**
     * Builds a mocked {@link AsyncHttpClient} whose {@code preparePost(...).setBody(...).execute()} chain
     * ends in a {@link ListenableFuture} that converts to the given future.
     *
     * @param future the future returned by {@code toCompletableFuture()} on the executed request
     * @return the stubbed client mock
     */
    private AsyncHttpClient mockClientWith(CompletableFuture<Response> future) {
        // Mockito can only hand back the raw type for a generic interface.
        @SuppressWarnings("unchecked")
        ListenableFuture<Response> listenable = Mockito.mock(ListenableFuture.class);
        Mockito.doReturn(future).when(listenable).toCompletableFuture();

        BoundRequestBuilder builder = Mockito.mock(BoundRequestBuilder.class);
        Mockito.doReturn(listenable).when(builder).execute();
        // setBody returns the same builder so that calls on it can be chained.
        Mockito.doReturn(builder).when(builder).setBody(Matchers.anyString());

        AsyncHttpClient client = Mockito.mock(AsyncHttpClient.class);
        Mockito.doReturn(builder).when(client).preparePost(Matchers.anyString());
        return client;
    }

    /**
     * Polls the pubscriber until it yields a non-null message, pausing briefly between empty polls.
     *
     * @return the first non-null message received
     * @throws RuntimeException wrapping any exception raised while receiving or sleeping
     */
    private PubSubMessage fetch() {
        try {
            PubSubMessage message = pubscriber.receive();
            while (message == null) {
                // Only wait when there was nothing to read; a received message is returned right away.
                Thread.sleep(POLL_INTERVAL_MS);
                message = pubscriber.receive();
            }
            return message;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Runs {@link #fetch()} asynchronously so the calling test can block on the result. */
    private CompletableFuture<PubSubMessage> fetchAsync() {
        return CompletableFuture.supplyAsync(this::fetch);
    }

    /** Creates a fresh pubscriber, configured with a single placeholder DRPC server, before each test. */
    @BeforeMethod
    public void setup() {
        DRPCConfig config = new DRPCConfig("test_drpc_config.yaml");
        config.set(DRPC_SERVERS_KEY, Collections.singletonList(FAKE_DRPC_SERVER));
        pubscriber = new DRPCQueryResultPubscriber(config);
    }

    /** A 200 response whose body is a serialized message is received with the same id and content. */
    @Test(timeOut = 5000)
    public void testReadingOkResponse() throws Exception {
        PubSubMessage expected = new PubSubMessage("foo", "response");
        pubscriber.setClient(mockClientWith(getOkFuture(getOkResponse(expected.asJSON()))));

        pubscriber.send(new PubSubMessage("foo", "bar"));

        PubSubMessage actual = fetchAsync().get();
        Assert.assertNotNull(actual);
        Assert.assertEquals(actual.getId(), expected.getId());
        Assert.assertEquals(actual.getContent(), expected.getContent());
    }

    /** A 500 response is received as a CANNOT_REACH_DRPC error carrying the id of the sent message. */
    @Test(timeOut = 5000)
    public void testReadingNotOkResponse() throws Exception {
        pubscriber.setClient(mockClientWith(getOkFuture(getNotOkResponse(500))));

        pubscriber.send(new PubSubMessage("foo", "bar"));

        PubSubMessage actual = fetchAsync().get();
        Assert.assertNotNull(actual);
        Assert.assertEquals(actual.getId(), "foo");
        Assert.assertEquals(actual.getContentAsString(), DRPCError.CANNOT_REACH_DRPC.asJSONClip());
    }

    /** A null response is received as a CANNOT_REACH_DRPC error carrying the id of the sent message. */
    @Test(timeOut = 5000)
    public void testReadingNullResponse() throws Exception {
        pubscriber.setClient(mockClientWith(getOkFuture(null)));

        pubscriber.send(new PubSubMessage("foo", "bar"));

        PubSubMessage actual = fetchAsync().get();
        Assert.assertNotNull(actual);
        Assert.assertEquals(actual.getId(), "foo");
        Assert.assertEquals(actual.getContentAsString(), DRPCError.CANNOT_REACH_DRPC.asJSONClip());
    }

    /** Closing the pubscriber closes the injected client exactly once. */
    @Test
    public void testClosing() throws Exception {
        AsyncHttpClient client = Mockito.mock(AsyncHttpClient.class);
        pubscriber.setClient(client);

        pubscriber.close();

        Mockito.verify(client, Mockito.times(1)).close();
    }

    /** An IOException thrown by the client while closing does not escape the pubscriber's close. */
    @Test
    public void testClosingWithException() throws Exception {
        AsyncHttpClient client = Mockito.mock(AsyncHttpClient.class);
        Mockito.doThrow(new IOException()).when(client).close();
        pubscriber.setClient(client);

        pubscriber.close();

        Mockito.verify(client, Mockito.times(1)).close();
    }

    /** Committing an id does not touch the HTTP client. */
    @Test
    public void testCommiting() {
        AsyncHttpClient client = Mockito.mock(AsyncHttpClient.class);
        // The mock has to be the pubscriber's client; otherwise the verification below cannot fail.
        pubscriber.setClient(client);

        pubscriber.commit("foo");

        Mockito.verifyZeroInteractions(client);
    }

    /** Failing an id does not touch the HTTP client. */
    @Test
    public void testFailing() {
        AsyncHttpClient client = Mockito.mock(AsyncHttpClient.class);
        // The mock has to be the pubscriber's client; otherwise the verification below cannot fail.
        pubscriber.setClient(client);

        pubscriber.fail("foo");

        Mockito.verifyZeroInteractions(client);
    }

    /**
     * Sending without injecting a mocked client is received as a CANNOT_REACH_DRPC error.
     * NOTE(review): presumably the pubscriber's own client fails to reach the placeholder server - confirm.
     */
    @Test(timeOut = 5000)
    public void testException() throws Exception {
        pubscriber.send(new PubSubMessage("foo", "bar"));

        PubSubMessage actual = fetchAsync().get();
        Assert.assertNotNull(actual);
        Assert.assertEquals(actual.getId(), "foo");
        Assert.assertEquals(actual.getContentAsString(), DRPCError.CANNOT_REACH_DRPC.asJSONClip());
    }
}
