package com.facebook.presto.kafka;

import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.decoder.FieldValueProvider;
import com.facebook.presto.decoder.FieldValueProviders;
import com.facebook.presto.decoder.RowDecoder;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.message.MessageAndOffset;

/* loaded from: input_file:com/facebook/presto/kafka/KafkaRecordSet.class */
public class KafkaRecordSet implements RecordSet {
    private static final int KAFKA_READ_BUFFER_SIZE = 100000;
    private final KafkaSplit split;
    private final KafkaSimpleConsumerManager consumerManager;
    private final RowDecoder keyDecoder;
    private final RowDecoder messageDecoder;
    private final List<KafkaColumnHandle> columnHandles;
    private final List<Type> columnTypes;
    private static final Logger log = Logger.get(KafkaRecordSet.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    /* loaded from: input_file:com/facebook/presto/kafka/KafkaRecordSet$KafkaRecordCursor.class */
    public class KafkaRecordCursor implements RecordCursor {
        private long totalBytes;
        private long totalMessages;
        private long cursorOffset;
        private Iterator<MessageAndOffset> messageAndOffsetIterator;
        private final AtomicBoolean reported = new AtomicBoolean();
        private final FieldValueProvider[] currentRowValues;

        KafkaRecordCursor() {
            this.cursorOffset = KafkaRecordSet.this.split.getStart();
            this.currentRowValues = new FieldValueProvider[KafkaRecordSet.this.columnHandles.size()];
        }

        public long getCompletedBytes() {
            return this.totalBytes;
        }

        public long getReadTimeNanos() {
            return 0L;
        }

        public Type getType(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            return ((KafkaColumnHandle) KafkaRecordSet.this.columnHandles.get(i)).getType();
        }

        public boolean advanceNextPosition() {
            while (this.cursorOffset < KafkaRecordSet.this.split.getEnd()) {
                openFetchRequest();
                while (this.messageAndOffsetIterator.hasNext()) {
                    MessageAndOffset next = this.messageAndOffsetIterator.next();
                    long offset = next.offset();
                    if (offset >= KafkaRecordSet.this.split.getEnd()) {
                        return endOfData();
                    }
                    if (offset >= this.cursorOffset) {
                        return nextRow(next);
                    }
                }
                this.messageAndOffsetIterator = null;
            }
            return endOfData();
        }

        private boolean endOfData() {
            if (this.reported.getAndSet(true)) {
                return false;
            }
            KafkaRecordSet.log.debug("Found a total of %d messages with %d bytes (%d messages expected). Last Offset: %d (%d, %d)", new Object[]{Long.valueOf(this.totalMessages), Long.valueOf(this.totalBytes), Long.valueOf(KafkaRecordSet.this.split.getEnd() - KafkaRecordSet.this.split.getStart()), Long.valueOf(this.cursorOffset), Long.valueOf(KafkaRecordSet.this.split.getStart()), Long.valueOf(KafkaRecordSet.this.split.getEnd())});
            return false;
        }

        private boolean nextRow(MessageAndOffset messageAndOffset) {
            this.cursorOffset = messageAndOffset.offset() + 1;
            this.totalBytes += messageAndOffset.message().payloadSize();
            this.totalMessages++;
            byte[] bArr = KafkaRecordSet.EMPTY_BYTE_ARRAY;
            byte[] bArr2 = KafkaRecordSet.EMPTY_BYTE_ARRAY;
            ByteBuffer key = messageAndOffset.message().key();
            if (key != null) {
                bArr = new byte[key.remaining()];
                key.get(bArr);
            }
            ByteBuffer payload = messageAndOffset.message().payload();
            if (payload != null) {
                bArr2 = new byte[payload.remaining()];
                payload.get(bArr2);
            }
            HashMap hashMap = new HashMap();
            Optional decodeRow = KafkaRecordSet.this.keyDecoder.decodeRow(bArr, (Map) null);
            Optional decodeRow2 = KafkaRecordSet.this.messageDecoder.decodeRow(bArr2, (Map) null);
            for (DecoderColumnHandle decoderColumnHandle : KafkaRecordSet.this.columnHandles) {
                if (decoderColumnHandle.isInternal()) {
                    KafkaInternalFieldDescription forColumnName = KafkaInternalFieldDescription.forColumnName(decoderColumnHandle.getName());
                    switch (forColumnName) {
                        case SEGMENT_COUNT_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.longValueProvider(this.totalMessages));
                            break;
                        case PARTITION_OFFSET_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.longValueProvider(messageAndOffset.offset()));
                            break;
                        case MESSAGE_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.bytesValueProvider(bArr2));
                            break;
                        case MESSAGE_LENGTH_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.longValueProvider(bArr2.length));
                            break;
                        case KEY_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.bytesValueProvider(bArr));
                            break;
                        case KEY_LENGTH_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.longValueProvider(bArr.length));
                            break;
                        case KEY_CORRUPT_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.booleanValueProvider(!decodeRow.isPresent()));
                            break;
                        case MESSAGE_CORRUPT_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.booleanValueProvider(!decodeRow2.isPresent()));
                            break;
                        case PARTITION_ID_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.longValueProvider(KafkaRecordSet.this.split.getPartitionId()));
                            break;
                        case SEGMENT_START_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.longValueProvider(KafkaRecordSet.this.split.getStart()));
                            break;
                        case SEGMENT_END_FIELD:
                            hashMap.put(decoderColumnHandle, FieldValueProviders.longValueProvider(KafkaRecordSet.this.split.getEnd()));
                            break;
                        default:
                            throw new IllegalArgumentException("unknown internal field " + forColumnName);
                    }
                }
            }
            hashMap.getClass();
            decodeRow.ifPresent(hashMap::putAll);
            hashMap.getClass();
            decodeRow2.ifPresent(hashMap::putAll);
            for (int i = 0; i < KafkaRecordSet.this.columnHandles.size(); i++) {
                this.currentRowValues[i] = (FieldValueProvider) hashMap.get((ColumnHandle) KafkaRecordSet.this.columnHandles.get(i));
            }
            return true;
        }

        public boolean getBoolean(int i) {
            return getFieldValueProvider(i, Boolean.TYPE).getBoolean();
        }

        public long getLong(int i) {
            return getFieldValueProvider(i, Long.TYPE).getLong();
        }

        public double getDouble(int i) {
            return getFieldValueProvider(i, Double.TYPE).getDouble();
        }

        public Slice getSlice(int i) {
            return getFieldValueProvider(i, Slice.class).getSlice();
        }

        public Object getObject(int i) {
            return getFieldValueProvider(i, Block.class).getBlock();
        }

        public boolean isNull(int i) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            return this.currentRowValues[i] == null || this.currentRowValues[i].isNull();
        }

        private FieldValueProvider getFieldValueProvider(int i, Class<?> cls) {
            Preconditions.checkArgument(i < KafkaRecordSet.this.columnHandles.size(), "Invalid field index");
            checkFieldType(i, cls);
            return this.currentRowValues[i];
        }

        private void checkFieldType(int i, Class<?> cls) {
            Class<?> javaType = getType(i).getJavaType();
            Preconditions.checkArgument(javaType == cls, "Expected field %s to be type %s but is %s", Integer.valueOf(i), cls, javaType);
        }

        public void close() {
        }

        private void openFetchRequest() {
            try {
                if (this.messageAndOffsetIterator == null) {
                    KafkaRecordSet.log.debug("Fetching %d bytes from offset %d (%d - %d). %d messages read so far", new Object[]{Integer.valueOf(KafkaRecordSet.KAFKA_READ_BUFFER_SIZE), Long.valueOf(this.cursorOffset), Long.valueOf(KafkaRecordSet.this.split.getStart()), Long.valueOf(KafkaRecordSet.this.split.getEnd()), Long.valueOf(this.totalMessages)});
                    FetchResponse fetch = KafkaRecordSet.this.consumerManager.getConsumer(KafkaRecordSet.this.split.getLeader()).fetch(new FetchRequestBuilder().clientId("presto-worker-" + Thread.currentThread().getName()).addFetch(KafkaRecordSet.this.split.getTopicName(), KafkaRecordSet.this.split.getPartitionId(), this.cursorOffset, KafkaRecordSet.KAFKA_READ_BUFFER_SIZE).build());
                    if (fetch.hasError()) {
                        short errorCode = fetch.errorCode(KafkaRecordSet.this.split.getTopicName(), KafkaRecordSet.this.split.getPartitionId());
                        KafkaRecordSet.log.warn("Fetch response has error: %d", new Object[]{Short.valueOf(errorCode)});
                        throw new RuntimeException("could not fetch data from Kafka, error code is '" + ((int) errorCode) + "'");
                    }
                    this.messageAndOffsetIterator = fetch.messageSet(KafkaRecordSet.this.split.getTopicName(), KafkaRecordSet.this.split.getPartitionId()).iterator();
                }
            } catch (Exception e) {
                if (!(e instanceof PrestoException)) {
                    throw new PrestoException(KafkaErrorCode.KAFKA_SPLIT_ERROR, String.format("Cannot read data from topic '%s', partition '%s', startOffset %s, endOffset %s, leader %s ", KafkaRecordSet.this.split.getTopicName(), Integer.valueOf(KafkaRecordSet.this.split.getPartitionId()), Long.valueOf(KafkaRecordSet.this.split.getStart()), Long.valueOf(KafkaRecordSet.this.split.getEnd()), KafkaRecordSet.this.split.getLeader()), e);
                }
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaRecordSet(KafkaSplit kafkaSplit, KafkaSimpleConsumerManager kafkaSimpleConsumerManager, List<KafkaColumnHandle> list, RowDecoder rowDecoder, RowDecoder rowDecoder2) {
        this.split = (KafkaSplit) Objects.requireNonNull(kafkaSplit, "split is null");
        this.consumerManager = (KafkaSimpleConsumerManager) Objects.requireNonNull(kafkaSimpleConsumerManager, "consumerManager is null");
        this.keyDecoder = (RowDecoder) Objects.requireNonNull(rowDecoder, "rowDecoder is null");
        this.messageDecoder = (RowDecoder) Objects.requireNonNull(rowDecoder2, "rowDecoder is null");
        this.columnHandles = (List) Objects.requireNonNull(list, "columnHandles is null");
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<KafkaColumnHandle> it = list.iterator();
        while (it.hasNext()) {
            builder.add(it.next().getType());
        }
        this.columnTypes = builder.build();
    }

    public List<Type> getColumnTypes() {
        return this.columnTypes;
    }

    public RecordCursor cursor() {
        return new KafkaRecordCursor();
    }
}
