package com.facebook.presto.pinot;

import com.facebook.presto.pinot.query.PinotProxyGrpcRequestBuilder;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.testing.assertions.Assert;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.connector.presto.grpc.PinotStreamingQueryClient;
import org.apache.pinot.connector.presto.grpc.Utils;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/pinot/TestPinotSegmentStreamingPageSource.class */
/**
 * Runs the {@link TestPinotSegmentPageSource} scenarios against
 * {@link PinotSegmentStreamingPageSource}, feeding it canned responses from an in-memory
 * streaming client, and additionally checks the metadata emitted by the gRPC request builders.
 */
public class TestPinotSegmentStreamingPageSource extends TestPinotSegmentPageSource {

    /**
     * Streaming query client stub that answers every {@code submit} call from a fixed list of
     * {@link DataTable}s instead of talking to a server.
     */
    public static final class MockPinotStreamingQueryClient extends PinotStreamingQueryClient {
        private final ImmutableList<DataTable> dataTables;

        /**
         * @param config gRPC client configuration handed to the superclass constructor
         * @param list data tables to replay; an immutable copy is taken
         */
        MockPinotStreamingQueryClient(GrpcQueryClient.Config config, List<DataTable> list) {
            super(config);
            this.dataTables = ImmutableList.copyOf(list);
        }

        /**
         * Returns a fresh iterator that yields one response tagged {@code responseType=data}
         * per stored data table, followed by exactly one response tagged
         * {@code responseType=metadata}, and is then exhausted.
         *
         * <p>None of the arguments are consulted by this stub.
         *
         * @param str unused (NOTE(review): presumably the target host; confirm against the superclass)
         * @param i unused (NOTE(review): presumably the target port; confirm against the superclass)
         * @param grpcRequestBuilder unused
         * @return iterator over the canned responses
         */
        public Iterator<Server.ServerResponse> submit(String str, int i, GrpcRequestBuilder grpcRequestBuilder) {
            return new Iterator<Server.ServerResponse>() {
                // Position in the response sequence; the value dataTables.size() denotes the
                // trailing metadata response, anything larger means the iterator is exhausted.
                private int index;

                @Override
                public boolean hasNext() {
                    return index <= dataTables.size();
                }

                @Override
                public Server.ServerResponse next() {
                    int total = dataTables.size();
                    if (index > total) {
                        throw new NoSuchElementException("No more responses after the metadata response");
                    }
                    // Always advance, including for the metadata response, so that hasNext()
                    // eventually turns false instead of offering metadata responses forever.
                    int current = index++;
                    if (current == total) {
                        return Server.ServerResponse.newBuilder()
                                .putMetadata("responseType", "metadata")
                                .build();
                    }
                    try {
                        return Server.ServerResponse.newBuilder()
                                .setPayload(Utils.toByteString(dataTables.get(current).toBytes()))
                                .putMetadata("responseType", "data")
                                .build();
                    } catch (IOException e) {
                        // Keep the cause so a serialization failure is diagnosable from the test log.
                        throw new RuntimeException("Failed to serialize DataTable at index " + current, e);
                    }
                }
            };
        }
    }

    /**
     * Builds the streaming page source under test, backed by a
     * {@link MockPinotStreamingQueryClient} that replays {@code list}.
     *
     * @param connectorSession session passed through to the page source
     * @param list data tables the mock client will return
     * @param pinotSplit split passed through to the page source
     * @param list2 column handles passed through to the page source
     * @return a new {@link PinotSegmentStreamingPageSource}
     */
    @Override // com.facebook.presto.pinot.TestPinotSegmentPageSource
    PinotSegmentPageSource getPinotSegmentPageSource(ConnectorSession connectorSession, List<DataTable> list, PinotSplit pinotSplit, List<PinotColumnHandle> list2) {
        GrpcQueryClient.Config grpcConfig = new GrpcQueryClient.Config(
                this.pinotConfig.getStreamingServerGrpcMaxInboundMessageBytes(),
                true);
        MockPinotStreamingQueryClient mockClient = new MockPinotStreamingQueryClient(grpcConfig, list);
        return new PinotSegmentStreamingPageSource(connectorSession, this.pinotConfig, mockClient, pinotSplit, list2);
    }

