package co.cask.tigon.data.transaction.queue.hbase;

import co.cask.tigon.data.co.cask.tigon.data.hbase.wd.DistributedScanner;
import co.cask.tigon.data.queue.ConsumerConfig;
import co.cask.tigon.data.queue.QueueName;
import co.cask.tigon.data.transaction.queue.AbstractQueueConsumer;
import co.cask.tigon.data.transaction.queue.QueueEntryRow;
import co.cask.tigon.data.transaction.queue.QueueScanner;
import co.cask.tigon.utils.ImmutablePair;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tigon/data/transaction/queue/hbase/HBaseQueueConsumer.class */
abstract class HBaseQueueConsumer extends AbstractQueueConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseQueueConsumer.class);
    private static final int PERSIST_START_ROW_LIMIT = 1000;
    private final HTable hTable;
    private final HBaseConsumerStateStore stateStore;
    private boolean closed;
    private final ExecutorService scansExecutor;

    /* loaded from: input_file:co/cask/tigon/data/transaction/queue/hbase/HBaseQueueConsumer$HBaseQueueScanner.class */
    private class HBaseQueueScanner implements QueueScanner {
        private final ResultScanner scanner;
        private final LinkedList<Result> cached = Lists.newLinkedList();
        private final int numRows;

        public HBaseQueueScanner(ResultScanner resultScanner, int i) {
            this.scanner = resultScanner;
            this.numRows = i;
        }

        @Override // co.cask.tigon.data.transaction.queue.QueueScanner
        public ImmutablePair<byte[], Map<byte[], byte[]>> next() throws IOException {
            while (this.cached.size() <= 0) {
                Result[] next = this.scanner.next(this.numRows);
                if (next.length == 0) {
                    return null;
                }
                Collections.addAll(this.cached, next);
            }
            Result removeFirst = this.cached.removeFirst();
            return ImmutablePair.of(HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR.getOriginalKey(removeFirst.getRow()), removeFirst.getFamilyMap(QueueEntryRow.COLUMN_FAMILY));
        }

        @Override // co.cask.tigon.data.transaction.queue.QueueScanner
        public void close() throws IOException {
            this.scanner.close();
        }
    }

    HBaseQueueConsumer(ConsumerConfig consumerConfig, HTable hTable, QueueName queueName, HBaseConsumerState hBaseConsumerState, HBaseConsumerStateStore hBaseConsumerStateStore) {
        super(consumerConfig, queueName);
        this.hTable = hTable;
        this.scansExecutor = new ThreadPoolExecutor(1, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("queue-consumer-scan"));
        ((ThreadPoolExecutor) this.scansExecutor).allowCoreThreadTimeOut(true);
        this.stateStore = hBaseConsumerStateStore;
        byte[] startRow = hBaseConsumerState.getStartRow();
        if (startRow == null || startRow.length <= 0) {
            return;
        }
        this.startRow = startRow;
    }

    @Override // co.cask.tigon.data.transaction.queue.AbstractQueueConsumer
    protected boolean claimEntry(byte[] bArr, byte[] bArr2) throws IOException {
        byte[] distributedKey = HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR.getDistributedKey(bArr);
        Put put = new Put(distributedKey);
        put.add(QueueEntryRow.COLUMN_FAMILY, this.stateColumnName, bArr2);
        return this.hTable.checkAndPut(distributedKey, QueueEntryRow.COLUMN_FAMILY, this.stateColumnName, (byte[]) null, put);
    }

    @Override // co.cask.tigon.data.transaction.queue.AbstractQueueConsumer
    protected void updateState(Set<byte[]> set, byte[] bArr, byte[] bArr2) throws IOException {
        if (set.isEmpty()) {
            return;
        }
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(set.size());
        Iterator<byte[]> it = set.iterator();
        while (it.hasNext()) {
            Put put = new Put(HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR.getDistributedKey(it.next()));
            put.add(QueueEntryRow.COLUMN_FAMILY, bArr, bArr2);
            newArrayListWithCapacity.add(put);
        }
        this.hTable.put(newArrayListWithCapacity);
        this.hTable.flushCommits();
    }

    @Override // co.cask.tigon.data.transaction.queue.AbstractQueueConsumer
    protected void undoState(Set<byte[]> set, byte[] bArr) throws IOException, InterruptedException {
        if (set.isEmpty()) {
            return;
        }
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(set.size());
        Iterator<byte[]> it = set.iterator();
        while (it.hasNext()) {
            Delete delete = new Delete(HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR.getDistributedKey(it.next()));
            delete.deleteColumn(QueueEntryRow.COLUMN_FAMILY, bArr);
            newArrayListWithCapacity.add(delete);
        }
        this.hTable.batch(newArrayListWithCapacity);
        this.hTable.flushCommits();
    }

    @Override // co.cask.tigon.data.transaction.queue.AbstractQueueConsumer
    protected QueueScanner getScanner(byte[] bArr, byte[] bArr2, int i) throws IOException {
        Scan createScan = createScan(bArr, bArr2, i);
        DequeueScanAttributes.setQueueRowPrefix(createScan, getQueueName());
        DequeueScanAttributes.set(createScan, getConfig());
        DequeueScanAttributes.set(createScan, this.transaction);
        return new HBaseQueueScanner(DistributedScanner.create(this.hTable, createScan, HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR, this.scansExecutor), i);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            this.stateStore.saveState(new HBaseConsumerState(this.startRow, getConfig().getGroupId(), getConfig().getInstanceId()));
            this.scansExecutor.shutdownNow();
            this.hTable.close();
            this.closed = true;
        } catch (Throwable th) {
            this.scansExecutor.shutdownNow();
            this.hTable.close();
            this.closed = true;
            throw th;
        }
    }

    @Override // co.cask.tigon.data.transaction.queue.AbstractQueueConsumer
    public void postTxCommit() {
        super.postTxCommit();
        if (this.commitCount >= PERSIST_START_ROW_LIMIT) {
            try {
                this.stateStore.saveState(new HBaseConsumerState(this.startRow, getConfig().getGroupId(), getConfig().getInstanceId()));
                this.commitCount = 0;
            } catch (IOException e) {
                LOG.error("Failed to persist start row to HBase.", e);
            }
        }
    }

    protected abstract Scan createScan(byte[] bArr, byte[] bArr2, int i);
}
