package com.huaweicloud.dis.adapter.kafka.producer;

import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.adapter.common.Utils;
import com.huaweicloud.dis.adapter.common.model.DisProducerRecord;
import com.huaweicloud.dis.adapter.common.model.ProduceCallback;
import com.huaweicloud.dis.adapter.common.producer.DISProducer;
import com.huaweicloud.dis.adapter.common.producer.IDISProducer;
import com.huaweicloud.dis.core.DISCredentials;
import com.huaweicloud.dis.core.util.StringUtils;
import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
import com.huaweicloud.dis.iface.data.response.PutRecordsResultEntry;
import com.huaweicloud.dis.iface.stream.response.DescribeStreamResult;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dis/adapter/kafka/producer/DISKafkaProducer.class */
/**
 * Kafka {@link Producer} facade that forwards every record to a DIS stream through an
 * {@link IDISProducer}.
 *
 * <p>The Kafka topic name is used as the DIS stream name. Record values are sent as raw bytes
 * ({@code byte[]} values are passed through, anything else goes through the value serializer);
 * record keys are serialized and handed to DIS as a string. The transactional part of the Kafka
 * producer API is not supported and throws {@link UnsupportedOperationException}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class DISKafkaProducer<K, V> implements Producer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(DISKafkaProducer.class);

    /** Configuration key holding the fully-qualified class name of the key serializer. */
    private static final String KEY_SERIALIZER_CONFIG = "key.serializer";

    /** Configuration key holding the fully-qualified class name of the value serializer. */
    private static final String VALUE_SERIALIZER_CONFIG = "value.serializer";

    private final IDISProducer disProducer;
    private final DISConfig disConfig;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;

    /**
     * Adapts the {@code Future<PutRecordsResult>} returned by the DIS producer to the
     * {@code Future<RecordMetadata>} required by the Kafka producer API.
     *
     * <p>Cancellation and completion state are delegated unchanged; only the result is converted.
     */
    public class RecordMetadataFuture implements Future<RecordMetadata> {
        private final Future<PutRecordsResult> future;
        private final String streamName;

        /**
         * @param streamName stream (Kafka topic) the record was sent to
         * @param future     pending DIS result to adapt
         */
        public RecordMetadataFuture(String streamName, Future<PutRecordsResult> future) {
            this.future = future;
            this.streamName = streamName;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return this.future.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.future.isDone();
        }

        /**
         * Waits for the DIS result and converts it.
         *
         * @return the converted metadata, or {@code null} when the DIS future yields {@code null}
         * @throws RuntimeException if the first result entry carries an error code
         *                          (raised by {@link DISKafkaProducer#buildRecordMetadata})
         */
        @Override
        public RecordMetadata get() throws InterruptedException, ExecutionException {
            return toRecordMetadata(this.future.get());
        }

        /**
         * Timed variant of {@link #get()}.
         *
         * @return the converted metadata, or {@code null} when the DIS future yields {@code null}
         * @throws RuntimeException if the first result entry carries an error code
         */
        @Override
        public RecordMetadata get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return toRecordMetadata(this.future.get(timeout, unit));
        }

        /** Converts a DIS result to Kafka metadata, passing {@code null} through untouched. */
        private RecordMetadata toRecordMetadata(PutRecordsResult result) {
            if (result == null) {
                return null;
            }
            return DISKafkaProducer.this.buildRecordMetadata(this.streamName, result);
        }
    }

    /**
     * Creates a producer from a configuration map, resolving both serializers from the
     * configuration.
     *
     * @param configs producer configuration
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public DISKafkaProducer(Map configs) {
        this((Map<String, Object>) configs, null, null);
    }

    /**
     * Creates a producer from {@link Properties}, resolving both serializers from the
     * configuration.
     *
     * @param properties producer configuration
     */
    public DISKafkaProducer(Properties properties) {
        this(properties, null, null);
    }

    /**
     * Creates a producer from an existing {@link DISConfig}, resolving both serializers from the
     * configuration.
     *
     * @param disConfig DIS configuration
     */
    public DISKafkaProducer(DISConfig disConfig) {
        this(disConfig, null, null);
    }

    /**
     * Creates a producer from a configuration map with optional explicit serializers.
     *
     * @param configs         producer configuration, converted with {@code Utils.newDisConfig}
     * @param keySerializer   key serializer, or {@code null} to resolve it from the configuration
     * @param valueSerializer value serializer, or {@code null} to resolve it from the configuration
     */
    public DISKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
            Serializer<V> valueSerializer) {
        this(Utils.newDisConfig(configs), keySerializer, valueSerializer);
    }

    /**
     * Creates a producer from {@link Properties} with optional explicit serializers.
     *
     * @param properties      producer configuration
     * @param keySerializer   key serializer, or {@code null} to resolve it from the configuration
     * @param valueSerializer value serializer, or {@code null} to resolve it from the configuration
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public DISKafkaProducer(Properties properties, Serializer<K> keySerializer,
            Serializer<V> valueSerializer) {
        // Properties is a Map<Object, Object>; a direct cast to Map<String, Object> is rejected
        // by the compiler, so go through the raw Map type (unchecked conversion).
        this((Map) properties, keySerializer, valueSerializer);
    }

    private DISKafkaProducer(DISConfig disConfig, Serializer<K> keySerializer,
            Serializer<V> valueSerializer) {
        this.disConfig = disConfig;
        this.disProducer = new DISProducer(disConfig);
        this.keySerializer = resolveSerializer(keySerializer, disConfig, KEY_SERIALIZER_CONFIG);
        this.valueSerializer = resolveSerializer(valueSerializer, disConfig, VALUE_SERIALIZER_CONFIG);
    }

    /** Returns {@code explicit} when given, otherwise the serializer named by {@code configKey}. */
    private static <T> Serializer<T> resolveSerializer(Serializer<T> explicit, DISConfig config,
            String configKey) {
        if (explicit != null) {
            return explicit;
        }
        return createSerializer(config, configKey);
    }

    /**
     * Instantiates the serializer class named by {@code configKey}.
     *
     * <p>Loading is best-effort: a missing entry or an unknown class name falls back to
     * {@link StringSerializer}; a class that cannot be instantiated yields {@code null}. Both
     * failures are logged with their cause.
     *
     * @return the new serializer, or {@code null} if instantiation failed
     */
    @SuppressWarnings("unchecked")
    private static <T> Serializer<T> createSerializer(DISConfig config, String configKey) {
        Class<?> serializerClass = StringSerializer.class;
        String className = (String) config.get(configKey);
        if (className != null) {
            try {
                serializerClass = Class.forName(className);
            } catch (ClassNotFoundException e) {
                log.error("Serializer class " + className + " configured for " + configKey
                        + " was not found, falling back to " + StringSerializer.class.getName(), e);
            }
        }
        try {
            return (Serializer<T>) serializerClass.newInstance();
        } catch (IllegalAccessException | InstantiationException e) {
            log.error("Failed to instantiate serializer " + serializerClass.getName()
                    + " for " + configKey, e);
            return null;
        }
    }

    /**
     * Sends a record without a completion callback.
     *
     * @param producerRecord record to send
     * @return future completing with the record's metadata
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return send(producerRecord, null);
    }

    /**
     * Sends a record to the DIS stream named after the record's topic.
     *
     * @param producerRecord record to send
     * @param callback       invoked on completion with either the metadata or the failure;
     *                       may be {@code null}
     * @return future completing with the record's metadata
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, final Callback callback) {
        final String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        Long timestamp = producerRecord.timestamp();
        K key = producerRecord.key();
        V value = producerRecord.value();

        // Raw byte[] payloads bypass the value serializer.
        ByteBuffer payload = value instanceof byte[]
                ? ByteBuffer.wrap((byte[]) value)
                : ByteBuffer.wrap(this.valueSerializer.serialize(topic, value));

        String partitionKey = null;
        if (key != null) {
            // Decode with a fixed charset so the partition key does not vary with the JVM's
            // platform default encoding.
            partitionKey = new String(this.keySerializer.serialize(topic, key), StandardCharsets.UTF_8);
        }

        ProduceCallback produceCallback = null;
        if (callback != null) {
            produceCallback = new ProduceCallback() {
                public void onCompletion(PutRecordsResult putRecordsResult, Exception exception) {
                    if (exception != null) {
                        callback.onCompletion(null, exception);
                        return;
                    }
                    RecordMetadata metadata;
                    try {
                        metadata = DISKafkaProducer.this.buildRecordMetadata(topic, putRecordsResult);
                    } catch (RuntimeException e) {
                        // A per-record failure (error code in the result entry) must reach the
                        // user's callback instead of escaping into the DIS producer.
                        callback.onCompletion(null, e);
                        return;
                    }
                    callback.onCompletion(metadata, null);
                }
            };
        }

        DisProducerRecord disRecord = new DisProducerRecord(topic, partition, timestamp, partitionKey, payload);
        return new RecordMetadataFuture(topic, this.disProducer.send(disRecord, produceCallback));
    }

    /**
     * Converts the first entry of a DIS put result into Kafka {@link RecordMetadata}.
     *
     * <p>The entry's sequence number is parsed as a {@code long} and its partition id is mapped
     * to a Kafka partition number.
     *
     * @param topic            stream (Kafka topic) the record was sent to
     * @param putRecordsResult DIS result; only its first entry is inspected
     * @return metadata for the record
     * @throws RuntimeException carrying the entry's error code when the entry reports a failure
     */
    public RecordMetadata buildRecordMetadata(String topic, PutRecordsResult putRecordsResult) {
        PutRecordsResultEntry entry = (PutRecordsResultEntry) putRecordsResult.getRecords().get(0);
        if (!StringUtils.isNullOrEmpty(entry.getErrorCode())) {
            throw new RuntimeException(entry.getErrorCode());
        }
        TopicPartition topicPartition =
                new TopicPartition(topic, getKafkaPartitionFromShardId(entry.getPartitionId()));
        return new RecordMetadata(topicPartition, 0L, Long.parseLong(entry.getSequenceNumber()), -1L, -1L, -1, -1);
    }

    /**
     * Extracts the partition number from a partition id by parsing its trailing run of decimal
     * digits, e.g. {@code "shardId-0000000012"} gives 12 and {@code "10"} gives 10.
     *
     * @throws NumberFormatException if the id does not end with digits or the number overflows
     */
    private int getKafkaPartitionFromShardId(String shardId) {
        int start = shardId.length();
        while (start > 0) {
            char c = shardId.charAt(start - 1);
            if (c < '0' || c > '9') {
                break;
            }
            start--;
        }
        if (start == shardId.length()) {
            throw new NumberFormatException("No partition number in shard id: \"" + shardId + "\"");
        }
        return Integer.parseInt(shardId.substring(start));
    }

    /** Delegates to {@link IDISProducer#flush()}. */
    public void flush() {
        this.disProducer.flush();
    }

    /**
     * Lists one {@link PartitionInfo} per writable partition of the stream, numbered from 0.
     *
     * @param topic stream (Kafka topic) name
     * @return partition descriptors without leader or replica information
     */
    public List<PartitionInfo> partitionsFor(String topic) {
        DescribeStreamResult stream = this.disProducer.describeStream(topic);
        int partitionCount = stream.getWritablePartitionCount();
        List<PartitionInfo> partitions = new ArrayList<>(Math.max(partitionCount, 0));
        for (int i = 0; i < partitionCount; i++) {
            // Empty arrays rather than null so callers can iterate replicas()/inSyncReplicas().
            partitions.add(new PartitionInfo(topic, i, Node.noNode(), new Node[0], new Node[0]));
        }
        return partitions;
    }

    /**
     * No metrics are collected by this adapter.
     *
     * @return an empty, immutable map
     */
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.<MetricName, Metric>emptyMap();
    }

    /** Delegates to {@link IDISProducer#close()}. */
    public void close() {
        this.disProducer.close();
    }

    /**
     * Delegates to the timed {@code close} of the underlying DIS producer.
     *
     * @param timeout  timeout value forwarded to the DIS producer
     * @param timeUnit unit of {@code timeout}
     */
    public void close(long timeout, TimeUnit timeUnit) {
        this.disProducer.close(timeout, timeUnit);
    }

    /** Transactions are not supported. */
    public void initTransactions() {
        throw new UnsupportedOperationException();
    }

    /** Transactions are not supported. */
    public void beginTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /** Transactions are not supported. */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
            throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /** Transactions are not supported. */
    public void commitTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /** Transactions are not supported. */
    public void abortTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /**
     * Forwards new credentials to the underlying DIS producer.
     *
     * @param credentials credentials to use from now on
     */
    public void updateCredentials(DISCredentials credentials) {
        this.disProducer.updateCredentials(credentials);
    }
}
