package cz.o2.proxima.direct.blob;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogObservers;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.direct.batch.Offset;
import cz.o2.proxima.direct.batch.TerminationContext;
import cz.o2.proxima.direct.blob.BlobBase;
import cz.o2.proxima.direct.blob.BlobPath;
import cz.o2.proxima.direct.bulk.FileFormat;
import cz.o2.proxima.direct.bulk.FileSystem;
import cz.o2.proxima.direct.bulk.NamingConvention;
import cz.o2.proxima.direct.bulk.Reader;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/blob/BlobLogReader.class */
public abstract class BlobLogReader<BlobT extends BlobBase, BlobPathT extends BlobPath<BlobT>> implements BatchLogReader {

    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BlobLogReader.class);
    // Entity descriptor obtained from the accessor; handed to the file format when a blob reader is opened.
    private final EntityDescriptor entity;
    // File system obtained from the accessor; used to list the blobs for a requested range.
    private final FileSystem fs;
    // File format obtained from the accessor; opens a reader over a blob path.
    private final FileFormat fileFormat;
    // Naming convention obtained from the accessor; parses a (min, max) timestamp pair from a blob name.
    private final NamingConvention namingConvention;
    // A partition is closed once its accumulated blob size reaches this value.
    private final long partitionMinSize;
    // A partition is closed once it holds this many blobs.
    private final int partitionMaxNumBlobs;
    // Maximal time span of one partition, read from getPartitionMaxTimeSpanMs(); only applied when positive.
    private final long partitionMaxTimeSpan;
    // Executor obtained from the context; observation tasks are submitted to it.
    private final ExecutorService executor;
    // Accessor this reader was created from; exposed through getAccessor().
    private final BlobStorageAccessor accessor;
    // Context this reader was created with; exposed through getContext().
    private final Context context;

    /**
     * Bounded {@link Partition} backed by a list of blobs. Besides the blobs themselves it tracks
     * their accumulated size and the smallest / largest timestamp reported for them.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this class was changed
     * from private.
     */
    public static class BulkStoragePartition<BlobT extends BlobBase> implements Partition {
        private static final long serialVersionUID = 1L;

        private final List<BlobT> blobs = new ArrayList<>();
        private final int id;
        private long minStamp;
        private long maxStamp;
        private long size;

        /**
         * Creates a partition that holds no blobs yet.
         *
         * @param id identifier of the partition
         * @param minStamp initial minimal timestamp
         * @param maxStamp initial maximal timestamp
         */
        BulkStoragePartition(int id, long minStamp, long maxStamp) {
            this.id = id;
            this.minStamp = minStamp;
            this.maxStamp = maxStamp;
        }

        /**
         * Appends a blob, adds its size to the partition size and widens the timestamp range so
         * that it covers the blob's range as well.
         *
         * @param blob the blob to append
         * @param blobMinStamp minimal timestamp of the blob
         * @param blobMaxStamp maximal timestamp of the blob
         */
        void add(BlobT blob, long blobMinStamp, long blobMaxStamp) {
            blobs.add(blob);
            size += blob.getSize();
            minStamp = Math.min(minStamp, blobMinStamp);
            maxStamp = Math.max(maxStamp, blobMaxStamp);
        }

        /** Returns the identifier given at construction time. */
        public int getId() {
            return id;
        }

        /** Always {@code true}. */
        public boolean isBounded() {
            return true;
        }

        /** Returns the sum of sizes of all blobs added so far. */
        public long size() {
            return size;
        }

        /** Returns the number of blobs added so far. */
        public int getNumBlobs() {
            return blobs.size();
        }

        /** Returns the smallest timestamp seen (constructor value or any added blob). */
        public long getMinTimestamp() {
            return minStamp;
        }

        /** Returns the largest timestamp seen (constructor value or any added blob). */
        public long getMaxTimestamp() {
            return maxStamp;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(BulkStoragePartition.class)
                    .add("id", getId())
                    .add("size", size())
                    .add("minTimestamp", getMinTimestamp())
                    .add("maxTimestamp", getMaxTimestamp())
                    .add("blobs.size()", blobs.size())
                    .toString();
        }

        /** Returns the backing list of blobs (the live list, not a copy). */
        @Generated
        public List<BlobT> getBlobs() {
            return blobs;
        }
    }

    /**
     * A serializable action that takes no arguments, returns nothing and is allowed to throw a
     * checked exception.
     */
    @FunctionalInterface
    public interface ThrowingRunnable extends Serializable {
        /**
         * Performs the action.
         *
         * @throws Exception when the action fails
         */
        void run() throws Exception;
    }

    /**
     * Creates the reader, caching everything it needs from the accessor and the context.
     *
     * @param blobStorageAccessor accessor supplying the entity, file system, file format, naming
     *     convention and partitioning limits
     * @param context context supplying the executor that observation tasks are submitted to
     */
    protected BlobLogReader(BlobStorageAccessor blobStorageAccessor, Context context) {
        // Plain reference assignments first; none of them can fail.
        this.accessor = blobStorageAccessor;
        this.context = context;

        // Storage layout.
        this.entity = blobStorageAccessor.getEntityDescriptor();
        this.fs = blobStorageAccessor.getTargetFileSystem();
        this.fileFormat = blobStorageAccessor.getFileFormat();
        this.namingConvention = blobStorageAccessor.getNamingConvention();

        // Partitioning limits.
        this.partitionMinSize = blobStorageAccessor.getPartitionMinSize();
        this.partitionMaxNumBlobs = blobStorageAccessor.getPartitionMaxNumBlobs();
        this.partitionMaxTimeSpan = blobStorageAccessor.getPartitionMaxTimeSpanMs();

        this.executor = context.getExecutorService();
    }

    /**
     * Lists the blobs the file system reports for the given range and groups them, in sorted
     * order, into partitions.
     *
     * <p>Each blob is offered to {@code considerBlobForPartitionInclusion}, which either keeps
     * the current partition open or closes it into the result. A partition still open after the
     * last blob is appended at the end.
     *
     * @param j first bound of the range, passed unchanged to the file system listing
     * @param j2 second bound of the range, passed unchanged to the file system listing
     * @return the partitions covering all listed blobs; empty when nothing was listed
     */
    public List<Partition> getPartitions(long j, long j2) {
        List<Partition> partitions = new ArrayList<>();
        AtomicInteger nextPartitionId = new AtomicInteger();
        BulkStoragePartition<BlobT> openPartition = null;
        // The static element type of the listing is irrelevant here; each element is cast below.
        Iterator<?> paths = this.fs.list(j, j2).sorted().iterator();
        while (paths.hasNext()) {
            // Every listed path is treated as a BlobPath of this reader's blob type, exactly as the
            // original (raw) cast did; the cast cannot be checked at runtime because of erasure.
            @SuppressWarnings("unchecked")
            BlobPath<BlobT> path = (BlobPath<BlobT>) paths.next();
            openPartition =
                    considerBlobForPartitionInclusion(
                            path.getBlob(), nextPartitionId, openPartition, partitions);
        }
        if (openPartition != null) {
            partitions.add(openPartition);
        }
        return partitions;
    }

    /**
     * Adds a blob to the currently open partition, opening a new one when needed, and decides
     * whether the partition stays open.
     *
     * <p>If a positive maximal time span is configured and adding the blob would stretch the open
     * partition beyond it, the open partition is closed (appended to the result) first and the
     * blob starts a new partition. After the blob is added, the partition is closed as soon as its
     * size reaches the minimal partition size or it holds the maximal number of blobs.
     *
     * @param blob the blob to place
     * @param partitionId source of identifiers for newly created partitions
     * @param currentPartition the partition currently open, or {@code null} when there is none
     * @param resultingPartitions receives every partition that gets closed
     * @return the partition that remains open, or {@code null} when it was closed
     */
    @Nullable
    private BulkStoragePartition<BlobT> considerBlobForPartitionInclusion(
            BlobT blob,
            AtomicInteger partitionId,
            @Nullable BulkStoragePartition<BlobT> currentPartition,
            List<Partition> resultingPartitions) {
        log.trace("Considering blob {} for partition inclusion", blob.getName());
        Pair<Long, Long> minMaxStamp = this.namingConvention.parseMinMaxTimestamp(blob.getName());
        long blobMaxStamp = minMaxStamp.getSecond();
        long blobMinStamp = minMaxStamp.getFirst();

        BulkStoragePartition<BlobT> target = currentPartition;
        if (this.partitionMaxTimeSpan > 0 && currentPartition != null) {
            // Span the partition would cover if this blob were added to it.
            long spanWithBlob =
                    Math.max(
                            blobMaxStamp - currentPartition.getMinTimestamp(),
                            currentPartition.getMaxTimestamp() - blobMinStamp);
            if (spanWithBlob > this.partitionMaxTimeSpan) {
                log.debug(
                        "Closing partition {} due to max time span {} reached",
                        currentPartition,
                        this.partitionMaxTimeSpan);
                resultingPartitions.add(currentPartition);
                target = null;
            }
        }
        if (target == null) {
            target = new BulkStoragePartition<>(partitionId.getAndIncrement(), blobMinStamp, blobMaxStamp);
        }
        target.add(blob, blobMinStamp, blobMaxStamp);
        log.trace("Blob {} added to partition {}", blob.getName(), target);

        boolean partitionFull =
                target.size() >= this.partitionMinSize
                        || target.getNumBlobs() >= this.partitionMaxNumBlobs;
        if (partitionFull) {
            resultingPartitions.add(target);
            return null;
        }
        return target;
    }

    /**
     * Starts observing the given partitions and returns a handle bound to that observation.
     *
     * <p>The reading itself is submitted to the executor by {@code observeInternal}; this method
     * only creates the termination context shared between the task and the returned handle.
     *
     * @param list partitions to read
     * @param list2 attributes whose elements should be passed to the observer
     * @param batchLogObserver observer receiving the elements
     * @return handle obtained from the termination context
     */
    public ObserveHandle observe(List<Partition> list, List<AttributeDescriptor<?>> list2, BatchLogObserver batchLogObserver) {
        final TerminationContext termination = new TerminationContext(batchLogObserver);
        observeInternal(list, list2, batchLogObserver, termination);
        return termination.asObserveHandle();
    }

    /**
     * Validates the partitions and submits a task to the executor that reads them one by one.
     *
     * <p>The task stops early when the termination context reports cancellation or the observer
     * asked to stop. Any {@link Throwable} raised by the task is handed to the termination
     * context together with a callback that restarts the whole observation.
     *
     * @param partitions partitions to read; their ids must be unique
     * @param attributes attributes whose elements should be passed to the observer
     * @param observer observer receiving the elements
     * @param terminationContext context tracking cancellation, completion and errors
     * @throws IllegalArgumentException when two of the partitions share an id
     */
    private void observeInternal(
            List<Partition> partitions,
            List<AttributeDescriptor<?>> attributes,
            BatchLogObserver observer,
            TerminationContext terminationContext) {
        long distinctIds = partitions.stream().map(Partition::getId).distinct().count();
        Preconditions.checkArgument(
                distinctIds == partitions.size(),
                "Passed partitions must be unique, got partitions %s",
                partitions);
        this.executor.submit(() -> {
            terminationContext.setRunningThread();
            try {
                Set<AttributeDescriptor<?>> wantedAttributes = new HashSet<>(attributes);
                AtomicBoolean stopProcessing = new AtomicBoolean(false);
                Iterator<Partition> remaining = partitions.iterator();
                while (remaining.hasNext()
                        && !terminationContext.isCancelled()
                        && !stopProcessing.get()) {
                    processSinglePartition(
                            remaining.next(), wantedAttributes, terminationContext, stopProcessing, observer);
                }
                terminationContext.finished();
            } catch (Throwable err) {
                terminationContext.handleErrorCaught(err, () -> {
                    log.info("Restarting processing by request");
                    observeInternal(partitions, attributes, observer, terminationContext);
                });
            }
        });
    }

    private void processSinglePartition(Partition partition, Set<AttributeDescriptor<?>> set, TerminationContext terminationContext, AtomicBoolean atomicBoolean, BatchLogObserver batchLogObserver) {
        for (BlobT blobt : ((BulkStoragePartition) partition).getBlobs()) {
            if (terminationContext.isCancelled() || atomicBoolean.get()) {
                return;
            }
            try {
                runHandlingErrors(blobt, () -> {
                    Partition partition2 = partition;
                    log.info("Starting to observe {} from partition {}", blobt, partition2);
                    Reader openReader = this.fileFormat.openReader(createPath(blobt), this.entity);
                    Throwable th = null;
                    try {
                        try {
                            long j = 0;
                            Iterator it = openReader.iterator();
                            Partition partition3 = partition2;
                            while (!terminationContext.isCancelled() && !atomicBoolean.get() && it.hasNext()) {
                                StreamElement streamElement = (StreamElement) it.next();
                                long j2 = j;
                                long j3 = partition3;
                                ?? r3 = 1;
                                j = j3 + 1;
                                Offset.SimpleOffset of = Offset.of(partition, j2, !it.hasNext());
                                if (set.contains(streamElement.getAttributeDescriptor())) {
                                    r3 = of;
                                    partition.getClass();
                                    if (!batchLogObserver.onNext(streamElement, BatchLogObservers.withWatermarkSupplier(partition, (Offset) r3, partition::getMinTimestamp))) {
                                        atomicBoolean.set(true);
                                    }
                                }
                                partition3 = r3;
                            }
                            if (openReader != null) {
                                if (0 == 0) {
                                    openReader.close();
                                    return;
                                }
                                try {
                                    openReader.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        if (openReader != null) {
                            if (th != null) {
                                try {
                                    openReader.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                openReader.close();
                            }
                        }
                        throw th4;
                    }
                });
            } catch (Exception e) {
                throw new IllegalStateException(String.format("Failed to read from %s", blobt), e);
            }
        }
    }

    /**
     * Runs the given action for the given blob; implemented by subclasses.
     *
     * <p>Called once per blob while a partition is processed, with an action that reads the blob
     * and feeds its elements to the observer. Any {@link Exception} propagating from this method
     * is wrapped by the caller in an {@link IllegalStateException}.
     *
     * <p>NOTE(review): the name suggests implementations deal with storage-specific errors raised
     * by the action - confirm against the subclasses.
     *
     * @param blobt the blob the action operates on
     * @param throwingRunnable the action to run
     * @throws Exception when running the action fails
     */
    protected abstract void runHandlingErrors(BlobT blobt, ThrowingRunnable throwingRunnable) throws Exception;

    /**
     * Creates the path object for the given blob; implemented by subclasses.
     *
     * <p>The result is passed to the file format to open a reader over the blob.
     *
     * @param blobt the blob to create the path for
     * @return path of the blob
     */
    protected abstract BlobPath<BlobT> createPath(BlobT blobt);

    /** Returns the accessor this reader was created from. */
    @Generated
    public BlobStorageAccessor getAccessor() {
        return accessor;
    }

    /** Returns the context this reader was created with. */
    @Generated
    public Context getContext() {
        return context;
    }

    /*
     * Decompiled form of the method (marked synthetic) that rebuilds lambdas of this class from a
     * SerializedLambda.
     *
     * It dispatches on the implementation method name:
     *   - "lambda$processSinglePartition$b4ae9411$1": the ThrowingRunnable that reads one blob;
     *     its captured arguments are the reader instance, the blob, the partition, the
     *     termination context, the stop flag, the attribute set and the observer.
     *   - "getMinTimestamp": the WatermarkSupplier method reference bound to a Partition.
     * When neither case matches (or the signature checks fail) an IllegalArgumentException is
     * thrown.
     *
     * NOTE(review): the body is raw decompiler output and is not valid Java as written (e.g.
     * `boolean z = -1`, `?? r3`, `long j3 = partition3`, use of `this` inside a static method).
     * Presumably the compiler generates this method on its own for serializable lambdas, so it
     * most likely should not be maintained by hand - confirm before editing or removing it.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no known name matches.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -121458158:
                if (implMethodName.equals("lambda$processSinglePartition$b4ae9411$1")) {
                    z = false;
                    break;
                }
                break;
            case 732770650:
                if (implMethodName.equals("getMinTimestamp")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Blob-reading lambda: all signature details must match before it is rebuilt.
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/blob/BlobLogReader$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/blob/BlobLogReader") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/blob/BlobBase;Lcz/o2/proxima/storage/Partition;Lcz/o2/proxima/direct/batch/TerminationContext;Ljava/util/concurrent/atomic/AtomicBoolean;Ljava/util/Set;Lcz/o2/proxima/direct/batch/BatchLogObserver;)V")) {
                    BlobLogReader blobLogReader = (BlobLogReader) serializedLambda.getCapturedArg(0);
                    BlobBase blobBase = (BlobBase) serializedLambda.getCapturedArg(1);
                    Partition partition = (Partition) serializedLambda.getCapturedArg(2);
                    TerminationContext terminationContext = (TerminationContext) serializedLambda.getCapturedArg(3);
                    AtomicBoolean atomicBoolean = (AtomicBoolean) serializedLambda.getCapturedArg(4);
                    Set set = (Set) serializedLambda.getCapturedArg(5);
                    BatchLogObserver batchLogObserver = (BatchLogObserver) serializedLambda.getCapturedArg(6);
                    // The lambda below is a decompiled copy of the one in processSinglePartition.
                    return () -> {
                        Partition partition2 = partition;
                        log.info("Starting to observe {} from partition {}", blobBase, partition2);
                        Reader openReader = this.fileFormat.openReader(createPath(blobBase), this.entity);
                        Throwable th = null;
                        try {
                            try {
                                long j = 0;
                                Iterator it = openReader.iterator();
                                Partition partition3 = partition2;
                                while (!terminationContext.isCancelled() && !atomicBoolean.get() && it.hasNext()) {
                                    StreamElement streamElement = (StreamElement) it.next();
                                    long j2 = j;
                                    long j3 = partition3;
                                    ?? r3 = 1;
                                    j = j3 + 1;
                                    Offset.SimpleOffset of = Offset.of(partition, j2, !it.hasNext());
                                    if (set.contains(streamElement.getAttributeDescriptor())) {
                                        r3 = of;
                                        partition.getClass();
                                        if (!batchLogObserver.onNext(streamElement, BatchLogObservers.withWatermarkSupplier(partition, (Offset) r3, partition::getMinTimestamp))) {
                                            atomicBoolean.set(true);
                                        }
                                    }
                                    partition3 = r3;
                                }
                                if (openReader != null) {
                                    if (0 == 0) {
                                        openReader.close();
                                        return;
                                    }
                                    try {
                                        openReader.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                            } catch (Throwable th3) {
                                th = th3;
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            if (openReader != null) {
                                if (th != null) {
                                    try {
                                        openReader.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    openReader.close();
                                }
                            }
                            throw th4;
                        }
                    };
                }
                break;
            case true:
                // Watermark supplier: method reference bound to the captured partition.
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/time/WatermarkSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("getWatermark") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()J") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    Partition partition2 = (Partition) serializedLambda.getCapturedArg(0);
                    return partition2::getMinTimestamp;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
