package cz.o2.proxima.storage.hbase;

import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.batch.BatchLogObservable;
import cz.o2.proxima.storage.batch.BatchLogObserver;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/storage/hbase/HBaseLogObservable.class */
/**
 * Batch reader of an HBase table: splits the table into partitions that follow
 * the table's region boundaries and scans selected partitions on the supplied
 * executor, handing every matching cell to a {@link BatchLogObserver}.
 */
class HBaseLogObservable extends HBaseClientWrapper implements BatchLogObservable {
    private static final Logger log = LoggerFactory.getLogger(HBaseLogObservable.class);
    /** Charset used for every byte &lt;-&gt; String conversion in this class. */
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private final EntityDescriptor entity;
    private final Executor executor;

    /**
     * @param uri passed through to {@link HBaseClientWrapper}
     * @param configuration passed through to {@link HBaseClientWrapper}
     * @param map passed through to {@link HBaseClientWrapper}
     * @param entityDescriptor entity that every produced element is attributed to
     * @param executor executor that runs the scans started by {@link #observe}
     */
    public HBaseLogObservable(URI uri, Configuration configuration, Map<String, Object> map, EntityDescriptor entityDescriptor, Executor executor) {
        super(uri, configuration, map);
        this.entity = entityDescriptor;
        this.executor = executor;
    }

    /**
     * Creates one partition per region end key of the table. The first partition
     * starts at the empty key; each following one starts at the previous end key.
     *
     * @param j start timestamp handed to each partition; negative values are clamped to 0
     * @param j2 end timestamp handed to each partition
     * @return the partitions, in the order of the region end keys
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the end keys
     */
    public List<Partition> getPartitions(long j, long j2) {
        try {
            ensureClient();
            // NOTE(review): the locator returned by getRegionLocator is never closed here;
            // presumably it should be - confirm against the HBase client version in use.
            byte[][] endKeys = this.conn.getRegionLocator(this.table).getEndKeys();
            long startStamp = Math.max(0L, j);
            List<Partition> partitions = new ArrayList<>(endKeys.length);
            byte[] startKey = new byte[0];
            for (int i = 0; i < endKeys.length; i++) {
                partitions.add(new HBasePartition(i, startKey, endKeys[i], startStamp, j2));
                startKey = endKeys[i];
            }
            return partitions;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asynchronously scans the given partitions one after another and feeds every
     * matching cell to the observer.
     *
     * <p>The scan stops early when the observer's {@code onNext} returns {@code false}
     * or when the executing thread is interrupted; {@code onCompleted} is still
     * called in both cases. Any failure (including a failure to set up the client)
     * is logged and reported through {@code onError}.
     *
     * @param list partitions to scan; each must be an {@link HBasePartition}
     * @param list2 attributes whose columns should be read
     * @param batchLogObserver receiver of the elements and of the final completion or error
     */
    public void observe(List<Partition> list, List<AttributeDescriptor<?>> list2, BatchLogObserver batchLogObserver) {
        this.executor.execute(() -> {
            try {
                ensureClient();
                partitions:
                for (Partition partition : list) {
                    HBasePartition hbasePartition = (HBasePartition) partition;
                    Scan scan = new Scan(hbasePartition.getStartKey(), hbasePartition.getEndKey());
                    scan.addFamily(this.family);
                    scan.setTimeRange(hbasePartition.getStartStamp(), hbasePartition.getEndStamp());
                    scan.setFilter(toFilter(list2));
                    // try-with-resources guarantees the scanner is closed on every exit path
                    try (ResultScanner scanner = this.client.getScanner(scan)) {
                        Result next;
                        while ((next = scanner.next()) != null && !Thread.currentThread().isInterrupted()) {
                            if (!consume(next, list2, hbasePartition, batchLogObserver)) {
                                // observer asked to stop: skip the remaining partitions as well
                                break partitions;
                            }
                        }
                    }
                }
                batchLogObserver.onCompleted();
            } catch (Throwable th) {
                log.warn("Failed to observe partitions {}", list, th);
                batchLogObserver.onError(th);
            }
        });
    }

    /**
     * Passes every cell of one scan result to the observer.
     *
     * @return {@code false} as soon as the observer's {@code onNext} returns {@code false},
     *         {@code true} when all cells were delivered
     * @throws IOException if advancing the cell scanner fails
     */
    private boolean consume(Result result, List<AttributeDescriptor<?>> list, HBasePartition hBasePartition, BatchLogObserver batchLogObserver) throws IOException {
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
            StreamElement element = toStreamElement(cellScanner.current(), list, hBasePartition);
            if (!batchLogObserver.onNext(element, hBasePartition)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts a cell into an update element of the first attribute whose prefix
     * the cell's qualifier starts with. The row becomes the key, the qualifier the
     * attribute name, and the element id is built from the partition start key and
     * the cell's sequence id.
     *
     * @throws IllegalStateException if no requested attribute matches the qualifier
     */
    private StreamElement toStreamElement(Cell cell, List<AttributeDescriptor<?>> list, HBasePartition hBasePartition) {
        // decode with the same charset toFilter uses for encoding, not the platform default
        String key = new String(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), UTF8);
        String qualifier = new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), UTF8);
        for (AttributeDescriptor<?> attributeDescriptor : list) {
            if (qualifier.startsWith(attributeDescriptor.toAttributePrefix())) {
                String uuid = new String(hBasePartition.getStartKey(), UTF8) + "#" + cell.getSequenceId();
                return StreamElement.update(this.entity, attributeDescriptor, uuid, key, qualifier, cell.getTimestamp(), cell.getValue());
            }
        }
        throw new IllegalStateException("Illegal state! Fix code! No requested attribute matches qualifier " + qualifier);
    }

    /**
     * Builds a filter that passes a column when it matches at least one of the
     * attributes: wildcard attributes match by qualifier prefix, all others by
     * exact qualifier.
     */
    private Filter toFilter(List<AttributeDescriptor<?>> list) {
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        for (AttributeDescriptor<?> attributeDescriptor : list) {
            if (attributeDescriptor.isWildcard()) {
                filterList.addFilter(new ColumnPrefixFilter(attributeDescriptor.toAttributePrefix().getBytes(UTF8)));
            } else {
                filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(attributeDescriptor.getName().getBytes(UTF8))));
            }
        }
        return filterList;
    }
}
