package com.yahoo.bullet.storm.drpc;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.RandomPool;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Publisher;
import com.yahoo.bullet.pubsub.Subscriber;
import com.yahoo.bullet.storm.drpc.utils.DRPCError;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/drpc/DRPCQueryResultPubscriber.class */
public class DRPCQueryResultPubscriber implements Publisher, Subscriber {
    private static final Logger log = LoggerFactory.getLogger(DRPCQueryResultPubscriber.class);
    private RandomPool<String> urls;
    private Queue<PubSubMessage> responses;
    private AsyncHttpClient client;
    public static final int NO_TIMEOUT = -1;
    public static final int OK_200 = 200;
    private static final String URL_TEMPLATE = "%1$s://%2$s:%3$s/%4$s/%5$s";

    public DRPCQueryResultPubscriber(BulletConfig bulletConfig) {
        Objects.requireNonNull(bulletConfig);
        Number number = (Number) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_HTTP_CONNECT_TIMEOUT, Number.class);
        Number number2 = (Number) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_HTTP_CONNECT_RETRY_LIMIT, Number.class);
        List list = (List) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_SERVERS, List.class);
        String str = (String) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_HTTP_PROTOCOL, String.class);
        String str2 = (String) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_HTTP_PORT, String.class);
        String str3 = (String) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_HTTP_PATH, String.class);
        String str4 = (String) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_FUNCTION, String.class);
        this.urls = new RandomPool<>((List) list.stream().map(str5 -> {
            return String.format(URL_TEMPLATE, str, str5, str2, str3, str4);
        }).collect(Collectors.toList()));
        this.client = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(number.intValue()).setMaxRequestRetry(number2.intValue()).setReadTimeout(-1).setRequestTimeout(-1).build());
        this.responses = new ConcurrentLinkedQueue();
    }

    public PubSubMessage send(PubSubMessage pubSubMessage) {
        String str = (String) this.urls.get();
        String id = pubSubMessage.getId();
        String asJSON = pubSubMessage.asJSON();
        log.info("Posting to {} for id {}", str, id);
        log.debug("Posting to {} with body {}", str, asJSON);
        this.client.preparePost(str).setBody(asJSON).execute().toCompletableFuture().exceptionally(this::handleException).thenAcceptAsync((Consumer) createResponseConsumer(id));
        return pubSubMessage;
    }

    public PubSubMessage receive() {
        return this.responses.poll();
    }

    public void commit(String str) {
    }

    public void fail(String str) {
    }

    public void close() {
        try {
            this.client.close();
        } catch (IOException e) {
            log.error("Error while closing AsyncHTTPClient", e);
        }
    }

    private Consumer<Response> createResponseConsumer(String str) {
        return response -> {
            handleResponse(str, response);
        };
    }

    private Response handleException(Throwable th) {
        log.error("Received error while posting query", th);
        return null;
    }

    private void handleResponse(String str, Response response) {
        if (response == null || response.getStatusCode() != 200) {
            log.error("Handling error for id {} with response {}", str, response);
            this.responses.offer(new PubSubMessage(str, DRPCError.CANNOT_REACH_DRPC.asJSONClip(), (Metadata) null));
        } else {
            log.info("Received for id {}: {} {}", new Object[]{Integer.valueOf(response.getStatusCode()), str, response.getStatusText()});
            PubSubMessage fromJSON = PubSubMessage.fromJSON(response.getResponseBody());
            log.debug("Received for id {}:\n{}", fromJSON.getId(), fromJSON.getContent());
            this.responses.offer(fromJSON);
        }
    }

    void setClient(AsyncHttpClient asyncHttpClient) {
        this.client = asyncHttpClient;
    }
}
