package com.graphaware.tx.executor.batch;

import com.graphaware.common.util.BlockingArrayBlockingQueue;
import com.graphaware.tx.executor.NullItem;
import com.graphaware.tx.executor.single.KeepCalmAndCarryOn;
import com.graphaware.tx.executor.single.SimpleTransactionExecutor;
import com.graphaware.tx.executor.single.TransactionCallback;
import com.graphaware.tx.executor.single.TransactionExecutor;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.graphdb.GraphDatabaseService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/graphaware/tx/executor/batch/IterableInputBatchTransactionExecutor.class */
/**
 * Batch transaction executor that takes its input from an {@link Iterable}.
 *
 * <p>A background thread started by {@link #populateQueue()} copies the items of {@link #input}
 * into a bounded {@link #queue}. {@link #processQueue()} drains that queue on the calling thread,
 * handing each item to the {@link UnitOfWork} and grouping up to {@code batchSize} items into a
 * single call to {@link TransactionExecutor#executeInTransaction}.
 *
 * @param <T> type of the input items.
 */
public class IterableInputBatchTransactionExecutor<T> extends DisposableBatchTransactionExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(IterableInputBatchTransactionExecutor.class);

    /** Capacity of the hand-off queue between the producer thread and the processing loop. */
    private static final int QUEUE_CAPACITY = 10000;

    /** How long a single poll waits for the next item before re-checking the state, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final int batchSize;
    private final UnitOfWork<T> unitOfWork;
    protected final Iterable<T> input;
    protected final TransactionExecutor executor;
    /** Number of items polled from the queue and handed to the unit of work. */
    protected final AtomicInteger totalSteps = new AtomicInteger(0);
    /** Number of batches (transactions) started so far. */
    protected final AtomicInteger batches = new AtomicInteger(0);
    /** Number of steps that were part of a batch whose transaction call returned a non-null result. */
    protected final AtomicInteger successfulSteps = new AtomicInteger(0);
    /** Set by the producer thread once it stops producing, whether normally or because of a failure. */
    protected final AtomicBoolean finished = new AtomicBoolean(false);
    protected final BlockingArrayBlockingQueue<T> queue = new BlockingArrayBlockingQueue<>(QUEUE_CAPACITY);

    /**
     * Create a new executor.
     *
     * @param graphDatabaseService database against which the units of work are executed.
     * @param i                    maximum number of steps per batch (transaction); must be positive.
     * @param iterable             source of the input items.
     * @param unitOfWork           work to perform for each input item.
     * @throws IllegalArgumentException if the batch size is not positive. A non-positive batch size
     *                                  would make {@link #processQueue()} spin forever on non-empty
     *                                  input, because no batch would ever take an item off the queue.
     */
    public IterableInputBatchTransactionExecutor(GraphDatabaseService graphDatabaseService, int i, Iterable<T> iterable, UnitOfWork<T> unitOfWork) {
        if (i <= 0) {
            throw new IllegalArgumentException("Batch size must be positive, but was " + i);
        }
        this.batchSize = i;
        this.unitOfWork = unitOfWork;
        this.input = iterable;
        this.executor = new SimpleTransactionExecutor(graphDatabaseService);
    }

    /**
     * Start producing input in the background, then process it on the calling thread.
     */
    @Override
    protected void doExecute() {
        populateQueue();
        processQueue();
    }

    /**
     * Start a new thread that moves every item of {@link #input} into {@link #queue}.
     *
     * <p>{@link #finished} is always set when the thread ends, including when iterating the input
     * fails, so that {@link #processQueue()} does not wait for input that will never arrive.
     */
    public final void populateQueue() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (T item : input) {
                        // NOTE(review): presumably blocks while the queue is full (going by the queue's name) - confirm.
                        queue.offer(item);
                    }
                } catch (Exception e) {
                    LOG.warn("Exception while producing input!", e);
                } finally {
                    finished.set(true);
                }
            }
        }).start();
    }

    /**
     * Drain {@link #queue} in batches until the producer has finished and the queue is empty, then
     * log a summary of how many steps succeeded.
     *
     * <p>If the thread is interrupted while waiting for input, processing carries on regardless,
     * but the interrupt status is restored before this method returns so that callers can still
     * observe the interruption.
     */
    public final void processQueue() {
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            while (notFinished()) {
                final int batchNo = batches.incrementAndGet();
                LOG.trace("Starting a transaction for batch number {}", batchNo);

                final AtomicInteger stepsInBatch = new AtomicInteger(0);
                final AtomicBoolean polled = new AtomicBoolean(false);

                // A null result is treated as a rolled-back transaction; the callback itself never returns null.
                boolean committed = executor.executeInTransaction(new TransactionCallback<NullItem>() {
                    @Override
                    public NullItem doInTransaction(GraphDatabaseService database) {
                        processBatch(database, batchNo, stepsInBatch, polled, interrupted);
                        return NullItem.getInstance();
                    }
                }, KeepCalmAndCarryOn.getInstance()) != null;

                if (committed) {
                    successfulSteps.addAndGet(stepsInBatch.get());
                    LOG.trace("Committed transaction for batch number {}", batchNo);
                } else {
                    LOG.warn("Rolled back transaction for batch number {}", batchNo);
                    if (!polled.get()) {
                        // Nothing was consumed, so drop one item to avoid retrying the same head forever.
                        LOG.warn("Throwing away the head of the queue as the transaction seems to have failed before polling...");
                        queue.poll();
                    }
                }
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully executed " + successfulSteps + " (out of " + totalSteps.get() + " ) steps in " + batches + " batches");
            }
            if (successfulSteps.get() != totalSteps.get()) {
                LOG.warn("Failed to execute " + (totalSteps.get() - successfulSteps.get()) + " steps!");
            }
        } finally {
            if (interrupted.get()) {
                // Restored only at the very end: with the flag set, every timed poll would throw immediately.
                // NOTE(review): assumes the transaction callback runs on the calling thread - confirm for the executor in use.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Execute up to {@code batchSize} units of work, taking their input from the queue.
     *
     * @param database     database handed to the unit of work.
     * @param batchNo      number of the current batch.
     * @param stepsInBatch counter of the steps executed in the current batch; incremented per step.
     * @param polled       set to {@code true} as soon as an item has been taken off the queue.
     * @param interrupted  set to {@code true} if waiting for input was interrupted.
     */
    private void processBatch(GraphDatabaseService database, int batchNo, AtomicInteger stepsInBatch, AtomicBoolean polled, AtomicBoolean interrupted) {
        while (notFinished() && stepsInBatch.get() < batchSize) {
            try {
                T item = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (item == null) {
                    if (finished.get()) {
                        return;
                    }
                    LOG.warn("Waited for over 100ms but no input arrived. Still expecting more input. ");
                } else {
                    polled.set(true);
                    totalSteps.incrementAndGet();
                    unitOfWork.execute(database, item, batchNo, stepsInBatch.incrementAndGet());
                }
            } catch (InterruptedException e) {
                // Keep going so no input is lost, but remember the interrupt so it can be restored later.
                interrupted.set(true);
            }
        }
    }

    /**
     * Check whether there is, or may still be, input left to process.
     *
     * @return {@code false} only if the producer has finished and the queue is empty, {@code true} otherwise.
     */
    public boolean notFinished() {
        return !(finished.get() && queue.isEmpty());
    }
}
