package cz.o2.proxima.flink.core;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.flink.core.AbstractLogSourceFunction;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.commitlog.Position;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/flink/core/CommitLogSourceFunction.class */
public class CommitLogSourceFunction<OutputT> extends AbstractLogSourceFunction<CommitLogReader, LogObserver<OutputT>, Offset, CommitLogObserver.OnNextContext, OutputT> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommitLogSourceFunction.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/flink/core/CommitLogSourceFunction$LogObserver.class */
    public static class LogObserver<OutputT> extends AbstractSourceLogObserver<Offset, CommitLogObserver.OnNextContext, OutputT> implements CommitLogObserver {
        LogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> set) {
            super(sourceContext, resultExtractor, set);
        }

        public void onIdle(CommitLogObserver.OnIdleContext onIdleContext) {
            maybeUpdateWatermark(onIdleContext.getWatermark());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // cz.o2.proxima.flink.core.AbstractSourceLogObserver
        public void markOffsetAsConsumed(CommitLogObserver.OnNextContext onNextContext) {
        }
    }

    public CommitLogSourceFunction(RepositoryFactory repositoryFactory, List<AttributeDescriptor<?>> list, ResultExtractor<OutputT> resultExtractor) {
        super(repositoryFactory, list, resultExtractor);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    CommitLogReader createLogReader(List<AttributeDescriptor<?>> list) {
        return (CommitLogReader) getRepositoryFactory().apply().getOrCreateOperator(DirectDataOperator.class, new Consumer[0]).getCommitLogReader(list).orElseThrow(() -> {
            return new IllegalStateException(String.format("Unable to find commit log reader for [%s].", list));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public List<Partition> getPartitions(CommitLogReader commitLogReader) {
        return commitLogReader.getPartitions();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public Partition getOffsetPartition(Offset offset) {
        return offset.getPartition();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    Set<Partition> getSkipFirstElementFromPartitions(List<Offset> list) {
        return (Set) list.stream().map((v0) -> {
            return v0.getPartition();
        }).collect(Collectors.toSet());
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    LogObserver<OutputT> createLogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> set) {
        return new LogObserver<>(sourceContext, resultExtractor, set);
    }

    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observePartitions(CommitLogReader commitLogReader, List<Partition> list, List<AttributeDescriptor<?>> list2, LogObserver<OutputT> logObserver) {
        final ObserveHandle observeBulkPartitions = commitLogReader.observeBulkPartitions(list, Position.OLDEST, false, logObserver);
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>() { // from class: cz.o2.proxima.flink.core.CommitLogSourceFunction.1
            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle
            public List<Offset> getConsumedOffsets() {
                return observeBulkPartitions.getCurrentOffsets();
            }

            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                observeBulkPartitions.close();
            }
        };
    }

    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observeRestoredOffsets(CommitLogReader commitLogReader, List<Offset> list, List<AttributeDescriptor<?>> list2, LogObserver<OutputT> logObserver) {
        final ObserveHandle observeBulkOffsets = commitLogReader.observeBulkOffsets(list, false, logObserver);
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>() { // from class: cz.o2.proxima.flink.core.CommitLogSourceFunction.2
            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle
            public List<Offset> getConsumedOffsets() {
                return observeBulkOffsets.getCurrentOffsets();
            }

            @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction.UnifiedObserveHandle, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                observeBulkOffsets.close();
            }
        };
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    @Generated
    public /* bridge */ /* synthetic */ RepositoryFactory getRepositoryFactory() {
        return super.getRepositoryFactory();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        super.initializeState(functionInitializationContext);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        super.snapshotState(functionSnapshotContext);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void notifyCheckpointComplete(long j) {
        super.notifyCheckpointComplete(j);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void cancel() {
        super.cancel();
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void run(SourceFunction.SourceContext sourceContext) throws Exception {
        super.run(sourceContext);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observeRestoredOffsets(CommitLogReader commitLogReader, List<Offset> list, List list2, AbstractSourceLogObserver abstractSourceLogObserver) {
        return observeRestoredOffsets(commitLogReader, list, (List<AttributeDescriptor<?>>) list2, (LogObserver) abstractSourceLogObserver);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observePartitions(CommitLogReader commitLogReader, List list, List list2, AbstractSourceLogObserver abstractSourceLogObserver) {
        return observePartitions(commitLogReader, (List<Partition>) list, (List<AttributeDescriptor<?>>) list2, (LogObserver) abstractSourceLogObserver);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ AbstractSourceLogObserver createLogObserver(SourceFunction.SourceContext sourceContext, ResultExtractor resultExtractor, Set set) {
        return createLogObserver(sourceContext, resultExtractor, (Set<Partition>) set);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    /* bridge */ /* synthetic */ CommitLogReader createLogReader(List list) {
        return createLogReader((List<AttributeDescriptor<?>>) list);
    }

    @Override // cz.o2.proxima.flink.core.AbstractLogSourceFunction
    public /* bridge */ /* synthetic */ void open(Configuration configuration) {
        super.open(configuration);
    }
}
