package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.class */
public class KafkaEntryFormatter implements EntryFormatter {
    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public ByteBuf encode(MemoryRecords memoryRecords, int i) {
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(memoryRecords.buffer());
        ByteBuf serializeMetadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, getMessageMetadataWithNumberMessages(i), wrappedBuffer);
        wrappedBuffer.release();
        return serializeMetadataAndPayload;
    }

    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public MemoryRecords decode(List<Entry> list, byte b) {
        int i = 0;
        Iterator<Entry> it = list.iterator();
        while (it.hasNext()) {
            i += it.next().getLength();
        }
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(i), b, CompressionType.NONE, TimestampType.CREATE_TIME, MessageIdUtils.getOffset(list.get(0).getLedgerId(), list.get(0).getEntryId()));
        list.forEach(entry -> {
            ByteBuf dataBuffer = entry.getDataBuffer();
            Commands.skipMessageMetadata(dataBuffer);
            MemoryRecords readableRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(dataBuffer));
            long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId());
            Iterator it2 = readableRecords.records().iterator();
            while (it2.hasNext()) {
                builder.appendWithOffset(offset, (Record) it2.next());
                offset++;
            }
            entry.release();
        });
        return builder.build();
    }

    private static PulsarApi.MessageMetadata getMessageMetadataWithNumberMessages(int i) {
        PulsarApi.MessageMetadata.Builder newBuilder = PulsarApi.MessageMetadata.newBuilder();
        newBuilder.addProperties(PulsarApi.KeyValue.newBuilder().setKey("entry.format").setValue(EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase()).build());
        newBuilder.setProducerName(OffsetMetadata.NO_METADATA);
        newBuilder.setSequenceId(0L);
        newBuilder.setPublishTime(0L);
        newBuilder.setNumMessagesInBatch(i);
        return newBuilder.build();
    }
}
