package com.facebook.presto.kafka;

import com.facebook.presto.common.Page;
import com.facebook.presto.kafka.encoder.RowEncoder;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:com/facebook/presto/kafka/KafkaPageSink.class */
public class KafkaPageSink implements ConnectorPageSink {
    private final String topicName;
    private final List<KafkaColumnHandle> columns;
    private final RowEncoder keyEncoder;
    private final RowEncoder messageEncoder;
    private final KafkaProducer<byte[], byte[]> producer;
    private final ErrorCountingCallback errorCounter;

    /* loaded from: input_file:com/facebook/presto/kafka/KafkaPageSink$ErrorCountingCallback.class */
    private static class ErrorCountingCallback implements Callback {
        private final AtomicLong errorCounter = new AtomicLong(0);

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                this.errorCounter.incrementAndGet();
            }
        }

        public long getErrorCount() {
            return this.errorCounter.get();
        }
    }

    public KafkaPageSink(String str, List<KafkaColumnHandle> list, RowEncoder rowEncoder, RowEncoder rowEncoder2, PlainTextKafkaProducerFactory plainTextKafkaProducerFactory) {
        this.topicName = (String) Objects.requireNonNull(str, "topicName is null");
        this.columns = (List) Objects.requireNonNull(ImmutableList.copyOf(list), "columns is null");
        this.keyEncoder = (RowEncoder) Objects.requireNonNull(rowEncoder, "keyEncoder is null");
        this.messageEncoder = (RowEncoder) Objects.requireNonNull(rowEncoder2, "messageEncoder is null");
        Objects.requireNonNull(plainTextKafkaProducerFactory, "producerFactory is null");
        this.producer = plainTextKafkaProducerFactory.create();
        this.errorCounter = new ErrorCountingCallback();
    }

    public CompletableFuture<?> appendPage(Page page) {
        for (int i = 0; i < page.getPositionCount(); i++) {
            for (int i2 = 0; i2 < page.getChannelCount(); i2++) {
                if (this.columns.get(i2).isKeyCodec()) {
                    this.keyEncoder.appendColumnValue(page.getBlock(i2), i);
                } else {
                    this.messageEncoder.appendColumnValue(page.getBlock(i2), i);
                }
            }
            this.producer.send(new ProducerRecord(this.topicName, this.keyEncoder.toByteArray(), this.messageEncoder.toByteArray()), this.errorCounter);
        }
        return NOT_BLOCKED;
    }

    public CompletableFuture<Collection<Slice>> finish() {
        this.producer.flush();
        this.producer.close();
        try {
            this.keyEncoder.close();
            this.messageEncoder.close();
            if (this.errorCounter.getErrorCount() > 0) {
                throw new PrestoException(KafkaErrorCode.KAFKA_PRODUCER_ERROR, String.format("%d producer record('s) failed to send", Long.valueOf(this.errorCounter.getErrorCount())));
            }
            return CompletableFuture.completedFuture(ImmutableList.of());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to close row encoders", e);
        }
    }

    public void abort() {
        this.producer.close();
    }
}
