package com.facebook.presto.kafka;

import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.decoder.FieldDecoder;
import com.facebook.presto.decoder.FieldValueProvider;
import com.facebook.presto.decoder.RowDecoder;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.message.MessageAndOffset;

/* loaded from: input_file:com/facebook/presto/kafka/KafkaRecordSet.class */
public class KafkaRecordSet implements RecordSet {
    private static final int KAFKA_READ_BUFFER_SIZE = 100000;
    private final KafkaSplit split;
    private final KafkaSimpleConsumerManager consumerManager;
    private final RowDecoder keyDecoder;
    private final RowDecoder messageDecoder;
    private final Map<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoders;
    private final Map<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoders;
    private final List<DecoderColumnHandle> columnHandles;
    private final List<Type> columnTypes;
    private final Set<FieldValueProvider> globalInternalFieldValueProviders;
    private static final Logger log = Logger.get(KafkaRecordSet.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    /* loaded from: input_file:com/facebook/presto/kafka/KafkaRecordSet$KafkaRecordCursor.class */
    public class KafkaRecordCursor implements RecordCursor {
        private long totalBytes;
        private long totalMessages;
        private long cursorOffset;
        private Iterator<MessageAndOffset> messageAndOffsetIterator;
        private final AtomicBoolean reported = new AtomicBoolean();
        private FieldValueProvider[] fieldValueProviders;

        KafkaRecordCursor() {
            this.cursorOffset = KafkaRecordSet.this.split.getStart();
        }

        public long getTotalBytes() {
            return this.totalBytes;
        }

        public long getCompletedBytes() {
            return this.totalBytes;
        }

        public long getReadTimeNanos() {
            return 0L;
        }

        public Type getType(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            return ((DecoderColumnHandle) KafkaRecordSet.this.columnHandles.get(i)).getType();
        }

        public boolean advanceNextPosition() {
            while (this.cursorOffset < KafkaRecordSet.this.split.getEnd()) {
                openFetchRequest();
                while (this.messageAndOffsetIterator.hasNext()) {
                    MessageAndOffset next = this.messageAndOffsetIterator.next();
                    long offset = next.offset();
                    if (offset >= KafkaRecordSet.this.split.getEnd()) {
                        return endOfData();
                    }
                    if (offset >= this.cursorOffset) {
                        return nextRow(next);
                    }
                }
                this.messageAndOffsetIterator = null;
            }
            return endOfData();
        }

        private boolean endOfData() {
            if (this.reported.getAndSet(true)) {
                return false;
            }
            KafkaRecordSet.log.debug("Found a total of %d messages with %d bytes (%d messages expected). Last Offset: %d (%d, %d)", new Object[]{Long.valueOf(this.totalMessages), Long.valueOf(this.totalBytes), Long.valueOf(KafkaRecordSet.this.split.getEnd() - KafkaRecordSet.this.split.getStart()), Long.valueOf(this.cursorOffset), Long.valueOf(KafkaRecordSet.this.split.getStart()), Long.valueOf(KafkaRecordSet.this.split.getEnd())});
            return false;
        }

        private boolean nextRow(MessageAndOffset messageAndOffset) {
            this.cursorOffset = messageAndOffset.offset() + 1;
            this.totalBytes += messageAndOffset.message().payloadSize();
            this.totalMessages++;
            byte[] bArr = KafkaRecordSet.EMPTY_BYTE_ARRAY;
            byte[] bArr2 = KafkaRecordSet.EMPTY_BYTE_ARRAY;
            ByteBuffer key = messageAndOffset.message().key();
            if (key != null) {
                bArr = new byte[key.remaining()];
                key.get(bArr);
            }
            ByteBuffer payload = messageAndOffset.message().payload();
            if (payload != null) {
                bArr2 = new byte[payload.remaining()];
                payload.get(bArr2);
            }
            HashSet hashSet = new HashSet();
            hashSet.addAll(KafkaRecordSet.this.globalInternalFieldValueProviders);
            hashSet.add(KafkaInternalFieldDescription.SEGMENT_COUNT_FIELD.forLongValue(this.totalMessages));
            hashSet.add(KafkaInternalFieldDescription.PARTITION_OFFSET_FIELD.forLongValue(messageAndOffset.offset()));
            hashSet.add(KafkaInternalFieldDescription.MESSAGE_FIELD.forByteValue(bArr2));
            hashSet.add(KafkaInternalFieldDescription.MESSAGE_LENGTH_FIELD.forLongValue(bArr2.length));
            hashSet.add(KafkaInternalFieldDescription.KEY_FIELD.forByteValue(bArr));
            hashSet.add(KafkaInternalFieldDescription.KEY_LENGTH_FIELD.forLongValue(bArr.length));
            hashSet.add(KafkaInternalFieldDescription.KEY_CORRUPT_FIELD.forBooleanValue(KafkaRecordSet.this.keyDecoder.decodeRow(bArr, (Map) null, hashSet, KafkaRecordSet.this.columnHandles, KafkaRecordSet.this.keyFieldDecoders)));
            hashSet.add(KafkaInternalFieldDescription.MESSAGE_CORRUPT_FIELD.forBooleanValue(KafkaRecordSet.this.messageDecoder.decodeRow(bArr2, (Map) null, hashSet, KafkaRecordSet.this.columnHandles, KafkaRecordSet.this.messageFieldDecoders)));
            this.fieldValueProviders = new FieldValueProvider[KafkaRecordSet.this.columnHandles.size()];
            for (int i = 0; i < KafkaRecordSet.this.columnHandles.size(); i++) {
                Iterator it = hashSet.iterator();
                while (true) {
                    if (it.hasNext()) {
                        FieldValueProvider fieldValueProvider = (FieldValueProvider) it.next();
                        if (fieldValueProvider.accept((DecoderColumnHandle) KafkaRecordSet.this.columnHandles.get(i))) {
                            this.fieldValueProviders[i] = fieldValueProvider;
                            break;
                        }
                    }
                }
            }
            return true;
        }

        public boolean getBoolean(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            checkFieldType(i, Boolean.TYPE);
            if (isNull(i)) {
                return false;
            }
            return this.fieldValueProviders[i].getBoolean();
        }

        public long getLong(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            checkFieldType(i, Long.TYPE);
            if (isNull(i)) {
                return 0L;
            }
            return this.fieldValueProviders[i].getLong();
        }

        public double getDouble(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            checkFieldType(i, Double.TYPE);
            if (isNull(i)) {
                return 0.0d;
            }
            return this.fieldValueProviders[i].getDouble();
        }

        public Slice getSlice(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            checkFieldType(i, Slice.class);
            return isNull(i) ? Slices.EMPTY_SLICE : this.fieldValueProviders[i].getSlice();
        }

        public Object getObject(int i) {
            throw new UnsupportedOperationException();
        }

        public boolean isNull(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            return this.fieldValueProviders[i] == null || this.fieldValueProviders[i].isNull();
        }

        private void checkFieldType(int i, Class<?> cls) {
            Class<?> javaType = getType(i).getJavaType();
            Preconditions.checkArgument(javaType == cls, "Expected field %s to be type %s but is %s", new Object[]{Integer.valueOf(i), cls, javaType});
        }

        public void close() {
        }

        private void openFetchRequest() {
            if (this.messageAndOffsetIterator == null) {
                KafkaRecordSet.log.debug("Fetching %d bytes from offset %d (%d - %d). %d messages read so far", new Object[]{Integer.valueOf(KafkaRecordSet.KAFKA_READ_BUFFER_SIZE), Long.valueOf(this.cursorOffset), Long.valueOf(KafkaRecordSet.this.split.getStart()), Long.valueOf(KafkaRecordSet.this.split.getEnd()), Long.valueOf(this.totalMessages)});
                FetchResponse fetch = KafkaRecordSet.this.consumerManager.getConsumer(KafkaRecordSet.this.split.getNodes().get(0)).fetch(new FetchRequestBuilder().clientId("presto-worker-" + Thread.currentThread().getName()).addFetch(KafkaRecordSet.this.split.getTopicName(), KafkaRecordSet.this.split.getPartitionId(), this.cursorOffset, KafkaRecordSet.KAFKA_READ_BUFFER_SIZE).build());
                if (!fetch.hasError()) {
                    this.messageAndOffsetIterator = fetch.messageSet(KafkaRecordSet.this.split.getTopicName(), KafkaRecordSet.this.split.getPartitionId()).iterator();
                } else {
                    short errorCode = fetch.errorCode(KafkaRecordSet.this.split.getTopicName(), KafkaRecordSet.this.split.getPartitionId());
                    KafkaRecordSet.log.warn("Fetch response has error: %d", new Object[]{Short.valueOf(errorCode)});
                    throw new PrestoException(KafkaErrorCode.KAFKA_SPLIT_ERROR, "could not fetch data from Kafka, error code is '" + ((int) errorCode) + "'");
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaRecordSet(KafkaSplit kafkaSplit, KafkaSimpleConsumerManager kafkaSimpleConsumerManager, List<DecoderColumnHandle> list, RowDecoder rowDecoder, RowDecoder rowDecoder2, Map<DecoderColumnHandle, FieldDecoder<?>> map, Map<DecoderColumnHandle, FieldDecoder<?>> map2) {
        this.split = (KafkaSplit) Objects.requireNonNull(kafkaSplit, "split is null");
        this.globalInternalFieldValueProviders = ImmutableSet.of(KafkaInternalFieldDescription.PARTITION_ID_FIELD.forLongValue(kafkaSplit.getPartitionId()), KafkaInternalFieldDescription.SEGMENT_START_FIELD.forLongValue(kafkaSplit.getStart()), KafkaInternalFieldDescription.SEGMENT_END_FIELD.forLongValue(kafkaSplit.getEnd()));
        this.consumerManager = (KafkaSimpleConsumerManager) Objects.requireNonNull(kafkaSimpleConsumerManager, "consumerManager is null");
        this.keyDecoder = (RowDecoder) Objects.requireNonNull(rowDecoder, "rowDecoder is null");
        this.messageDecoder = (RowDecoder) Objects.requireNonNull(rowDecoder2, "rowDecoder is null");
        this.keyFieldDecoders = (Map) Objects.requireNonNull(map, "keyFieldDecoders is null");
        this.messageFieldDecoders = (Map) Objects.requireNonNull(map2, "messageFieldDecoders is null");
        this.columnHandles = (List) Objects.requireNonNull(list, "columnHandles is null");
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<DecoderColumnHandle> it = list.iterator();
        while (it.hasNext()) {
            builder.add(it.next().getType());
        }
        this.columnTypes = builder.build();
    }

    public List<Type> getColumnTypes() {
        return this.columnTypes;
    }

    public RecordCursor cursor() {
        return new KafkaRecordCursor();
    }
}
