package cz.o2.proxima.flink.core;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.Offset;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.flink.core.AbstractLogSourceFunction;
import cz.o2.proxima.flink.core.batch.OffsetTrackingBatchLogReader;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/flink/core/BatchLogSourceFunction.class */
public class BatchLogSourceFunction<OutputT> extends AbstractLogSourceFunction<BatchLogReader, LogObserver<OutputT>, Offset, BatchLogObserver.OnNextContext, OutputT> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchLogSourceFunction.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/flink/core/BatchLogSourceFunction$LogObserver.class */
    public static class LogObserver<OutputT> extends AbstractSourceLogObserver<Offset, BatchLogObserver.OnNextContext, OutputT> implements BatchLogObserver {
        LogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> set) {
            super(sourceContext, resultExtractor, set);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // cz.o2.proxima.flink.core.AbstractSourceLogObserver
        public void markOffsetAsConsumed(BatchLogObserver.OnNextContext onNextContext) {
            ((OffsetTrackingBatchLogReader.OffsetCommitter) onNextContext).markOffsetAsConsumed();
        }

        public /* bridge */ /* synthetic */ boolean onNext(StreamElement streamElement, BatchLogObserver.OnNextContext onNextContext) {
            return super.onNext(streamElement, (StreamElement) onNextContext);
        }
    }

    public BatchLogSourceFunction(RepositoryFactory repositoryFactory, List<AttributeDescriptor<?>> list, ResultExtractor<OutputT> resultExtractor) {
        super(repositoryFactory, list, resultExtractor);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    BatchLogReader createLogReader(List<AttributeDescriptor<?>> list) {
        return OffsetTrackingBatchLogReader.of((BatchLogReader) getRepositoryFactory().apply().getOrCreateOperator(DirectDataOperator.class, new Consumer[0]).getBatchLogReader(list).orElseThrow(() -> {
            return new IllegalStateException(String.format("Unable to find batch log reader for [%s].", list));
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public List<Partition> getPartitions(BatchLogReader batchLogReader) {
        return batchLogReader.getPartitions();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public Partition getOffsetPartition(Offset offset) {
        return offset.getPartition();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    Set<Partition> getSkipFirstElementFromPartitions(List<Offset> list) {
        return (Set) list.stream().filter(offset -> {
            return offset.getElementIndex() >= 0;
        }).map((v0) -> {
            return v0.getPartition();
        }).collect(Collectors.toSet());
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    LogObserver<OutputT> createLogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> set) {
        return new LogObserver<>(sourceContext, resultExtractor, set);
    }

    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observeRestoredOffsets(BatchLogReader batchLogReader, List<Offset> list, List<AttributeDescriptor<?>> list2, LogObserver<OutputT> logObserver) {
        final OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle offsetTrackingObserveHandle = (OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle) batchLogReader.observeOffsets(list, list2, wrapSourceObserver(logObserver));
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>() { // from class: cz.o2.proxima.flink.core.BatchLogSourceFunction.1
            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle
            public List<Offset> getConsumedOffsets() {
                return (List) offsetTrackingObserveHandle.getCurrentOffsets().stream().filter(offset -> {
                    return !offset.isLast();
                }).collect(Collectors.toList());
            }

            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                offsetTrackingObserveHandle.close();
            }
        };
    }

    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observePartitions(BatchLogReader batchLogReader, List<Partition> list, List<AttributeDescriptor<?>> list2, LogObserver<OutputT> logObserver) {
        final OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle offsetTrackingObserveHandle = (OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle) batchLogReader.observe(list, list2, wrapSourceObserver(logObserver));
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>() { // from class: cz.o2.proxima.flink.core.BatchLogSourceFunction.2
            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle
            public List<Offset> getConsumedOffsets() {
                return (List) offsetTrackingObserveHandle.getCurrentOffsets().stream().filter(offset -> {
                    return !offset.isLast();
                }).collect(Collectors.toList());
            }

            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                offsetTrackingObserveHandle.close();
            }
        };
    }

    @VisibleForTesting
    BatchLogObserver wrapSourceObserver(BatchLogObserver batchLogObserver) {
        return batchLogObserver;
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    @Generated
    public /* bridge */ /* synthetic */ RepositoryFactory getRepositoryFactory() {
        return super.getRepositoryFactory();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        super.initializeState(functionInitializationContext);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        super.snapshotState(functionSnapshotContext);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void notifyCheckpointComplete(long j) {
        super.notifyCheckpointComplete(j);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void cancel() {
        super.cancel();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void run(SourceFunction.SourceContext sourceContext) throws Exception {
        super.run(sourceContext);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observeRestoredOffsets(BatchLogReader batchLogReader, List<Offset> list, List list2, AbstractSourceLogObserver abstractSourceLogObserver) {
        return observeRestoredOffsets(batchLogReader, list, (List<AttributeDescriptor<?>>) list2, (LogObserver) abstractSourceLogObserver);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observePartitions(BatchLogReader batchLogReader, List list, List list2, AbstractSourceLogObserver abstractSourceLogObserver) {
        return observePartitions(batchLogReader, (List<Partition>) list, (List<AttributeDescriptor<?>>) list2, (LogObserver) abstractSourceLogObserver);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ AbstractSourceLogObserver createLogObserver(SourceFunction.SourceContext sourceContext, ResultExtractor resultExtractor, Set set) {
        return createLogObserver(sourceContext, resultExtractor, (Set<Partition>) set);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ BatchLogReader createLogReader(List list) {
        return createLogReader((List<AttributeDescriptor<?>>) list);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void open(Configuration configuration) {
        super.open(configuration);
    }
}
