package com.expediagroup.streamplatform.streamregistry.state.kafka;

import com.expediagroup.streamplatform.streamregistry.state.Configurator;
import com.expediagroup.streamplatform.streamregistry.state.EventSender;
import com.expediagroup.streamplatform.streamregistry.state.avro.AvroConverter;
import com.expediagroup.streamplatform.streamregistry.state.avro.AvroEvent;
import com.expediagroup.streamplatform.streamregistry.state.avro.AvroKey;
import com.expediagroup.streamplatform.streamregistry.state.avro.AvroValue;
import com.expediagroup.streamplatform.streamregistry.state.internal.EventCorrelator;
import com.expediagroup.streamplatform.streamregistry.state.model.Entity;
import com.expediagroup.streamplatform.streamregistry.state.model.event.Event;
import com.expediagroup.streamplatform.streamregistry.state.model.event.StatusDeletionEvent;
import com.expediagroup.streamplatform.streamregistry.state.model.event.StatusEvent;
import com.expediagroup.streamplatform.streamregistry.state.model.specification.Specification;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.beans.ConstructorProperties;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.NonNull;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventSender.class */
public class KafkaEventSender implements EventSender {
    private static final Logger log = LoggerFactory.getLogger(KafkaEventSender.class);

    @NonNull
    private final Config config;

    @NonNull
    private final CorrelationStrategy correlationStrategy;

    @NonNull
    private final AvroConverter converter;

    @NonNull
    private final KafkaProducer<AvroKey, AvroValue> producer;

    /* loaded from: input_file:com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventSender$Config.class */
    public static final class Config {

        @NonNull
        private final String bootstrapServers;

        @NonNull
        private final String topic;

        @NonNull
        private final String schemaRegistryUrl;
        private final Map<String, Object> properties;
        private final Boolean entityStatusEnabled;

        /* loaded from: input_file:com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventSender$Config$ConfigBuilder.class */
        public static class ConfigBuilder {
            private String bootstrapServers;
            private String topic;
            private String schemaRegistryUrl;
            private Map<String, Object> properties;
            private boolean entityStatusEnabled$set;
            private Boolean entityStatusEnabled$value;

            ConfigBuilder() {
            }

            public ConfigBuilder bootstrapServers(@NonNull String str) {
                if (str == null) {
                    throw new NullPointerException("bootstrapServers is marked non-null but is null");
                }
                this.bootstrapServers = str;
                return this;
            }

            public ConfigBuilder topic(@NonNull String str) {
                if (str == null) {
                    throw new NullPointerException("topic is marked non-null but is null");
                }
                this.topic = str;
                return this;
            }

            public ConfigBuilder schemaRegistryUrl(@NonNull String str) {
                if (str == null) {
                    throw new NullPointerException("schemaRegistryUrl is marked non-null but is null");
                }
                this.schemaRegistryUrl = str;
                return this;
            }

            public ConfigBuilder properties(Map<String, Object> map) {
                this.properties = map;
                return this;
            }

            public ConfigBuilder entityStatusEnabled(Boolean bool) {
                this.entityStatusEnabled$value = bool;
                this.entityStatusEnabled$set = true;
                return this;
            }

            public Config build() {
                Boolean bool = this.entityStatusEnabled$value;
                if (!this.entityStatusEnabled$set) {
                    bool = Config.$default$entityStatusEnabled();
                }
                return new Config(this.bootstrapServers, this.topic, this.schemaRegistryUrl, this.properties, bool);
            }

            public String toString() {
                return "KafkaEventSender.Config.ConfigBuilder(bootstrapServers=" + this.bootstrapServers + ", topic=" + this.topic + ", schemaRegistryUrl=" + this.schemaRegistryUrl + ", properties=" + this.properties + ", entityStatusEnabled$value=" + this.entityStatusEnabled$value + ")";
            }
        }

        private static Boolean $default$entityStatusEnabled() {
            return true;
        }

        @ConstructorProperties({"bootstrapServers", "topic", "schemaRegistryUrl", "properties", "entityStatusEnabled"})
        Config(@NonNull String str, @NonNull String str2, @NonNull String str3, Map<String, Object> map, Boolean bool) {
            if (str == null) {
                throw new NullPointerException("bootstrapServers is marked non-null but is null");
            }
            if (str2 == null) {
                throw new NullPointerException("topic is marked non-null but is null");
            }
            if (str3 == null) {
                throw new NullPointerException("schemaRegistryUrl is marked non-null but is null");
            }
            this.bootstrapServers = str;
            this.topic = str2;
            this.schemaRegistryUrl = str3;
            this.properties = map;
            this.entityStatusEnabled = bool;
        }

        public static ConfigBuilder builder() {
            return new ConfigBuilder();
        }

