package co.cask.tigon.data.transaction.queue.inmemory;

import co.cask.tephra.Transaction;
import co.cask.tigon.data.queue.QueueEntry;
import co.cask.tigon.data.queue.QueueName;
import co.cask.tigon.data.transaction.queue.AbstractQueueProducer;
import co.cask.tigon.data.transaction.queue.QueueMetrics;

/* loaded from: input_file:co/cask/tigon/data/transaction/queue/inmemory/InMemoryQueueProducer.class */
/**
 * Queue producer that writes entries into the {@link InMemoryQueue} that an
 * {@link InMemoryQueueService} returns for a given {@link QueueName}.
 *
 * <p>Each entry is enqueued under the write pointer of the transaction passed to
 * {@link #persist(Iterable, Transaction)} together with a sequence number that starts at 0.
 * The producer remembers that transaction and how many entries it enqueued so that
 * {@link #doRollback()} can undo exactly those entries.
 */
public class InMemoryQueueProducer extends AbstractQueueProducer {
    private final QueueName queueName;
    private final InMemoryQueueService queueService;

    /** Number of entries enqueued so far by the most recent {@link #persist} call. */
    private int lastEnqueueCount;

    /** Transaction used by the most recent {@link #persist} call; {@code null} if none since the last {@link #startTx}. */
    private Transaction commitTransaction;

    /**
     * Creates a producer for the given queue.
     *
     * @param queueName            name of the queue to write to
     * @param inMemoryQueueService service used to look up the queue by name
     * @param queueMetrics         metrics object handed to the superclass
     */
    public InMemoryQueueProducer(QueueName queueName, InMemoryQueueService inMemoryQueueService, QueueMetrics queueMetrics) {
        super(queueMetrics, queueName);
        this.queueName = queueName;
        this.queueService = inMemoryQueueService;
    }

    /** Looks up the target queue from the queue service on every call. */
    private InMemoryQueue getQueue() {
        return this.queueService.getQueue(this.queueName);
    }

    /**
     * Delegates to the superclass and forgets the previously persisted transaction, so that a
     * rollback before the next {@link #persist} call has nothing to undo.
     *
     * @param transaction the transaction being started
     */
    @Override
    public void startTx(Transaction transaction) {
        super.startTx(transaction);
        this.commitTransaction = null;
    }

    /**
     * Enqueues every entry under the transaction's write pointer, using consecutive sequence
     * numbers starting at 0.
     *
     * @param iterable    entries to enqueue
     * @param transaction transaction whose write pointer the entries are enqueued under
     * @return the sum of the data lengths of all enqueued entries
     * @throws Exception if enqueueing fails; {@link #doRollback()} can then undo the entries
     *                   that had already been enqueued
     */
    @Override
    protected int persist(Iterable<QueueEntry> iterable, Transaction transaction) throws Exception {
        this.commitTransaction = transaction;
        // Reset up front and count as we go: if anything below throws, the counter must describe
        // what was actually written for THIS transaction rather than a stale value left over
        // from an earlier persist call, otherwise doRollback would undo the wrong entries.
        this.lastEnqueueCount = 0;
        int totalBytes = 0;
        InMemoryQueue queue = getQueue();
        for (QueueEntry entry : iterable) {
            // The current count doubles as the sequence number of the entry being written.
            queue.enqueue(transaction.getWritePointer(), this.lastEnqueueCount, entry);
            // Record the entry immediately after a successful enqueue, before touching its data.
            this.lastEnqueueCount++;
            totalBytes += entry.getData().length;
        }
        return totalBytes;
    }

    /**
     * Undoes the entries enqueued by the most recent {@link #persist} call, if there was one
     * since the last {@link #startTx}.
     */
    @Override
    protected void doRollback() {
        if (this.commitTransaction == null) {
            // Nothing was persisted for the current transaction.
            return;
        }
        InMemoryQueue queue = getQueue();
        for (int seqId = 0; seqId < this.lastEnqueueCount; seqId++) {
            queue.undoEnqueue(this.commitTransaction.getWritePointer(), seqId);
        }
    }
}