    /**
     * @return the fixed gRPC port (8090) used by the streaming variant of the tests
     */
    @Override // com.facebook.presto.pinot.TestPinotSegmentPageSource
    Optional<Integer> getGrpcPort() {
        return Optional.of(8090);
    }

    /**
     * Checks the requests produced by {@link PinotProxyGrpcRequestBuilder}, once with a forward
     * host/port set (two additional metadata entries expected) and once without.
     */
    @Test
    public void testPinotProxyGrpcRequest() {
        Server.ServerRequest forwardedRequest = new PinotProxyGrpcRequestBuilder()
                .setHostName("localhost")
                .setPort(8124)
                .setSegments(ImmutableList.of("segment1"))
                .setEnableStreaming(true)
                .setRequestId(121)
                .setBrokerId("presto-coordinator-grpc")
                .addExtraMetadata(ImmutableMap.of("k1", "v1", "k2", "v2"))
                .setSql("SELECT * FROM myTable")
                .build();
        assertStreamingSqlRequestBasics(forwardedRequest);
        Assert.assertEquals(forwardedRequest.getMetadataCount(), 9);
        Assert.assertEquals(forwardedRequest.getMetadataOrThrow("k1"), "v1");
        Assert.assertEquals(forwardedRequest.getMetadataOrThrow("k2"), "v2");
        Assert.assertEquals(forwardedRequest.getMetadataOrThrow("FORWARD_HOST"), "localhost");
        Assert.assertEquals(forwardedRequest.getMetadataOrThrow("FORWARD_PORT"), "8124");

        Server.ServerRequest directRequest = new PinotProxyGrpcRequestBuilder()
                .setSegments(ImmutableList.of("segment1"))
                .setEnableStreaming(true)
                .setRequestId(121)
                .setBrokerId("presto-coordinator-grpc")
                .addExtraMetadata(ImmutableMap.of("k1", "v1", "k2", "v2"))
                .setSql("SELECT * FROM myTable")
                .build();
        assertStreamingSqlRequestBasics(directRequest);
        Assert.assertEquals(directRequest.getMetadataCount(), 7);
        Assert.assertEquals(directRequest.getMetadataOrThrow("k1"), "v1");
        Assert.assertEquals(directRequest.getMetadataOrThrow("k2"), "v2");
    }

    /**
     * Checks the request produced by the plain {@link GrpcRequestBuilder}: only the five
     * standard metadata entries are expected.
     */
    @Test
    public void testPinotGrpcRequest() {
        Server.ServerRequest request = new GrpcRequestBuilder()
                .setSegments(ImmutableList.of("segment1"))
                .setEnableStreaming(true)
                .setRequestId(121)
                .setBrokerId("presto-coordinator-grpc")
                .setSql("SELECT * FROM myTable")
                .build();
        assertStreamingSqlRequestBasics(request);
        Assert.assertEquals(request.getMetadataCount(), 5);
    }

    /** Asserts the SQL text, segment list and five standard metadata entries shared by every request built in these tests. */
    private static void assertStreamingSqlRequestBasics(Server.ServerRequest request) {
        Assert.assertEquals(request.getSql(), "SELECT * FROM myTable");
        Assert.assertEquals(request.getSegmentsCount(), 1);
        Assert.assertEquals(request.getSegments(0), "segment1");
        Assert.assertEquals(request.getMetadataOrThrow("requestId"), "121");
        Assert.assertEquals(request.getMetadataOrThrow("brokerId"), "presto-coordinator-grpc");
        Assert.assertEquals(request.getMetadataOrThrow("enableTrace"), "false");
        Assert.assertEquals(request.getMetadataOrThrow("enableStreaming"), "true");
        Assert.assertEquals(request.getMetadataOrThrow("payloadType"), "sql");
    }
}
