package cz.o2.proxima.flink.core;

import cz.o2.proxima.direct.LogObserver;
import cz.o2.proxima.direct.LogObserver.OnNextContext;
import cz.o2.proxima.flink.core.AbstractSourceLogObserver;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.Closeable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/flink/core/AbstractLogSourceFunction.class */
/**
 * Base class for Flink source functions that observe a partitioned log and expose the consumed
 * offsets as checkpointed operator state.
 *
 * <p>Each parallel subtask observes only the partitions whose id, taken modulo the number of
 * parallel subtasks, equals the index of that subtask. When state was restored, observation
 * resumes from the restored offsets that belong to this subtask; otherwise the selected
 * partitions are observed directly.
 *
 * @param <ReaderT> type of the log reader created by {@link #createLogReader(List)}
 * @param <ObserverT> type of the observer created by
 *     {@link #createLogObserver(SourceFunction.SourceContext, ResultExtractor, Set)}
 * @param <OffsetT> serializable offset type stored in the checkpointed state
 * @param <ContextT> type of the context passed to the observer
 * @param <OutputT> type of the elements emitted by this source
 */
abstract class AbstractLogSourceFunction<ReaderT, ObserverT extends AbstractSourceLogObserver<OffsetT, ContextT, OutputT>, OffsetT extends Serializable, ContextT extends LogObserver.OnNextContext<OffsetT>, OutputT> extends RichParallelSourceFunction<OutputT> implements CheckpointListener, CheckpointedFunction {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractLogSourceFunction.class);

    /** Name under which the consumed offsets are registered in the operator state store. */
    private static final String OFFSETS_STATE_NAME = "offsets";

    private final RepositoryFactory repositoryFactory;
    private final List<AttributeDescriptor<?>> attributeDescriptors;
    private final ResultExtractor<OutputT> resultExtractor;

    /** Offsets read back in {@link #initializeState}; stays {@code null} when nothing was restored. */
    @Nullable
    private transient List<OffsetT> restoredOffsets;

    /** State handle assigned in {@link #initializeState} and rewritten by {@link #snapshotState}. */
    @Nullable
    private transient ListState<OffsetT> persistedOffsets;

    /** Handle of the running observation; {@code null} until {@link #run} starts observing. */
    @Nullable
    private transient volatile UnifiedObserveHandle<OffsetT> observeHandle;

    /** Counted down by {@link #run} once the source decided what (if anything) it observes. */
    private transient volatile CountDownLatch running;

    /** Counted down by {@link #cancel()}; {@link #finishAndMarkAsIdle} blocks on it. */
    private transient volatile CountDownLatch cancelled;

    /**
     * Handle of a running observation, independent of the concrete reader type.
     *
     * @param <OffsetT> type of the offsets reported by the handle
     */
    public interface UnifiedObserveHandle<OffsetT> extends Closeable {

        /**
         * Returns the offsets this handle reports as consumed.
         *
         * @return consumed offsets; these are what {@link #snapshotState} writes into the state
         */
        List<OffsetT> getConsumedOffsets();

        /** Closes the handle; narrowed so that it declares no checked exception. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        void close();
    }

    /**
     * Maps a partition to a subtask index.
     *
     * @param partition partition to assign
     * @param numParallelSubtasks number of parallel subtasks of this source
     * @return {@code partition.getId() % numParallelSubtasks}
     */
    private static int getSubtaskIndex(Partition partition, int numParallelSubtasks) {
        return partition.getId() % numParallelSubtasks;
    }

    /**
     * Creates the source function.
     *
     * @param repositoryFactory factory exposed through {@link #getRepositoryFactory()}
     * @param attributeDescriptors attributes passed to the reader and to the observe calls
     * @param resultExtractor extractor handed over to the created observer
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractLogSourceFunction(RepositoryFactory repositoryFactory, List<AttributeDescriptor<?>> attributeDescriptors, ResultExtractor<OutputT> resultExtractor) {
        this.repositoryFactory = repositoryFactory;
        this.attributeDescriptors = attributeDescriptors;
        this.resultExtractor = resultExtractor;
    }

    /**
     * Rich-function lifecycle hook; (re)creates the transient latches used by {@link #run},
     * {@link #cancel()} and {@link #awaitRunning()}.
     *
     * @param configuration unused
     */
    public void open(Configuration configuration) {
        this.running = new CountDownLatch(1);
        this.cancelled = new CountDownLatch(1);
    }

    /**
     * Creates the reader used to list and observe partitions.
     *
     * @param attributeDescriptors attributes this source was constructed with
     * @return the reader
     */
    abstract ReaderT createLogReader(List<AttributeDescriptor<?>> attributeDescriptors);

    /**
     * Lists all partitions of the given reader; {@link #run} filters them per subtask.
     *
     * @param reader reader created by {@link #createLogReader(List)}
     * @return all partitions
     */
    abstract List<Partition> getPartitions(ReaderT reader);

    /**
     * Resolves the partition an offset belongs to.
     *
     * @param offset restored offset
     * @return partition of the offset
     */
    abstract Partition getOffsetPartition(OffsetT offset);

    /**
     * Computes the set of partitions that is passed to
     * {@link #createLogObserver(SourceFunction.SourceContext, ResultExtractor, Set)} when resuming
     * from restored offsets.
     *
     * @param offsets restored offsets belonging to this subtask
     * @return partitions handed to the observer
     */
    abstract Set<Partition> getSkipFirstElementFromPartitions(List<OffsetT> offsets);

    /**
     * Creates the observer that emits elements into the given source context.
     *
     * @param sourceContext Flink source context
     * @param resultExtractor extractor this source was constructed with
     * @param skipFirstElementFromPartitions result of {@link #getSkipFirstElementFromPartitions(List)},
     *     or an empty set when no state was restored
     * @return the observer
     */
    abstract ObserverT createLogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> skipFirstElementFromPartitions);

    /**
     * Starts observing the given partitions (used when no state was restored).
     *
     * @param reader reader created by {@link #createLogReader(List)}
     * @param partitions partitions owned by this subtask
     * @param attributeDescriptors attributes this source was constructed with
     * @param observer observer created by {@code createLogObserver}
     * @return handle of the started observation
     */
    abstract UnifiedObserveHandle<OffsetT> observePartitions(ReaderT reader, List<Partition> partitions, List<AttributeDescriptor<?>> attributeDescriptors, ObserverT observer);

    /**
     * Starts observing from restored offsets (used when state was restored).
     *
     * @param reader reader created by {@link #createLogReader(List)}
     * @param offsets restored offsets owned by this subtask
     * @param attributeDescriptors attributes this source was constructed with
     * @param observer observer created by {@code createLogObserver}
     * @return handle of the started observation
     */
    abstract UnifiedObserveHandle<OffsetT> observeRestoredOffsets(ReaderT reader, List<OffsetT> offsets, List<AttributeDescriptor<?>> attributeDescriptors, ObserverT observer);

    /**
     * Source-function entry point: selects the partitions of this subtask, starts the observation
     * and blocks until the observer completes.
     *
     * <p>When this subtask owns no partition, the source goes straight to
     * {@link #finishAndMarkAsIdle}. When the observer reports an error, it is logged and rethrown
     * through {@code ExceptionUtils.rethrowAsIllegalStateException}.
     *
     * @param sourceContext Flink source context used for emitting
     * @throws Exception declared by the source-function contract
     */
    public void run(SourceFunction.SourceContext<OutputT> sourceContext) throws Exception {
        final ReaderT reader = createLogReader(this.attributeDescriptors);
        final List<Partition> ownedPartitions =
                getPartitions(reader).stream()
                        .filter(this::isOwnedByThisSubtask)
                        .collect(Collectors.toList());
        if (ownedPartitions.isEmpty()) {
            // Nothing to read; still release anybody blocked in awaitRunning().
            this.running.countDown();
            finishAndMarkAsIdle(sourceContext);
            return;
        }
        final ObserverT observer;
        final List<OffsetT> restored = this.restoredOffsets;
        if (restored != null) {
            // The restored list is not pre-filtered per subtask, so keep only our offsets.
            final List<OffsetT> ownedOffsets =
                    restored.stream()
                            .filter(offset -> isOwnedByThisSubtask(getOffsetPartition(offset)))
                            .collect(Collectors.toList());
            observer = createLogObserver(sourceContext, this.resultExtractor, getSkipFirstElementFromPartitions(ownedOffsets));
            this.observeHandle = observeRestoredOffsets(reader, ownedOffsets, this.attributeDescriptors, observer);
        } else {
            observer = createLogObserver(sourceContext, this.resultExtractor, Collections.emptySet());
            this.observeHandle = observePartitions(reader, ownedPartitions, this.attributeDescriptors, observer);
        }
        log.info("Source [{}]: RUNNING", this);
        this.running.countDown();
        observer.awaitCompleted();
        final Optional<Throwable> error = observer.getError();
        if (error.isPresent()) {
            // The trailing throwable argument is logged by SLF4J as the exception.
            log.error("Source [{}]: FAILED", this, error.get());
            ExceptionUtils.rethrowAsIllegalStateException(error.get());
        } else {
            finishAndMarkAsIdle(sourceContext);
        }
    }

    /**
     * Tells whether the given partition is assigned to this subtask.
     *
     * <p>The runtime context is resolved on every call on purpose, so that it is not touched at
     * all when there is nothing to test.
     *
     * @param partition partition to test
     * @return {@code true} when the partition maps to the index of this subtask
     */
    private boolean isOwnedByThisSubtask(Partition partition) {
        return getSubtaskIndex(partition, getRuntimeContext().getNumberOfParallelSubtasks())
                == getRuntimeContext().getIndexOfThisSubtask();
    }

    /**
     * Emits a watermark with timestamp {@link Long#MAX_VALUE}, marks the source as temporarily
     * idle and then blocks until {@link #cancel()} is called.
     *
     * @param sourceContext Flink source context
     */
    @VisibleForTesting
    void finishAndMarkAsIdle(SourceFunction.SourceContext<?> sourceContext) {
        log.info("Source [{}]: COMPLETED", this);
        synchronized (sourceContext.getCheckpointLock()) {
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
        }
        sourceContext.markAsTemporarilyIdle();
        while (this.cancelled.getCount() > 0) {
            try {
                this.cancelled.await();
            } catch (InterruptedException e) {
                // Interrupts arriving before cancel() are ignored on purpose and the wait is
                // resumed; the interrupt flag is restored only once the source was cancelled.
                if (this.cancelled.getCount() == 0) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Source-function hook; releases the wait in {@link #finishAndMarkAsIdle}. */
    public void cancel() {
        this.cancelled.countDown();
    }

    /** Rich-function lifecycle hook; closes the observation handle when one was created. */
    public void close() {
        // Read the volatile field once so the null check and the call see the same value.
        final UnifiedObserveHandle<OffsetT> handle = this.observeHandle;
        if (handle != null) {
            handle.close();
        }
    }

    /**
     * Checkpoint-listener hook; intentionally a no-op.
     *
     * @param checkpointId unused
     */
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Checkpointed-function hook; replaces the content of the offsets state with the offsets
     * currently reported by the observation handle. When no observation was started, the state is
     * only cleared.
     *
     * @param functionSnapshotContext unused
     * @throws Exception when the state backend fails
     * @throws NullPointerException when called before {@link #initializeState}
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        final ListState<OffsetT> offsetsState = Objects.requireNonNull(this.persistedOffsets);
        offsetsState.clear();
        // Read the volatile field once so the null check and the call see the same value.
        final UnifiedObserveHandle<OffsetT> handle = this.observeHandle;
        if (handle != null) {
            for (OffsetT offset : handle.getConsumedOffsets()) {
                offsetsState.add(offset);
            }
        }
    }

    /**
     * Checkpointed-function hook; registers the offsets state and, when the context reports a
     * restore, copies the stored offsets into {@code restoredOffsets}.
     *
     * <p>NOTE(review): the state is obtained through {@code getUnionListState}; presumably every
     * subtask therefore receives the offsets of all subtasks, which would be why {@link #run}
     * filters them again - confirm against the Flink documentation.
     *
     * @param functionInitializationContext context giving access to the operator state store
     * @throws Exception when the state backend fails
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        final ListState<OffsetT> offsetsState =
                functionInitializationContext
                        .getOperatorStateStore()
                        .getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME, new JavaSerializer<>()));
        this.persistedOffsets = offsetsState;
        if (!functionInitializationContext.isRestored()) {
            log.info("BatchLog subtask {} has no state to restore.", getRuntimeContext().getIndexOfThisSubtask());
            return;
        }
        final List<OffsetT> restored = new ArrayList<>();
        this.restoredOffsets = restored;
        offsetsState.get().forEach(restored::add);
        log.info("BatchLog subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restored);
    }

    /**
     * Blocks until {@link #run} either started the observation or found out that this subtask has
     * nothing to observe.
     *
     * @throws InterruptedException when interrupted while waiting
     */
    @VisibleForTesting
    void awaitRunning() throws InterruptedException {
        this.running.await();
    }

    /**
     * Returns the repository factory this source was constructed with.
     *
     * @return the repository factory
     */
    @Generated
    public RepositoryFactory getRepositoryFactory() {
        return this.repositoryFactory;
    }
}
