package com.redis.riot.stream;

import com.redis.riot.AbstractTransferCommand;
import com.redis.riot.FlushingTransferOptions;
import com.redis.riot.stream.KafkaOptions;
import com.redis.riot.stream.kafka.KafkaItemWriter;
import com.redis.riot.stream.processor.AvroProducerProcessor;
import com.redis.riot.stream.processor.JsonProducerProcessor;
import io.lettuce.core.StreamMessage;
import java.util.ArrayList;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import picocli.CommandLine;

@CommandLine.Command(name = "export", description = {"Import Redis streams into Kafka topics"})
/* loaded from: input_file:com/redis/riot/stream/StreamExportCommand.class */
public class StreamExportCommand extends AbstractTransferCommand {
    private static final Logger log = LoggerFactory.getLogger(StreamExportCommand.class);
    private static final String NAME = "stream-export";

    @CommandLine.Parameters(arity = "0..*", description = {"One ore more streams to read from"}, paramLabel = "STREAM")
    private String[] streams;

    @CommandLine.Option(names = {"--topic"}, description = {"Target topic key (default: same as stream)"}, paramLabel = "<string>")
    private String topic;

    @CommandLine.Mixin
    private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions();

    @CommandLine.Mixin
    private KafkaOptions options = new KafkaOptions();

    @CommandLine.Option(names = {"--offset"}, description = {"XREAD offset (default: ${DEFAULT-VALUE})"}, paramLabel = "<string>")
    private String offset = "0-0";

    public FlushingTransferOptions getFlushingTransferOptions() {
        return this.flushingTransferOptions;
    }

    public void setFlushingTransferOptions(FlushingTransferOptions flushingTransferOptions) {
        this.flushingTransferOptions = flushingTransferOptions;
    }

    public String[] getStreams() {
        return this.streams;
    }

    public void setStreams(String[] strArr) {
        this.streams = strArr;
    }

    public KafkaOptions getOptions() {
        return this.options;
    }

    public void setOptions(KafkaOptions kafkaOptions) {
        this.options = kafkaOptions;
    }

    public String getOffset() {
        return this.offset;
    }

    public void setOffset(String str) {
        this.offset = str;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    protected Flow flow() throws Exception {
        Assert.isTrue(!ObjectUtils.isEmpty(this.streams), "No stream specified");
        ArrayList arrayList = new ArrayList();
        for (String str : this.streams) {
            arrayList.add(riotStep(str + "-" + NAME, "Exporting from " + str).reader(reader(getRedisOptions()).stream(str).build()).processor(processor()).writer(writer()).flushingOptions(this.flushingTransferOptions).build().build());
        }
        return flow(NAME, (Step[]) arrayList.toArray(new Step[0]));
    }

    private KafkaItemWriter<String> writer() {
        Map<String, Object> producerProperties = this.options.producerProperties();
        log.debug("Creating Kafka writer with producer properties {}", producerProperties);
        return new KafkaItemWriter<>(new KafkaTemplate(new DefaultKafkaProducerFactory(producerProperties)));
    }

    private ItemProcessor<StreamMessage<String, String>, ProducerRecord<String, Object>> processor() {
        return this.options.getSerde() == KafkaOptions.SerDe.JSON ? new JsonProducerProcessor(topicConverter()) : new AvroProducerProcessor(topicConverter());
    }

    private Converter<StreamMessage<String, String>, String> topicConverter() {
        return this.topic == null ? (v0) -> {
            return v0.getStream();
        } : streamMessage -> {
            return this.topic;
        };
    }
}
