package com.redis.riot.stream;

import com.redis.riot.AbstractTransferCommand;
import com.redis.riot.FlushingTransferOptions;
import com.redis.riot.RedisWriterOptions;
import com.redis.riot.redis.FilteringOptions;
import com.redis.riot.stream.KafkaOptions;
import com.redis.riot.stream.kafka.KafkaItemReaderBuilder;
import com.redis.spring.batch.support.operation.Xadd;
import io.lettuce.core.XAddArgs;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import picocli.CommandLine;

@CommandLine.Command(name = "import", description = {"Import Kafka topics into Redis streams"})
/* loaded from: input_file:com/redis/riot/stream/StreamImportCommand.class */
public class StreamImportCommand extends AbstractTransferCommand {
    private static final Logger log = LoggerFactory.getLogger(StreamImportCommand.class);
    private static final String NAME = "stream-import";

    @CommandLine.Parameters(arity = "0..*", description = {"One ore more topics to read from"}, paramLabel = "TOPIC")
    private String[] topics;

    @CommandLine.Option(names = {"--key"}, description = {"Target stream key (default: same as topic)"}, paramLabel = "<string>")
    private String key;

    @CommandLine.Option(names = {"--maxlen"}, description = {"Stream maxlen"}, paramLabel = "<int>")
    private Long maxlen;

    @CommandLine.Option(names = {"--trim"}, description = {"Stream efficient trimming ('~' flag)"})
    private boolean approximateTrimming;

    @CommandLine.Mixin
    private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions();

    @CommandLine.Mixin
    private KafkaOptions options = new KafkaOptions();

    @CommandLine.Mixin
    private FilteringOptions filteringOptions = new FilteringOptions();

    @CommandLine.ArgGroup(exclusive = false, heading = "Writer options%n")
    private RedisWriterOptions writerOptions = new RedisWriterOptions();

    public FlushingTransferOptions getFlushingTransferOptions() {
        return this.flushingTransferOptions;
    }

    public void setFlushingTransferOptions(FlushingTransferOptions flushingTransferOptions) {
        this.flushingTransferOptions = flushingTransferOptions;
    }

    public String[] getTopics() {
        return this.topics;
    }

    public void setTopics(String[] strArr) {
        this.topics = strArr;
    }

    public KafkaOptions getOptions() {
        return this.options;
    }

    public void setOptions(KafkaOptions kafkaOptions) {
        this.options = kafkaOptions;
    }

    public String getKey() {
        return this.key;
    }

    public void setKey(String str) {
        this.key = str;
    }

    public Long getMaxlen() {
        return this.maxlen;
    }

    public void setMaxlen(Long l) {
        this.maxlen = l;
    }

    public boolean isApproximateTrimming() {
        return this.approximateTrimming;
    }

    public void setApproximateTrimming(boolean z) {
        this.approximateTrimming = z;
    }

    public FilteringOptions getFilteringOptions() {
        return this.filteringOptions;
    }

    public void setFilteringOptions(FilteringOptions filteringOptions) {
        this.filteringOptions = filteringOptions;
    }

    public RedisWriterOptions getWriterOptions() {
        return this.writerOptions;
    }

    public void setWriterOptions(RedisWriterOptions redisWriterOptions) {
        this.writerOptions = redisWriterOptions;
    }

    protected Flow flow() throws Exception {
        Assert.isTrue(!ObjectUtils.isEmpty(this.topics), "No topic specified");
        ArrayList arrayList = new ArrayList();
        Properties consumerProperties = this.options.consumerProperties();
        log.debug("Using Kafka consumer properties: {}", consumerProperties);
        for (String str : this.topics) {
            log.debug("Creating Kafka reader for topic {}", str);
            arrayList.add(riotStep(str + "-" + NAME, "Importing from " + str).reader(new KafkaItemReaderBuilder().partitions(0).consumerProperties(consumerProperties).partitions(0).name(str).saveState(false).topic(str).build()).writer(this.writerOptions.configureWriter(writer(getRedisOptions()).operation(Xadd.key(keyConverter()).body(bodyConverter()).args(xAddArgs()).build())).build()).flushingOptions(this.flushingTransferOptions).build().build());
        }
        return flow(NAME, (Step[]) arrayList.toArray(new Step[0]));
    }

    private Converter<ConsumerRecord<String, Object>, Map<String, String>> bodyConverter() {
        return this.options.getSerde() == KafkaOptions.SerDe.JSON ? new JsonToMapConverter(this.filteringOptions.converter()) : new AvroToMapConverter(this.filteringOptions.converter());
    }

    private Converter<ConsumerRecord<String, Object>, String> keyConverter() {
        return this.key == null ? (v0) -> {
            return v0.topic();
        } : consumerRecord -> {
            return this.key;
        };
    }

    private XAddArgs xAddArgs() {
        if (this.maxlen == null) {
            return null;
        }
        XAddArgs xAddArgs = new XAddArgs();
        xAddArgs.maxlen(this.maxlen.longValue());
        xAddArgs.approximateTrimming(this.approximateTrimming);
        return xAddArgs;
    }
}
