package co.cask.cdap.data2.transaction.stream.hbase;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data.stream.StreamEventOffset;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumerFactory;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.data2.transaction.stream.StreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumerState;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStore;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStoreFactory;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HTableDescriptorBuilder;
import co.cask.cdap.hbase.wd.AbstractRowKeyDistributor;
import co.cask.cdap.hbase.wd.RowKeyDistributorByHashPrefix;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/hbase/HBaseStreamFileConsumerFactory.class */
/**
 * {@link AbstractStreamFileConsumerFactory} implementation that backs each stream consumer with an HBase table.
 *
 * <p>{@link #create} makes sure the table exists (creating it pre-split when it does not) and returns an
 * {@link HBaseStreamFileConsumer} bound to it; {@link #dropTable} removes the table when it exists.
 */
public final class HBaseStreamFileConsumerFactory extends AbstractStreamFileConsumerFactory {
    /** Configuration key read to obtain both the number of pre-splits and the number of row key hash buckets. */
    private static final String CONSUMER_TABLE_PRESPLITS_KEY = "stream.consumer.table.presplits";

    /** Timeout, in milliseconds, handed to {@code createTableIfNotExists}. */
    private static final long CREATE_TABLE_TIMEOUT_MILLIS = 5000L;

    /** Client-side write buffer size (4 MiB) configured on the consumer's {@link HTable}. */
    private static final long WRITE_BUFFER_SIZE_BYTES = 4194304L;

    /** Bucket count used when the table descriptor carries no {@link QueueConstants#DISTRIBUTOR_BUCKETS} value. */
    private static final int DEFAULT_DISTRIBUTOR_BUCKETS = 16;

    private final HBaseTableUtil tableUtil;
    private final CConfiguration cConf;
    private final Configuration hConf;

    /**
     * Creates the factory; the first three arguments are also forwarded to the superclass constructor.
     *
     * @param streamAdmin forwarded to the superclass
     * @param streamConsumerStateStoreFactory forwarded to the superclass
     * @param cConfiguration CDAP configuration, read for the pre-split count and passed to created consumers
     * @param configuration Hadoop configuration used to open {@link HBaseAdmin} and {@link HTable} instances
     * @param hBaseTableUtil utility used for all HBase table operations
     */
    @Inject
    HBaseStreamFileConsumerFactory(StreamAdmin streamAdmin, StreamConsumerStateStoreFactory streamConsumerStateStoreFactory, CConfiguration cConfiguration, Configuration configuration, HBaseTableUtil hBaseTableUtil) {
        super(cConfiguration, streamAdmin, streamConsumerStateStoreFactory);
        this.hConf = configuration;
        this.cConf = cConfiguration;
        this.tableUtil = hBaseTableUtil;
    }

    /**
     * Ensures the HBase table for the given table id exists and returns a consumer that uses it.
     *
     * <p>The table descriptor gets a single column family ({@link QueueEntryRow#COLUMN_FAMILY}) keeping one
     * version, and records the bucket count under {@link QueueConstants#DISTRIBUTOR_BUCKETS}.
     *
     * @throws IOException if creating the table, opening it, or reading its descriptor fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumerFactory
    protected StreamConsumer create(TableId tableId, StreamConfig streamConfig, ConsumerConfig consumerConfig, StreamConsumerStateStore streamConsumerStateStore, StreamConsumerState streamConsumerState, FileReader<StreamEventOffset, Iterable<StreamFileOffset>> fileReader, @Nullable ReadFilter readFilter) throws IOException {
        int buckets = this.cConf.getInt(CONSUMER_TABLE_PRESPLITS_KEY);
        AbstractRowKeyDistributor splitDistributor =
            new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(buckets));
        byte[][] splitKeys = HBaseTableUtil.getSplitKeys(buckets, buckets, splitDistributor);

        TableId hTableId = this.tableUtil.createHTableId(new NamespaceId(tableId.getNamespace()), tableId.getTableName());
        HTableDescriptorBuilder descriptorBuilder = this.tableUtil.buildHTableDescriptor(hTableId);
        HColumnDescriptor columnFamily = new HColumnDescriptor(QueueEntryRow.COLUMN_FAMILY);
        columnFamily.setMaxVersions(1);
        descriptorBuilder.addFamily(columnFamily);
        descriptorBuilder.setValue(QueueConstants.DISTRIBUTOR_BUCKETS, Integer.toString(buckets));

        // The admin handle is only needed for table creation, so it is scoped to exactly that call and
        // closed once, whether or not creation succeeds.
        try (HBaseAdmin admin = new HBaseAdmin(this.hConf)) {
            this.tableUtil.createTableIfNotExists(admin, hTableId, descriptorBuilder.build(), splitKeys,
                                                  CREATE_TABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }

        HTable hTable = this.tableUtil.createHTable(this.hConf, hTableId);
        try {
            hTable.setWriteBufferSize(WRITE_BUFFER_SIZE_BYTES);
            hTable.setAutoFlush(false);
            return new HBaseStreamFileConsumer(this.cConf, streamConfig, consumerConfig, this.tableUtil, hTable, fileReader,
                                               streamConsumerStateStore, streamConsumerState, readFilter,
                                               createKeyDistributor(hTable.getTableDescriptor()));
        } catch (IOException | RuntimeException e) {
            // The table was never handed to a consumer, so nobody else will close it; release it here
            // and keep the original failure as the primary exception.
            try {
                hTable.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Drops the HBase table for the given table id; does nothing when the table does not exist.
     *
     * @throws IOException if the existence check, the drop, or closing the admin handle fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumerFactory
    protected void dropTable(TableId tableId) throws IOException {
        try (HBaseAdmin admin = new HBaseAdmin(this.hConf)) {
            TableId hTableId = this.tableUtil.createHTableId(new NamespaceId(tableId.getNamespace()), tableId.getTableName());
            if (this.tableUtil.tableExists(admin, hTableId)) {
                this.tableUtil.dropTable(admin, hTableId);
            }
        }
    }

    /**
     * Builds the row key distributor for a table, using the bucket count stored in its descriptor under
     * {@link QueueConstants#DISTRIBUTOR_BUCKETS}, or {@link #DEFAULT_DISTRIBUTOR_BUCKETS} when none is stored.
     */
    private AbstractRowKeyDistributor createKeyDistributor(HTableDescriptor hTableDescriptor) {
        String storedBuckets = hTableDescriptor.getValue(QueueConstants.DISTRIBUTOR_BUCKETS);
        int buckets = storedBuckets != null ? Integer.parseInt(storedBuckets) : DEFAULT_DISTRIBUTOR_BUCKETS;
        return new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(buckets));
    }
}
