package com.redislabs.riot.transfer;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;

/* loaded from: input_file:com/redislabs/riot/transfer/FlowThread.class */
public class FlowThread implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(FlowThread.class);
    public static final String CONTEXT_PARTITION = "partition";
    public static final String CONTEXT_PARTITIONS = "partitions";
    private int threadId;
    private int threads;
    private Flow flow;
    private Batcher batcher;
    private Long flushRate;
    private long readCount;
    private long writeCount;
    private boolean running;
    private boolean stopped;

    @Override // java.lang.Runnable
    public void run() {
        try {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.putInt("partition", this.threadId);
            executionContext.putInt("partitions", this.threads);
            if (this.flow.reader() instanceof ItemStream) {
                this.flow.reader().open(executionContext);
            }
            if (this.flow.writer() instanceof ItemStream) {
                this.flow.writer().open(executionContext);
            }
            ScheduledExecutorService scheduledExecutorService = null;
            ScheduledFuture<?> scheduledFuture = null;
            if (this.flushRate != null) {
                scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
                scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::flush, this.flushRate.longValue(), this.flushRate.longValue(), TimeUnit.MILLISECONDS);
            }
            this.running = true;
            while (true) {
                List next = this.batcher.next();
                if (next == null || this.stopped) {
                    break;
                } else {
                    write(next);
                }
            }
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
            }
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            log.debug("Closing reader");
            if (this.flow.reader() instanceof ItemStream) {
                this.flow.reader().close();
            }
            if (this.flow.writer() instanceof ItemStream) {
                this.flow.writer().close();
            }
            this.running = false;
            log.debug("Flow {} thread {} finished", this.flow.name(), Integer.valueOf(this.threadId));
        } catch (Throwable th) {
            log.error("Flow {} execution failed", this.flow.name(), th);
        }
    }

    public Metrics progress() {
        return new Metrics().reads(this.readCount).writes(this.writeCount).runningThreads(this.running ? 1 : 0);
    }

    public void stop() {
        this.stopped = true;
    }

    private void write(List list) {
        this.readCount += list.size();
        try {
            this.flow.writer().write(list);
            this.writeCount += list.size();
        } catch (Exception e) {
            log.error("Could not write items", e);
        }
    }

    private void flush() {
        List flush = this.batcher.flush();
        if (flush.isEmpty()) {
            return;
        }
        write(flush);
    }

    public FlowThread threadId(int i) {
        this.threadId = i;
        return this;
    }

    public FlowThread threads(int i) {
        this.threads = i;
        return this;
    }

    public FlowThread flow(Flow flow) {
        this.flow = flow;
        return this;
    }

    public FlowThread batcher(Batcher batcher) {
        this.batcher = batcher;
        return this;
    }

    public FlowThread flushRate(Long l) {
        this.flushRate = l;
        return this;
    }

    public long readCount() {
        return this.readCount;
    }

    public long writeCount() {
        return this.writeCount;
    }

    public boolean running() {
        return this.running;
    }
}
