package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.DequeueResult;
import co.cask.cdap.data2.transaction.queue.AbstractQueueConsumer;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.QueueScanner;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.tephra.Transaction;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueConsumer.class */
/**
 * Queue consumer that reads queue entries from an HBase table.
 *
 * <p>Concrete subclasses supply the HBase {@link Scan} through
 * {@link #createScan(byte[], byte[], int, Map)}; everything else (claiming entries, writing and
 * undoing entry state, transaction plumbing) is handled here.
 *
 * <p>Consumption is bounded by the barriers exposed by the {@link HBaseConsumerState}:
 * <ul>
 *   <li>No scan is issued until the state store reports that everything before the previous
 *       barrier has been consumed (or there is no previous barrier).</li>
 *   <li>Scans never go past the next barrier, if there is one.</li>
 *   <li>When a dequeue comes back empty and a next barrier exists, the consumer marks itself as
 *       completed in the state store and closes itself after the transaction commits.</li>
 * </ul>
 */
abstract class HBaseQueueConsumer extends AbstractQueueConsumer {
    private final HTable hTable;
    private final HBaseConsumerState state;
    private final HBaseConsumerStateStore stateStore;
    private final HBaseQueueStrategy queueStrategy;

    /** Set once {@link #close()} has run; makes close idempotent. */
    private boolean closed;

    /** Becomes {@code true} once scanning is allowed; never reset to {@code false} afterwards. */
    private boolean canConsume;

    /** Set when this consumer marked itself completed during the current transaction. */
    private boolean completed;

    /**
     * Creates a consumer whose consumer config and start row are taken from the given state.
     *
     * @param cConfiguration configuration handed to the base consumer
     * @param hTable HBase table holding the queue entries; closed by {@link #close()}
     * @param queueName name of the queue being consumed
     * @param hBaseConsumerState consumer state providing the config, start row and barriers
     * @param hBaseConsumerStateStore store used to persist progress; closed by {@link #close()}
     * @param hBaseQueueStrategy strategy used to derive actual row keys and to create scanners;
     *     closed by {@link #close()}
     */
    HBaseQueueConsumer(CConfiguration cConfiguration, HTable hTable, QueueName queueName, HBaseConsumerState hBaseConsumerState, HBaseConsumerStateStore hBaseConsumerStateStore, HBaseQueueStrategy hBaseQueueStrategy) {
        super(cConfiguration, hBaseConsumerState.getConsumerConfig(), queueName, hBaseConsumerState.getStartRow());
        this.hTable = hTable;
        this.state = hBaseConsumerState;
        this.stateStore = hBaseConsumerStateStore;
        this.queueStrategy = hBaseQueueStrategy;
        // Consumption stays disabled until getScanner() has checked the previous barrier.
        this.canConsume = false;
    }

    /**
     * Dequeues through the base implementation and, when nothing is left before the next barrier,
     * records this consumer instance as completed in the state store.
     *
     * @param i argument forwarded unchanged to the base {@code dequeue}
     * @return the result of the base {@code dequeue}
     * @throws IOException if the base dequeue fails
     */
    @Override
    public DequeueResult<byte[]> dequeue(int i) throws IOException {
        DequeueResult<byte[]> result = super.dequeue(i);
        if (canConsume && result.isEmpty() && state.getNextBarrier() != null) {
            ConsumerConfig consumerConfig = state.getConsumerConfig();
            stateStore.completed(consumerConfig.getGroupId(), consumerConfig.getInstanceId());
            completed = true;
        }
        return result;
    }

    /**
     * Tries to claim an entry by writing {@code bArr2} into the entry's state column, but only if
     * that column currently has no value.
     *
     * @param bArr row key of the entry, before translation by the queue strategy
     * @param bArr2 value to store in the state column
     * @return the outcome of the conditional put ({@code true} if the value was written)
     * @throws IOException if the HBase call fails
     */
    @Override
    protected boolean claimEntry(byte[] bArr, byte[] bArr2) throws IOException {
        Put put = new Put(queueStrategy.getActualRowKey(getConfig(), bArr));
        put.add(QueueEntryRow.COLUMN_FAMILY, stateColumnName, bArr2);
        // A null expected value makes checkAndPut succeed only when the column is absent.
        return hTable.checkAndPut(put.getRow(), QueueEntryRow.COLUMN_FAMILY, stateColumnName, (byte[]) null, put);
    }

    /**
     * Writes the same column value to every given row in one batched put, then flushes.
     *
     * @param set row keys to update, before translation by the queue strategy; no-op when empty
     * @param bArr column qualifier to write
     * @param bArr2 value to write
     * @throws IOException if the HBase put or flush fails
     */
    @Override
    protected void updateState(Set<byte[]> set, byte[] bArr, byte[] bArr2) throws IOException {
        if (set.isEmpty()) {
            return;
        }
        ArrayList<Put> puts = Lists.newArrayListWithCapacity(set.size());
        for (byte[] rowKey : set) {
            Put put = new Put(queueStrategy.getActualRowKey(getConfig(), rowKey));
            put.add(QueueEntryRow.COLUMN_FAMILY, bArr, bArr2);
            puts.add(put);
        }
        hTable.put(puts);
        hTable.flushCommits();
    }

