package io.strimzi.kafka.bridge.http;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.strimzi.kafka.bridge.BridgeContentType;
import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.Handler;
import io.strimzi.kafka.bridge.KafkaBridgeProducer;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.http.model.HttpBridgeResult;
import io.strimzi.kafka.bridge.tracing.SpanHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.RoutingContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;

/* loaded from: input_file:io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.class */
/**
 * HTTP endpoint that converts the body of an incoming request into Kafka producer records and
 * sends them through a {@link KafkaBridgeProducer}.
 *
 * <p>Sending is dispatched with {@link CompletableFuture#runAsync(Runnable)}, so the work happens
 * off the thread that invoked {@link #handle(RoutingContext, Handler)}.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class HttpSourceBridgeEndpoint<K, V> extends HttpBridgeEndpoint {
    /** Converter chosen in {@link #open()} from the embedded format; {@code null} when the format is unsupported. */
    private MessageConverter<K, V, Buffer, Buffer> messageConverter;

    /**
     * Set by {@link #handleError(Throwable)} inside a send-completion callback and read by
     * {@link #maybeClose()} from asynchronous tasks, hence volatile so the write is visible
     * across those threads.
     */
    private volatile boolean closing;

    private final KafkaBridgeProducer<K, V> kafkaBridgeProducer;

    /**
     * Creates the endpoint and the underlying bridge producer (the producer is not started until
     * {@link #open()} is called).
     *
     * @param bridgeConfig    bridge configuration; its Kafka configuration is handed to the producer
     * @param embeddedFormat  embedded data format used to pick the message converter
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     */
    public HttpSourceBridgeEndpoint(BridgeConfig bridgeConfig, EmbeddedFormat embeddedFormat, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        super(bridgeConfig, embeddedFormat);
        this.kafkaBridgeProducer = new KafkaBridgeProducer<>(bridgeConfig.getKafkaConfig(), keySerializer, valueSerializer);
    }

    /**
     * Assigns a unique endpoint name, resets the closing flag, builds the message converter and
     * creates the underlying producer.
     */
    @Override
    public void open() {
        // The name is "<bridge id>-<uuid>", falling back to a fixed prefix when no bridge id is configured.
        this.name = this.bridgeConfig.getBridgeID() == null
                ? "kafka-bridge-producer-" + UUID.randomUUID()
                : this.bridgeConfig.getBridgeID() + "-" + UUID.randomUUID();
        this.closing = false;
        this.messageConverter = buildMessageConverter();
        this.kafkaBridgeProducer.create();
    }

    /** Closes the underlying producer, then delegates to the parent endpoint. */
    @Override
    public void close() {
        this.kafkaBridgeProducer.close();
        super.close();
    }

    /** Closes this endpoint only if a previous send failure flagged it for closing. */
    public void maybeClose() {
        if (this.closing) {
            close();
        }
    }

    /**
     * Handles a "send" request: converts the request body into records for the topic (and
     * optional partition) named in the path and sends them to Kafka.
     *
     * <p>When the {@code async} query parameter parses to {@code true}, the records are handed to
     * the producer and 204 is returned without waiting for the outcome. Otherwise the response is
     * 200 with an {@code offsets} array holding one entry per record, in request order.
     *
     * <p>Error responses: 422 when the partition id is not a number or the body cannot be
     * converted; 500 when no converter is available or dispatching to the producer throws.
     *
     * @param routingContext context of the HTTP request being served
     * @param handler        not used by this implementation
     */
    @Override
    public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> handler) {
        String topic = routingContext.pathParam("topicname");
        String partitionParam = routingContext.pathParam("partitionid");
        Integer partition = null;
        if (partitionParam != null) {
            try {
                partition = Integer.valueOf(partitionParam);
            } catch (NumberFormatException e) {
                replyWithBridgeError(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY, "Specified partition is not a valid number");
                return;
            }
        }
        boolean async = Boolean.parseBoolean(routingContext.queryParams().get("async"));
        String operation = partition == null
                ? HttpOpenApiOperations.SEND.toString()
                : HttpOpenApiOperations.SEND_TO_PARTITION.toString();
        SpanHandle<K, V> span = TracingUtil.getTracing().span(routingContext, operation);
        try {
            if (this.messageConverter == null) {
                span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
                replyWithBridgeError(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase());
                return;
            }
            List<ProducerRecord<K, V>> records = this.messageConverter.toKafkaRecords(topic, partition, routingContext.body().buffer());
            for (ProducerRecord<K, V> record : records) {
                span.inject(record);
            }
            CompletableFuture.runAsync(() -> {
                if (async) {
                    sendIgnoringResults(routingContext, span, records);
                } else {
                    sendAndReportOffsets(routingContext, span, records);
                }
            });
        } catch (Exception e) {
            // Keep the cause in the log; the client only receives the exception message.
            this.log.error("Failed to process send request for topic {}", topic, e);
            span.finish(HttpResponseStatus.UNPROCESSABLE_ENTITY.code());
            replyWithBridgeError(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY, e.getMessage());
        }
    }

    /** Hands every record to the producer without tracking the outcome, then replies 204. */
    private void sendIgnoringResults(RoutingContext routingContext, SpanHandle<K, V> span, List<ProducerRecord<K, V>> records) {
        try {
            for (ProducerRecord<K, V> record : records) {
                this.kafkaBridgeProducer.sendIgnoreResult(record);
            }
        } catch (RuntimeException e) {
            failDispatch(routingContext, span, e);
            return;
        }
        span.finish(HttpResponseStatus.NO_CONTENT.code());
        HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), BridgeContentType.KAFKA_JSON, null);
        maybeClose();
    }

    /** Sends every record, waits for all outcomes and replies 200 with one offsets entry per record. */
    private void sendAndReportOffsets(RoutingContext routingContext, SpanHandle<K, V> span, List<ProducerRecord<K, V>> records) {
        List<CompletableFuture<HttpBridgeResult<?>>> deliveries = new ArrayList<>(records.size());
        try {
            for (ProducerRecord<K, V> record : records) {
                deliveries.add(deliver(record));
            }
        } catch (RuntimeException e) {
            failDispatch(routingContext, span, e);
            return;
        }
        CompletableFuture.allOf(deliveries.toArray(new CompletableFuture<?>[0])).whenCompleteAsync((ignored, failure) -> {
            this.log.trace("All sent thread {}", Thread.currentThread());
            // Read the outcomes by position so the response mirrors the order of the request's
            // records, regardless of the order in which the sends completed.
            List<HttpBridgeResult<?>> results = new ArrayList<>(deliveries.size());
            for (CompletableFuture<HttpBridgeResult<?>> delivery : deliveries) {
                results.add(settledResult(delivery));
            }
            span.finish(HttpResponseStatus.OK.code());
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBuffer(buildOffsets(results)));
            maybeClose();
        });
    }

    /** Sends one record and maps its outcome (metadata or failure) to a bridge result. */
    private CompletableFuture<HttpBridgeResult<?>> deliver(ProducerRecord<K, V> record) {
        return this.kafkaBridgeProducer.send(record)
                .<HttpBridgeResult<?>>handle((metadata, error) -> toResult(record, metadata, error))
                .toCompletableFuture();
    }

    /** Builds the per-record result: the metadata on success, a bridge error on failure. */
    private HttpBridgeResult<?> toResult(ProducerRecord<K, V> record, RecordMetadata metadata, Throwable error) {
        this.log.trace("Handle thread {}", Thread.currentThread());
        if (error == null) {
            this.log.debug("Delivered record {} to Kafka on topic {} at partition {} [{}]", record, metadata.topic(), metadata.partition(), metadata.offset());
            return new HttpBridgeResult<>(metadata);
        }
        String message = error.getMessage();
        int code = handleError(error);
        this.log.error("Failed to deliver record {}", record, error);
        return new HttpBridgeResult<>(new HttpBridgeError(code, message));
    }

    /** Returns the result of an already-completed delivery, or a 500 entry if building the result itself failed. */
    private HttpBridgeResult<?> settledResult(CompletableFuture<HttpBridgeResult<?>> delivery) {
        if (delivery.isCompletedExceptionally()) {
            return new HttpBridgeResult<>(new HttpBridgeError(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase()));
        }
        return delivery.join();
    }

    /** Logs a failure raised while handing records to the producer and replies 500 so the request does not hang. */
    private void failDispatch(RoutingContext routingContext, SpanHandle<K, V> span, RuntimeException cause) {
        this.log.error("Failed to send records to Kafka", cause);
        span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
        replyWithBridgeError(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase());
    }

    /** Sends a JSON {@link HttpBridgeError} response with the given status and message. */
    private static void replyWithBridgeError(RoutingContext routingContext, HttpResponseStatus status, String message) {
        HttpUtils.sendResponse(routingContext, status.code(), BridgeContentType.KAFKA_JSON,
                JsonUtils.jsonToBuffer(new HttpBridgeError(status.code(), message).toJson()));
    }

    /**
     * Builds the response body: an object whose {@code offsets} array has one entry per result,
     * either {@code {partition, offset}} for delivered records or the error's JSON form.
     *
     * @param list per-record results, in the order they should appear in the response
     * @return the JSON object to send back to the client
     */
    private ObjectNode buildOffsets(List<HttpBridgeResult<?>> list) {
        ArrayNode offsets = JsonUtils.createArrayNode();
        for (HttpBridgeResult<?> result : list) {
            Object payload = result.getResult();
            // Stays null (added as a JSON null) for a payload of any other type.
            ObjectNode entry = null;
            if (payload instanceof RecordMetadata) {
                RecordMetadata metadata = (RecordMetadata) payload;
                entry = JsonUtils.createObjectNode().put("partition", metadata.partition()).put("offset", metadata.offset());
            } else if (payload instanceof HttpBridgeError) {
                entry = ((HttpBridgeError) payload).toJson();
            }
            offsets.add(entry);
        }
        ObjectNode response = JsonUtils.createObjectNode();
        // set(...) is the non-deprecated replacement for put(String, JsonNode).
        response.set("offsets", offsets);
        return response;
    }

    /**
     * Maps a send failure to an HTTP status code.
     *
     * <p>A {@link TimeoutException} whose message contains "not present in metadata" yields 404
     * and flags the endpoint for closing; every other failure yields 500.
     *
     * @param th the failure reported for a record
     * @return the HTTP status code to report for that record
     */
    private int handleError(Throwable th) {
        boolean missingFromMetadata = th instanceof TimeoutException
                && th.getMessage() != null
                && th.getMessage().contains("not present in metadata");
        if (missingFromMetadata) {
            this.closing = true;
            return HttpResponseStatus.NOT_FOUND.code();
        }
        return HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
    }

    /**
     * Picks the converter matching the configured embedded format.
     *
     * @return the converter for JSON or BINARY, or {@code null} for any other format
     */
    @SuppressWarnings("unchecked") // the converter's type arguments cannot be checked against K and V at compile time
    private MessageConverter<K, V, Buffer, Buffer> buildMessageConverter() {
        switch (this.format) {
            case JSON:
                return (MessageConverter<K, V, Buffer, Buffer>) new HttpJsonMessageConverter();
            case BINARY:
                return (MessageConverter<K, V, Buffer, Buffer>) new HttpBinaryMessageConverter();
            default:
                return null;
        }
    }
}
