package de.robertmetzger.flink.utils.eventtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:de/robertmetzger/flink/utils/eventtime/TimeSmoother.class */
public class TimeSmoother<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
    private transient TreeMultiMap<Long, T> tree;
    private int sizeLimit;

    /* loaded from: input_file:de/robertmetzger/flink/utils/eventtime/TimeSmoother$TreeMultiMap.class */
    public static class TreeMultiMap<K, V> implements Serializable {
        private final TreeMap<K, List<V>> tree = new TreeMap<>();
        private int size = 0;

        public int size() {
            return this.size;
        }

        public void remove(K k) {
            this.size -= this.tree.get(k).size();
            this.tree.remove(k);
        }

        public void put(K k, V v) {
            this.size++;
            List<V> list = this.tree.get(k);
            if (list == null) {
                list = new ArrayList();
                this.tree.put(k, list);
            }
            list.add(v);
        }

        public Map.Entry<K, List<V>> firstEntry() {
            return this.tree.firstEntry();
        }

        public NavigableMap<K, List<V>> headMap(K k, boolean z) {
            return this.tree.headMap(k, z);
        }

        public void reportRemoved(int i) {
            this.size -= i;
        }
    }

    private TimeSmoother(int i) {
        this.sizeLimit = 0;
        if (i < 0) {
            throw new IllegalArgumentException("Size limit is negative");
        }
        this.sizeLimit = i;
    }

    public static <T> DataStream<T> forStream(DataStream<T> dataStream) {
        return forStream(dataStream, 0);
    }

    public static <T> DataStream<T> forStream(DataStream<T> dataStream, int i) {
        if (dataStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            throw new IllegalArgumentException("This operator doesn't work with processing time. The time characteristic is set to '" + dataStream.getExecutionEnvironment().getStreamTimeCharacteristic() + "'.");
        }
        return dataStream.transform("TimeSmoother", dataStream.getType(), new TimeSmoother(i));
    }

    public void open() throws Exception {
        if (this.tree == null) {
            this.tree = new TreeMultiMap<>();
        }
        getMetricGroup().gauge("treeSize", new Gauge<Integer>() { // from class: de.robertmetzger.flink.utils.eventtime.TimeSmoother.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Integer m0getValue() {
                return Integer.valueOf(TimeSmoother.this.tree.size());
            }
        });
    }

    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        if (this.sizeLimit > 0 && this.tree.size() > this.sizeLimit) {
            Map.Entry<Long, List<T>> firstEntry = this.tree.firstEntry();
            Iterator<T> it = firstEntry.getValue().iterator();
            while (it.hasNext()) {
                this.output.collect(new StreamRecord(it.next(), firstEntry.getKey().longValue()));
            }
            this.tree.remove(firstEntry.getKey());
        }
        this.tree.put(Long.valueOf(streamRecord.getTimestamp()), streamRecord.getValue());
    }

    public void processWatermark(Watermark watermark) throws Exception {
        Iterator<Map.Entry<Long, List<T>>> it = this.tree.headMap(Long.valueOf(watermark.getTimestamp()), true).entrySet().iterator();
        int i = 0;
        while (it.hasNext()) {
            Map.Entry<Long, List<T>> next = it.next();
            Iterator<T> it2 = next.getValue().iterator();
            while (it2.hasNext()) {
                this.output.collect(new StreamRecord(it2.next(), next.getKey().longValue()));
                i++;
            }
            it.remove();
        }
        this.tree.reportRemoved(i);
        this.output.emitWatermark(watermark);
    }
}
