package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.core.AbstractOnlineAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serializer;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Partitioner;
import java.lang.invoke.SerializedLambda;
import java.util.Properties;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaWriter.class */
/**
 * Online attribute writer that sends {@link StreamElement}s to a Kafka topic.
 *
 * <p>The topic, partitioner and element serializer are taken from the supplied {@link
 * KafkaAccessor}. The underlying {@link KafkaProducer} is created lazily on the first call to
 * {@link #write(StreamElement, CommitCallback)} and released by {@link #close()}.
 *
 * @param <K> type of the key of the produced Kafka records
 * @param <V> type of the value of the produced Kafka records
 */
public class KafkaWriter<K, V> extends AbstractOnlineAttributeWriter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaWriter.class);

    /** Accessor this writer was created from; source of all configuration. */
    final KafkaAccessor accessor;

    /** Assigns a (possibly negative) partition id to each written element. */
    private final Partitioner partitioner;

    /** Name of the target Kafka topic. */
    private final String topic;

    /** Converts a {@link StreamElement} into a producer record and supplies key/value serdes. */
    private final ElementSerializer<K, V> serializer;

    /** Lazily created producer; {@code null} before the first write and after {@link #close()}. */
    @Nullable
    private transient KafkaProducer<K, V> producer;

    /**
     * Creates a writer bound to the entity, URI, topic, partitioner and serializer of the accessor.
     *
     * @param kafkaAccessor accessor providing the configuration of this writer
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaWriter(KafkaAccessor kafkaAccessor) {
        super(kafkaAccessor.getEntityDescriptor(), kafkaAccessor.getUri());
        this.accessor = kafkaAccessor;
        this.partitioner = kafkaAccessor.getPartitioner();
        this.topic = kafkaAccessor.getTopic();
        this.serializer = kafkaAccessor.getSerializer();
    }

    /**
     * Sends the element to the configured topic and reports the outcome through the callback.
     *
     * <p>The target partition is the partitioner's id with the sign bit cleared, modulo the number
     * of partitions the producer reports for the topic. Any exception thrown while preparing or
     * submitting the record is logged and reported as a failed commit instead of being rethrown.
     *
     * @param streamElement element to write
     * @param commitCallback callback notified with {@code (true, null)} on success or
     *     {@code (false, error)} on failure
     */
    public void write(StreamElement streamElement, CommitCallback commitCallback) {
        try {
            KafkaProducer<K, V> current = getOrCreateProducer();
            int partitionCount = current.partitionsFor(this.topic).size();
            // Clear the sign bit so that a negative partitioner result cannot yield a negative index.
            int partition =
                    (this.partitioner.getPartitionId(streamElement) & Integer.MAX_VALUE) % partitionCount;
            current.send(
                    this.serializer.write(this.topic, partition, streamElement),
                    (recordMetadata, exc) -> {
                        if (exc != null) {
                            // Do not touch recordMetadata here: it carries no useful data on failure
                            // and may be null depending on the client version.
                            log.warn("Failed to write ingest {}", streamElement, exc);
                        } else if (recordMetadata != null) {
                            log.debug(
                                    "Written {} to topic {} offset {} and partition {}",
                                    streamElement,
                                    recordMetadata.topic(),
                                    recordMetadata.offset(),
                                    recordMetadata.partition());
                        }
                        // Always report the result, regardless of what was logged above.
                        commitCallback.commit(exc == null, exc);
                    });
        } catch (Exception e) {
            log.warn("Failed to write ingest {}", streamElement, e);
            commitCallback.commit(false, e);
        }
    }

    /**
     * Returns a factory that creates a new {@code KafkaWriter} from this writer's accessor.
     *
     * <p>The accessor is copied into a local variable so that the returned lambda captures only
     * the accessor and not this writer instance.
     *
     * @return factory producing writers equivalent to this one
     */
    /* renamed from: asFactory, reason: merged with bridge method [inline-methods] */
    public OnlineAttributeWriter.Factory<?> m52asFactory() {
        final KafkaAccessor kafkaAccessor = this.accessor;
        return repository -> new KafkaWriter<>(kafkaAccessor);
    }

    /** Returns the current producer, creating it first when none exists yet. */
    private KafkaProducer<K, V> getOrCreateProducer() {
        if (this.producer == null) {
            this.producer = createProducer();
        }
        return this.producer;
    }

    /**
     * Builds a new producer from the accessor's properties, overriding acks ({@code "all"}),
     * bootstrap servers (authority part of the writer's URI) and batch size (16384).
     */
    @SuppressWarnings("unchecked") // serializers come from the K/V-typed ElementSerializer
    private KafkaProducer<K, V> createProducer() {
        Properties props = this.accessor.createProps();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUri().getAuthority());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        Serializer<K> keySerializer = (Serializer<K>) this.serializer.keySerde().serializer();
        Serializer<V> valueSerializer = (Serializer<V>) this.serializer.valueSerde().serializer();
        return new KafkaProducer<>(props, keySerializer, valueSerializer);
    }

    /** Closes the producer if one was created; a later write creates a fresh producer. */
    public void close() {
        KafkaProducer<K, V> toClose = this.producer;
        // Drop the reference first so a failing close() does not leave a half-closed producer
        // behind for subsequent writes.
        this.producer = null;
        if (toClose != null) {
            toClose.close();
        }
    }

    /** Returns the accessor this writer was created from. */
    @Generated
    public KafkaAccessor getAccessor() {
        return this.accessor;
    }

    /**
     * Recreates the factory lambda of {@link #m52asFactory()} from its serialized form.
     *
     * <p>NOTE(review): this method was recovered by the decompiler from a compiler-synthesized
     * method; javac normally generates it for serializable lambdas, so it may have to be removed
     * when this source is recompiled - confirm against the build.
     *
     * @param serializedLambda serialized form of the lambda
     * @return the factory lambda capturing the serialized {@link KafkaAccessor}
     * @throws IllegalArgumentException if the serialized form does not describe the factory lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        boolean isFactoryLambda =
                "lambda$asFactory$aa81adc6$1".equals(serializedLambda.getImplMethodName())
                        // 6 == java.lang.invoke.MethodHandleInfo.REF_invokeStatic
                        && serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/OnlineAttributeWriter$Factory")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaWriter")
                        && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/kafka/KafkaAccessor;Lcz/o2/proxima/repository/Repository;)Lcz/o2/proxima/direct/core/OnlineAttributeWriter;");
        if (isFactoryLambda) {
            final KafkaAccessor kafkaAccessor = (KafkaAccessor) serializedLambda.getCapturedArg(0);
            return (OnlineAttributeWriter.Factory<?>) repository -> new KafkaWriter<>(kafkaAccessor);
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
