package com.redis.riot;

import com.redis.riot.ProgressMonitor;
import com.redis.riot.TransferOptions;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.RedisScanSizeEstimator;
import com.redis.spring.batch.reader.PollableItemReader;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import picocli.CommandLine;

/* loaded from: input_file:com/redis/riot/AbstractTransferCommand.class */
public abstract class AbstractTransferCommand extends AbstractRiotCommand {
    private static final Logger log = LoggerFactory.getLogger(AbstractTransferCommand.class);

    @CommandLine.Mixin
    protected TransferOptions transferOptions = new TransferOptions();

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemReader.Builder<String, String> stringReader(RedisOptions redisOptions) {
        return reader(redisOptions, StringCodec.UTF8);
    }

    protected <K, V> RedisItemReader.Builder<K, V> reader(RedisOptions redisOptions, RedisCodec<K, V> redisCodec) {
        return redisOptions.isCluster() ? RedisItemReader.client(redisOptions.redisModulesClusterClient(), redisCodec) : RedisItemReader.client(redisOptions.redisModulesClient(), redisCodec);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <K, V> RedisItemWriter.OperationBuilder<K, V> writer(RedisOptions redisOptions, RedisCodec<K, V> redisCodec) {
        return redisOptions.isCluster() ? RedisItemWriter.client(redisOptions.redisModulesClusterClient(), redisCodec) : RedisItemWriter.client(redisOptions.redisModulesClient(), redisCodec);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisScanSizeEstimator.Builder estimator() {
        RedisOptions redisOptions = getRedisOptions();
        return redisOptions.isCluster() ? RedisScanSizeEstimator.client(redisOptions.redisModulesClusterClient()) : RedisScanSizeEstimator.client(redisOptions.redisModulesClient());
    }

    public <I, O> FaultTolerantStepBuilder<I, O> step(RiotStep<I, O> riotStep) throws Exception {
        SimpleStepBuilder<I, O> chunk = chunk(riotStep.getName(), this.transferOptions.getChunkSize());
        chunk.reader(riotStep.getReader()).writer(riotStep.getWriter());
        Optional<ItemProcessor<I, O>> processor = riotStep.getProcessor();
        Objects.requireNonNull(chunk);
        processor.ifPresent(chunk::processor);
        if (this.transferOptions.getProgress() != TransferOptions.Progress.NONE) {
            ProgressMonitor.ProgressMonitorBuilder builder = ProgressMonitor.builder();
            builder.style(this.transferOptions.getProgress());
            builder.taskName(riotStep.getTaskName());
            builder.initialMax(riotStep.getInitialMax());
            builder.updateInterval(Duration.ofMillis(this.transferOptions.getProgressUpdateIntervalMillis()));
            builder.extraMessage(riotStep.getExtraMessage());
            ProgressMonitor build = builder.build();
            chunk.listener(build);
            chunk.listener(build);
        }
        FaultTolerantStepBuilder<I, O> skipPolicy = chunk.faultTolerant().skipPolicy(skipPolicy(this.transferOptions.getSkipPolicy()));
        if (this.transferOptions.getThreads() > 1) {
            skipPolicy.reader(synchronize(riotStep.getReader()));
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setCorePoolSize(this.transferOptions.getThreads());
            threadPoolTaskExecutor.setMaxPoolSize(this.transferOptions.getThreads());
            threadPoolTaskExecutor.setQueueCapacity(this.transferOptions.getThreads());
            threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            threadPoolTaskExecutor.afterPropertiesSet();
            log.debug("Created pooled task executor of size {}", Integer.valueOf(threadPoolTaskExecutor.getCorePoolSize()));
            skipPolicy.taskExecutor(threadPoolTaskExecutor).throttleLimit(this.transferOptions.getThreads());
        } else {
            skipPolicy.taskExecutor(new SyncTaskExecutor());
        }
        return skipPolicy;
    }

    protected <I, O> SimpleStepBuilder<I, O> chunk(String str, int i) throws Exception {
        return getJobRunner().step(str).chunk(i);
    }

    private <I> ItemReader<I> synchronize(ItemReader<I> itemReader) {
        if (itemReader instanceof PollableItemReader) {
            SynchronizedPollableItemReader synchronizedPollableItemReader = new SynchronizedPollableItemReader();
            synchronizedPollableItemReader.setDelegate((PollableItemReader) itemReader);
            return synchronizedPollableItemReader;
        }
        if (!(itemReader instanceof ItemStreamReader)) {
            return itemReader;
        }
        SynchronizedItemStreamReader synchronizedItemStreamReader = new SynchronizedItemStreamReader();
        synchronizedItemStreamReader.setDelegate((ItemStreamReader) itemReader);
        return synchronizedItemStreamReader;
    }

    protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> simpleStepBuilder) {
        return simpleStepBuilder.faultTolerant();
    }

    private SkipPolicy skipPolicy(TransferOptions.SkipPolicy skipPolicy) {
        switch (skipPolicy) {
            case ALWAYS:
                return new AlwaysSkipItemSkipPolicy();
            case NEVER:
                return new NeverSkipItemSkipPolicy();
            default:
                return RedisItemReader.limitCheckingSkipPolicy(this.transferOptions.getSkipLimit());
        }
    }
}
