package cz.o2.proxima.flink.core;

import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.core.LogObserver;
import cz.o2.proxima.direct.core.LogObserver.OnNextContext;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/o2/proxima/flink/core/AbstractSourceLogObserver.class */
public abstract class AbstractSourceLogObserver<OffsetT extends Serializable, ContextT extends LogObserver.OnNextContext<OffsetT>, OutputT> implements LogObserver<OffsetT, ContextT> {
    private final SourceFunction.SourceContext<OutputT> sourceContext;
    private final ResultExtractor<OutputT> resultExtractor;
    private final Set<Partition> skipFirstElementFromEachPartition;
    private final CountDownLatch completed = new CountDownLatch(1);
    private final Set<Partition> seenPartitions = new HashSet();
    private long watermark = Long.MIN_VALUE;

    @Nullable
    private volatile Throwable error = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractSourceLogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> set) {
        this.sourceContext = sourceContext;
        this.resultExtractor = resultExtractor;
        this.skipFirstElementFromEachPartition = set;
    }

    abstract void markOffsetAsConsumed(ContextT contextt);

    public boolean onError(Throwable th) {
        this.error = th;
        this.completed.countDown();
        return false;
    }

    public void onCompleted() {
        this.completed.countDown();
    }

    public void onCancelled() {
        this.completed.countDown();
    }

    public boolean onNext(StreamElement streamElement, ContextT contextt) {
        if (!(this.skipFirstElementFromEachPartition.contains(contextt.getPartition()) && this.seenPartitions.add(contextt.getPartition()))) {
            synchronized (this.sourceContext.getCheckpointLock()) {
                this.sourceContext.collectWithTimestamp(this.resultExtractor.toResult(streamElement), streamElement.getStamp());
                markOffsetAsConsumed(contextt);
            }
        }
        maybeUpdateWatermark(this.watermark);
        return true;
    }

    public void awaitCompleted() throws InterruptedException {
        this.completed.await();
    }

    public Optional<Throwable> getError() {
        return Optional.ofNullable(this.error);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybeUpdateWatermark(long j) {
        if (j > this.watermark) {
            this.watermark = j;
            synchronized (this.sourceContext.getCheckpointLock()) {
                this.sourceContext.emitWatermark(new Watermark(this.watermark));
            }
        }
    }

    @Generated
    public SourceFunction.SourceContext<OutputT> getSourceContext() {
        return this.sourceContext;
    }
}
