package cz.o2.proxima.direct.hadoop;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogObservers;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.direct.batch.Offset;
import cz.o2.proxima.direct.batch.TerminationContext;
import cz.o2.proxima.direct.bulk.Path;
import cz.o2.proxima.direct.bulk.Reader;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/hadoop/HadoopBatchLogReader.class */
/**
 * {@link BatchLogReader} that reads stream elements from the files exposed by a
 * {@link HadoopDataAccessor}.
 *
 * <p>Files listed by the accessor's file system are grouped into {@link HadoopPartition}s, and
 * observing a list of partitions reads their files one by one on the executor service obtained
 * from the supplied {@link Context}.
 */
public class HadoopBatchLogReader implements BatchLogReader {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(HadoopBatchLogReader.class);
    private final HadoopDataAccessor accessor;
    private final Context context;
    private final ExecutorService executor;

    /**
     * Creates a reader on top of the given accessor.
     *
     * @param hadoopDataAccessor accessor providing the file system, file format and entity descriptor
     * @param context context whose executor service runs the reading
     */
    public HadoopBatchLogReader(HadoopDataAccessor hadoopDataAccessor, Context context) {
        this.accessor = hadoopDataAccessor;
        this.context = context;
        this.executor = context.getExecutorService();
    }

    /**
     * Lists the files in the given range and groups them into partitions.
     *
     * <p>Temporary paths are skipped. Paths are appended to the current partition until its
     * reported size exceeds the accessor's batch process size; the next path then starts a new
     * partition. Partition ids are assigned sequentially starting at zero.
     *
     * @param startStamp first bound passed to the file system listing (presumably the start
     *     timestamp - confirm against {@code HadoopFileSystem#list})
     * @param endStamp second bound passed to the file system listing (presumably the end timestamp)
     * @return partitions in listing order; empty when nothing non-temporary was listed
     */
    public List<Partition> getPartitions(long startStamp, long endStamp) {
        List<Partition> partitions = new ArrayList<>();
        HadoopFileSystem hadoopFs = this.accessor.getHadoopFs();
        long batchProcessSize = this.accessor.getBatchProcessSize();
        Stream<Path> listed = hadoopFs.list(startStamp, endStamp);
        Iterator<HadoopPath> paths = listed
                .map(path -> (HadoopPath) path)
                .filter(path -> !path.isTmpPath())
                .iterator();
        // Partition currently being filled; null means the next path opens a new one.
        HadoopPartition current = null;
        while (paths.hasNext()) {
            HadoopPath path = paths.next();
            if (current == null) {
                current = new HadoopPartition(partitions.size());
                partitions.add(current);
            }
            current.add(path);
            if (current.size() > batchProcessSize) {
                current = null;
            }
        }
        return partitions;
    }

    /**
     * Starts asynchronous reading of the given partitions.
     *
     * @param partitions partitions to read; each is cast to {@link HadoopPartition}
     * @param attributes attribute descriptors; this reader only carries them over to a restart and
     *     does not otherwise use them
     * @param batchLogObserver observer receiving the elements
     * @return handle backed by the {@link TerminationContext} created for this observation
     */
    public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver batchLogObserver) {
        TerminationContext terminationContext = new TerminationContext(batchLogObserver);
        observeInternal(partitions, attributes, batchLogObserver, terminationContext);
        return terminationContext;
    }

    /**
     * Submits the reading task to the executor.
     *
     * <p>{@code finished()} is reported to the termination context both when all paths were
     * consumed and when reading stopped early. Any throwable is handed to the termination context
     * together with a callback that resubmits the whole observation.
     */
    private void observeInternal(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver batchLogObserver, TerminationContext terminationContext) {
        this.executor.submit(() -> {
            try {
                processPartitions(partitions, batchLogObserver, terminationContext);
                terminationContext.finished();
            } catch (Throwable err) {
                terminationContext.handleErrorCaught(err, () -> {
                    log.info("Restarting processing by request");
                    observeInternal(partitions, attributes, batchLogObserver, terminationContext);
                });
            }
        });
    }

    /** Reads every path of every partition in order, stopping at the first path that asks to stop. */
    private void processPartitions(List<Partition> partitions, BatchLogObserver batchLogObserver, TerminationContext terminationContext) {
        for (Partition partition : partitions) {
            HadoopPartition hadoopPartition = (HadoopPartition) partition;
            Iterator<HadoopPath> paths = hadoopPartition.getPaths().iterator();
            while (paths.hasNext()) {
                if (!processPath(batchLogObserver, hadoopPartition.getMinTimestamp(), hadoopPartition, paths.next(), terminationContext)) {
                    return;
                }
            }
        }
    }

    /**
     * Returns a factory that creates an equivalent reader.
     *
     * @return factory producing a new {@code HadoopBatchLogReader} with the same accessor and context
     */
    public BatchLogReader.Factory<?> asFactory() {
        // Copy the fields to locals so the lambda captures only the accessor and the context,
        // not this reader instance.
        HadoopDataAccessor hadoopDataAccessor = this.accessor;
        Context context = this.context;
        return repository -> new HadoopBatchLogReader(hadoopDataAccessor, context);
    }

    /**
     * Reads a single file and pushes its elements to the observer.
     *
     * @param batchLogObserver observer receiving the elements
     * @param watermark watermark attached to every element's context
     * @param hadoopPartition partition the file belongs to
     * @param hadoopPath file to read
     * @param terminationContext context consulted for cancellation before each element is delivered
     * @return {@code true} when the whole file was consumed, {@code false} when the observation
     *     was cancelled or the observer's {@code onNext} returned {@code false}
     * @throws RuntimeException wrapping the {@link IOException} raised while opening, reading or
     *     closing the reader
     */
    private boolean processPath(BatchLogObserver batchLogObserver, long watermark, HadoopPartition hadoopPartition, HadoopPath hadoopPath, TerminationContext terminationContext) {
        try (Reader reader = this.accessor.getFormat().openReader(hadoopPath, this.accessor.getEntityDesc())) {
            long elementIndex = 0;
            Iterator<StreamElement> elements = reader.iterator();
            while (elements.hasNext()) {
                StreamElement element = elements.next();
                // hasNext() is evaluated after next(), so the flag marks the last element of this file.
                Offset.SimpleOffset offset = Offset.of(hadoopPartition, elementIndex++, !elements.hasNext());
                if (terminationContext.isCancelled() || !batchLogObserver.onNext(element, BatchLogObservers.withWatermark(hadoopPartition, offset, watermark))) {
                    return false;
                }
            }
            return true;
        } catch (IOException e) {
            throw new RuntimeException("Failed to read file " + hadoopPartition, e);
        }
    }

    /**
     * Returns the accessor this reader was created with.
     *
     * @return the underlying accessor
     */
    @Generated
    public HadoopDataAccessor getAccessor() {
        return this.accessor;
    }
}
