package de.robertmetzger.flink.utils.watermarks;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:de/robertmetzger/flink/utils/watermarks/WatermarkQualityScorer.class */
public class WatermarkQualityScorer<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
    private transient long currentWatermark;
    private transient long elementsSeen;
    private transient long elementsBehindWatermark;
    private transient long lastWatermark;

    public static <T> void score(DataStream<T> dataStream) {
        dataStream.transform("Watermark Quality Scorer", dataStream.getType(), new WatermarkQualityScorer());
    }

    public void open() throws Exception {
        this.lastWatermark = Long.MIN_VALUE;
        this.currentWatermark = Long.MIN_VALUE;
        MetricGroup addGroup = getMetricGroup().addGroup("watermarkQuality");
        addGroup.gauge("currentWatermark", new Gauge<Long>() { // from class: de.robertmetzger.flink.utils.watermarks.WatermarkQualityScorer.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m2getValue() {
                return Long.valueOf(WatermarkQualityScorer.this.currentWatermark);
            }
        });
        addGroup.gauge("elementsSeen", new Gauge<Long>() { // from class: de.robertmetzger.flink.utils.watermarks.WatermarkQualityScorer.2
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m3getValue() {
                return Long.valueOf(WatermarkQualityScorer.this.elementsSeen);
            }
        });
        addGroup.gauge("elementsBehindWatermark", new Gauge<Long>() { // from class: de.robertmetzger.flink.utils.watermarks.WatermarkQualityScorer.3
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m4getValue() {
                return Long.valueOf(WatermarkQualityScorer.this.elementsBehindWatermark);
            }
        });
        addGroup.gauge("qualityScore", new Gauge<Double>() { // from class: de.robertmetzger.flink.utils.watermarks.WatermarkQualityScorer.4
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Double m5getValue() {
                return Double.valueOf(WatermarkQualityScorer.this.elementsBehindWatermark / WatermarkQualityScorer.this.elementsSeen);
            }
        });
    }

    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        this.elementsSeen++;
        if (streamRecord.getTimestamp() < this.currentWatermark) {
            this.elementsBehindWatermark++;
        }
    }

    public void processWatermark(Watermark watermark) throws Exception {
        LOG.info("Final quality score " + (this.elementsBehindWatermark / this.elementsSeen) + " elements behind " + this.elementsBehindWatermark + " of " + this.elementsSeen);
        this.currentWatermark = watermark.getTimestamp();
        this.elementsSeen = 0L;
        this.elementsBehindWatermark = 0L;
        if (this.currentWatermark < this.lastWatermark) {
            throw new IllegalStateException("Current watermark is lower than last watermark");
        }
        this.lastWatermark = this.currentWatermark;
    }
}