    /**
     * Deletes the given column from every given row in one batch, then flushes.
     *
     * @param set row keys to revert, before translation by the queue strategy; no-op when empty
     * @param bArr column qualifier to delete
     * @throws IOException if the HBase batch or flush fails
     * @throws InterruptedException if the batch call is interrupted
     */
    @Override
    protected void undoState(Set<byte[]> set, byte[] bArr) throws IOException, InterruptedException {
        if (set.isEmpty()) {
            return;
        }
        ArrayList<Delete> deletes = Lists.newArrayListWithCapacity(set.size());
        for (byte[] rowKey : set) {
            Delete delete = new Delete(queueStrategy.getActualRowKey(getConfig(), rowKey));
            delete.deleteColumns(QueueEntryRow.COLUMN_FAMILY, bArr);
            deletes.add(delete);
        }
        hTable.batch(deletes);
        hTable.flushCommits();
    }

    /**
     * Creates the scanner used to read entries, or {@link QueueScanner#EMPTY} while this consumer
     * is not yet allowed to consume.
     *
     * @param bArr first argument passed to {@code createScan} (scan start row)
     * @param bArr2 requested stop row; capped at the next barrier when one exists
     * @param i value forwarded to both {@code createScan} and the strategy's {@code createScanner}
     * @return a scanner from the queue strategy, or {@link QueueScanner#EMPTY}
     * @throws IOException if the state store lookup or scanner creation fails
     */
    @Override
    protected QueueScanner getScanner(byte[] bArr, byte[] bArr2, int i) throws IOException {
        if (!canConsume) {
            // Consumption is allowed when there is no previous barrier, or once the state store
            // reports that everything before it has been consumed by this group.
            byte[] previousBarrier = state.getPreviousBarrier();
            canConsume = previousBarrier == null
                    || stateStore.isAllConsumed(getConfig().getGroupId(), previousBarrier);
            if (!canConsume) {
                return QueueScanner.EMPTY;
            }
        }
        ConsumerConfig consumerConfig = getConfig();
        byte[] stopRow = getScanStopRow(bArr2);
        // Scan attributes carry the consumer config and the current transaction.
        Map<String, byte[]> attributes =
                DequeueScanAttributes.addAttribute(getConfig(), new HashMap<String, byte[]>());
        attributes = DequeueScanAttributes.addAttribute(transaction, attributes);
        Scan scan = createScan(bArr, stopRow, i, attributes);
        return queueStrategy.createScanner(consumerConfig, hTable, scan, i);
    }

    /**
     * Closes the queue strategy, the state store and the table, ignoring any failure from each.
     * Calls after the first one are no-ops.
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        Closeables.closeQuietly(queueStrategy);
        Closeables.closeQuietly(stateStore);
        Closeables.closeQuietly(hTable);
    }

    /**
     * Starts the transaction on both the base consumer and the state store, and clears the
     * per-transaction completed flag.
     */
    @Override
    public void startTx(Transaction transaction) {
        super.startTx(transaction);
        stateStore.startTx(transaction);
        completed = false;
    }

    /**
     * Rolls back both the state store and the base consumer.
     *
     * <p>Both rollbacks are always attempted: a {@code false} result from the state store must
     * not prevent the base consumer from reverting its own changes.
     *
     * @return {@code true} only if both rollbacks reported success
     * @throws Exception if either rollback throws
     */
    @Override
    public boolean rollbackTx() throws Exception {
        boolean stateRolledBack = stateStore.rollbackTx();
        boolean queueRolledBack = super.rollbackTx();
        return stateRolledBack && queueRolledBack;
    }

    /**
     * Commits the base consumer first and, only if that succeeds, the state store.
     *
     * @return {@code true} only if both commits reported success
     * @throws Exception if either commit throws
     */
    @Override
    public boolean commitTx() throws Exception {
        return super.commitTx() && stateStore.commitTx();
    }

    /**
     * Notifies the state store that the transaction committed and, if this consumer marked itself
     * completed during that transaction, closes it.
     */
    public void postTxCommit() {
        stateStore.postTxCommit();
        if (completed) {
            Closeables.closeQuietly(this);
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * Persists the new start row for this consumer instance in the state store. Nothing is
     * written while consumption is not yet allowed or after the consumer has marked itself
     * completed.
     *
     * @param bArr the new start row
     */
    @Override
    protected void updateStartRow(byte[] bArr) {
        if (!canConsume || completed) {
            return;
        }
        ConsumerConfig config = getConfig();
        stateStore.updateState(config.getGroupId(), config.getInstanceId(), bArr);
    }

    /**
     * Builds the HBase scan used by {@link #getScanner(byte[], byte[], int)}.
     *
     * @param bArr first row argument received by {@code getScanner} (scan start row)
     * @param bArr2 stop row, already capped at the next barrier
     * @param i value forwarded unchanged from {@code getScanner}
     * @param map scan attributes carrying the consumer config and the current transaction
     * @return the scan to execute
     */
    protected abstract Scan createScan(byte[] bArr, byte[] bArr2, int i, Map<String, byte[]> map);

    /**
     * Caps the requested stop row at the next barrier.
     *
     * @param bArr requested stop row
     * @return {@code bArr} when there is no next barrier or {@code bArr} compares lower than it
     *     according to {@code Bytes.compareTo}; otherwise the next barrier
     */
    private byte[] getScanStopRow(byte[] bArr) {
        byte[] nextBarrier = state.getNextBarrier();
        if (nextBarrier == null || Bytes.compareTo(bArr, nextBarrier) < 0) {
            return bArr;
        }
        return nextBarrier;
    }
}
