package me.ahoo.eventbus.kafka;

import com.google.common.primitives.Longs;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import me.ahoo.eventbus.core.codec.EventCodec;
import me.ahoo.eventbus.core.compensate.CompensatePublishEvent;
import me.ahoo.eventbus.core.publisher.PublishEvent;
import me.ahoo.eventbus.core.serialize.Deserializer;
import me.ahoo.eventbus.core.serialize.Serializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;

/* loaded from: input_file:me/ahoo/eventbus/kafka/KafkaEventCodec.class */
/**
 * Converts between {@link PublishEvent} instances and Kafka producer/consumer records.
 *
 * <p>Record layout used by both directions: the event name is the topic, the event id is the
 * record key, the event creation time is the record timestamp, the serialized event data is the
 * record value, and the event data id travels in the {@value #EVENT_DATA_ID} header as UTF-8
 * decimal text.
 */
public class KafkaEventCodec implements EventCodec {
    /** Name of the record header that carries the event data id. */
    public static final String EVENT_DATA_ID = "event_data_id";
    private final Serializer serializer;
    private final Deserializer deserializer;

    /**
     * Creates a codec backed by the given serializer pair.
     *
     * @param serializer used by {@link #encode(PublishEvent)} to turn event data into the record value
     * @param deserializer used by {@link #decode(ConsumerRecord, Class)} to turn the record value
     *     back into event data
     */
    public KafkaEventCodec(Serializer serializer, Deserializer deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    /**
     * Builds the Kafka record to publish for the given event.
     *
     * <p>The record is created without an explicit partition. The {@value #EVENT_DATA_ID} header is
     * added only when the event has an event data id, mirroring {@code decode}, which treats that
     * header as optional.
     *
     * @param publishEvent the event to encode
     * @return a producer record addressed to the topic named after the event
     */
    public ProducerRecord<Long, String> encode(PublishEvent publishEvent) {
        final String payload;
        if (publishEvent instanceof CompensatePublishEvent) {
            // Compensation events are not run through the serializer: their data is cast to
            // String and sent as-is.
            payload = (String) ((CompensatePublishEvent) publishEvent).getEventData();
        } else {
            payload = this.serializer.serialize(publishEvent.getEventData());
        }

        RecordHeaders headers = new RecordHeaders();
        Object eventDataId = publishEvent.getEventDataId();
        if (eventDataId != null) {
            headers.add(EVENT_DATA_ID, eventDataId.toString().getBytes(StandardCharsets.UTF_8));
        }

        return new ProducerRecord<>(
                publishEvent.getEventName(),
                (Integer) null,
                publishEvent.getCreateTime(),
                publishEvent.getId(),
                payload,
                headers);
    }

    /**
     * Rebuilds a {@link PublishEvent} from a consumed Kafka record.
     *
     * <p>The event data id is set only when the {@value #EVENT_DATA_ID} header is present, has a
     * value, and that value parses as a decimal long; otherwise it is left unset.
     *
     * @param consumerRecord the record to decode; its key is used as the event id
     * @param cls the type the record value is deserialized into
     * @return the decoded event
     * @throws NullPointerException if the record has no key
     */
    public PublishEvent decode(ConsumerRecord<Long, String> consumerRecord, Class<?> cls) {
        // Fail before deserializing anything if the record cannot supply an event id.
        Long eventId = Objects.requireNonNull(
                consumerRecord.key(), "consumerRecord key (event id) must not be null");
        Object eventData = this.deserializer.deserialize(consumerRecord.value(), cls);

        PublishEvent publishEvent = new PublishEvent();
        publishEvent.setId(eventId);
        publishEvent.setEventName(consumerRecord.topic());
        Long eventDataId = readEventDataId(consumerRecord.headers().lastHeader(EVENT_DATA_ID));
        if (eventDataId != null) {
            publishEvent.setEventDataId(eventDataId);
        }
        publishEvent.setCreateTime(consumerRecord.timestamp());
        publishEvent.setEventData(eventData);
        return publishEvent;
    }

    /**
     * Reads the event data id out of its header.
     *
     * @param header the {@value #EVENT_DATA_ID} header, or {@code null} if the record has none
     * @return the parsed id, or {@code null} when the header is missing, has no value, or its
     *     UTF-8 text is not a decimal long
     */
    private static Long readEventDataId(Header header) {
        if (header == null) {
            return null;
        }
        byte[] rawValue = header.value();
        if (rawValue == null) {
            return null;
        }
        // Longs.tryParse signals unparseable text with null rather than an exception, so the
        // result must not be unboxed unconditionally.
        return Longs.tryParse(new String(rawValue, StandardCharsets.UTF_8));
    }
}