        @NonNull
        public String getBootstrapServers() {
            return this.bootstrapServers;
        }

        @NonNull
        public String getTopic() {
            return this.topic;
        }

        @NonNull
        public String getSchemaRegistryUrl() {
            return this.schemaRegistryUrl;
        }

        public Map<String, Object> getProperties() {
            return this.properties;
        }

        public Boolean getEntityStatusEnabled() {
            return this.entityStatusEnabled;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Config)) {
                return false;
            }
            Config config = (Config) obj;
            Boolean entityStatusEnabled = getEntityStatusEnabled();
            Boolean entityStatusEnabled2 = config.getEntityStatusEnabled();
            if (entityStatusEnabled == null) {
                if (entityStatusEnabled2 != null) {
                    return false;
                }
            } else if (!entityStatusEnabled.equals(entityStatusEnabled2)) {
                return false;
            }
            String bootstrapServers = getBootstrapServers();
            String bootstrapServers2 = config.getBootstrapServers();
            if (bootstrapServers == null) {
                if (bootstrapServers2 != null) {
                    return false;
                }
            } else if (!bootstrapServers.equals(bootstrapServers2)) {
                return false;
            }
            String topic = getTopic();
            String topic2 = config.getTopic();
            if (topic == null) {
                if (topic2 != null) {
                    return false;
                }
            } else if (!topic.equals(topic2)) {
                return false;
            }
            String schemaRegistryUrl = getSchemaRegistryUrl();
            String schemaRegistryUrl2 = config.getSchemaRegistryUrl();
            if (schemaRegistryUrl == null) {
                if (schemaRegistryUrl2 != null) {
                    return false;
                }
            } else if (!schemaRegistryUrl.equals(schemaRegistryUrl2)) {
                return false;
            }
            Map<String, Object> properties = getProperties();
            Map<String, Object> properties2 = config.getProperties();
            return properties == null ? properties2 == null : properties.equals(properties2);
        }

        public int hashCode() {
            Boolean entityStatusEnabled = getEntityStatusEnabled();
            int hashCode = (1 * 59) + (entityStatusEnabled == null ? 43 : entityStatusEnabled.hashCode());
            String bootstrapServers = getBootstrapServers();
            int hashCode2 = (hashCode * 59) + (bootstrapServers == null ? 43 : bootstrapServers.hashCode());
            String topic = getTopic();
            int hashCode3 = (hashCode2 * 59) + (topic == null ? 43 : topic.hashCode());
            String schemaRegistryUrl = getSchemaRegistryUrl();
            int hashCode4 = (hashCode3 * 59) + (schemaRegistryUrl == null ? 43 : schemaRegistryUrl.hashCode());
            Map<String, Object> properties = getProperties();
            return (hashCode4 * 59) + (properties == null ? 43 : properties.hashCode());
        }

        public String toString() {
            return "KafkaEventSender.Config(bootstrapServers=" + getBootstrapServers() + ", topic=" + getTopic() + ", schemaRegistryUrl=" + getSchemaRegistryUrl() + ", properties=" + getProperties() + ", entityStatusEnabled=" + getEntityStatusEnabled() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventSender$CorrelationStrategy.class */
    public interface CorrelationStrategy {
        String correlationId(CompletableFuture<Void> completableFuture);

        List<Header> headers(String str);

        Callback callback(String str, CompletableFuture<Void> completableFuture);
    }

    /* loaded from: input_file:com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventSender$CorrelationStrategyImpl.class */
    static class CorrelationStrategyImpl implements CorrelationStrategy {
        private final EventCorrelator correlator;

        @Override // com.expediagroup.streamplatform.streamregistry.state.kafka.KafkaEventSender.CorrelationStrategy
        public String correlationId(CompletableFuture<Void> completableFuture) {
            return this.correlator.register(completableFuture);
        }

        @Override // com.expediagroup.streamplatform.streamregistry.state.kafka.KafkaEventSender.CorrelationStrategy
        public List<Header> headers(String str) {
            return Collections.singletonList(new RecordHeader("correlationId", str.getBytes(StandardCharsets.UTF_8)));
        }

        @Override // com.expediagroup.streamplatform.streamregistry.state.kafka.KafkaEventSender.CorrelationStrategy
        public Callback callback(String str, CompletableFuture<Void> completableFuture) {
            return (recordMetadata, exc) -> {
                if (recordMetadata != null) {
                    KafkaEventSender.log.debug("Sent {} - {}", str, recordMetadata);
                } else {
                    KafkaEventSender.log.error("Error sending record {}", str, exc);
                    this.correlator.failed(str, exc);
                }
            };
        }

        @ConstructorProperties({"correlator"})
        public CorrelationStrategyImpl(EventCorrelator eventCorrelator) {
            this.correlator = eventCorrelator;
        }
    }

    /* loaded from: input_file:com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventSender$NullCorrelationStrategy.class */
    static class NullCorrelationStrategy implements CorrelationStrategy {
        NullCorrelationStrategy() {
        }

        @Override // com.expediagroup.streamplatform.streamregistry.state.kafka.KafkaEventSender.CorrelationStrategy
        public String correlationId(CompletableFuture<Void> completableFuture) {
            return null;
        }

        @Override // com.expediagroup.streamplatform.streamregistry.state.kafka.KafkaEventSender.CorrelationStrategy
        public List<Header> headers(String str) {
            return Collections.emptyList();
        }

        @Override // com.expediagroup.streamplatform.streamregistry.state.kafka.KafkaEventSender.CorrelationStrategy
        public Callback callback(String str, CompletableFuture<Void> completableFuture) {
            return (recordMetadata, exc) -> {
                if (recordMetadata != null) {
                    KafkaEventSender.log.debug("Sent {} - {}", str, recordMetadata);
                    completableFuture.complete(null);
                } else {
                    KafkaEventSender.log.error("Error sending record {}", str, exc);
                    completableFuture.completeExceptionally(exc);
                }
            };
        }
    }

    public KafkaEventSender(Config config, EventCorrelator eventCorrelator, Configurator<KafkaProducer<AvroKey, AvroValue>> configurator) {
        this(config, eventCorrelator == null ? new NullCorrelationStrategy() : new CorrelationStrategyImpl(eventCorrelator), new AvroConverter(), getKafkaProducer(config, configurator));
    }

    public KafkaEventSender(Config config, EventCorrelator eventCorrelator) {
        this(config, eventCorrelator, kafkaProducer -> {
        });
    }

    public KafkaEventSender(Config config) {
        this(config, null);
    }

    private static KafkaProducer<AvroKey, AvroValue> getKafkaProducer(Config config, Configurator<KafkaProducer<AvroKey, AvroValue>> configurator) {
        KafkaProducer<AvroKey, AvroValue> kafkaProducer = new KafkaProducer<>(producerConfig(config));
        configurator.configure(kafkaProducer);
        return kafkaProducer;
    }

    public <K extends Entity.Key<S>, S extends Specification> CompletableFuture<Void> send(@NonNull Event<K, S> event) {
        if (event == null) {
            throw new NullPointerException("event is marked non-null but is null");
        }
        if (this.config.getEntityStatusEnabled().booleanValue() || !((event instanceof StatusEvent) || (event instanceof StatusDeletionEvent))) {
            AvroEvent avro = this.converter.toAvro(event);
            return send(avro.getKey(), avro.getValue());
        }
        log.warn("Entity Status is disabled and will not send event with key={}", event.getKey());
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> send(AvroKey avroKey, AvroValue avroValue) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        String correlationId = this.correlationStrategy.correlationId(completableFuture);
        ProducerRecord producerRecord = new ProducerRecord(this.config.getTopic(), (Integer) null, (Long) null, avroKey, avroValue, this.correlationStrategy.headers(correlationId));
        log.debug("Sending {} - {}", correlationId, producerRecord);
        this.producer.send(producerRecord, this.correlationStrategy.callback(correlationId, completableFuture));
        return completableFuture;
    }

    public void close() {
        this.producer.close();
    }

    static Map<String, Object> producerConfig(Config config) {
        HashMap hashMap = new HashMap();
        if (config.getProperties() != null) {
            hashMap.putAll(config.getProperties());
        }
        hashMap.put("bootstrap.servers", config.getBootstrapServers());
        hashMap.put("acks", "all");
        hashMap.put("key.serializer", KafkaAvroSerializer.class);
        hashMap.put("value.serializer", KafkaAvroSerializer.class);
        hashMap.put("schema.registry.url", config.getSchemaRegistryUrl());
        return hashMap;
    }

    @ConstructorProperties({"config", "correlationStrategy", "converter", "producer"})
    KafkaEventSender(@NonNull Config config, @NonNull CorrelationStrategy correlationStrategy, @NonNull AvroConverter avroConverter, @NonNull KafkaProducer<AvroKey, AvroValue> kafkaProducer) {
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        if (correlationStrategy == null) {
            throw new NullPointerException("correlationStrategy is marked non-null but is null");
        }
        if (avroConverter == null) {
            throw new NullPointerException("converter is marked non-null but is null");
        }
        if (kafkaProducer == null) {
            throw new NullPointerException("producer is marked non-null but is null");
        }
        this.config = config;
        this.correlationStrategy = correlationStrategy;
        this.converter = avroConverter;
        this.producer = kafkaProducer;
    }
}
