package stream.scotty.flinkconnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import stream.scotty.core.AggregateWindow;
import stream.scotty.core.windowFunction.AggregateFunction;
import stream.scotty.core.windowType.Window;
import stream.scotty.slicing.SlicingWindowOperator;
import stream.scotty.state.memory.MemoryStateFactory;

/* loaded from: input_file:stream/scotty/flinkconnector/KeyedScottyWindowOperator.class */
public class KeyedScottyWindowOperator<Key, InputType, FinalAggregateType> extends KeyedProcessFunction<Key, InputType, AggregateWindow<FinalAggregateType>> {
    private MemoryStateFactory stateFactory;
    private HashMap<Key, SlicingWindowOperator<InputType>> slicingWindowOperatorMap;
    private long lastWatermark;
    private final AggregateFunction<InputType, ?, FinalAggregateType> windowFunction;
    private long allowedLateness = 1;
    private final List<Window> windows = new ArrayList();

    public KeyedScottyWindowOperator(AggregateFunction<InputType, ?, FinalAggregateType> aggregateFunction) {
        this.windowFunction = aggregateFunction;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.stateFactory = new MemoryStateFactory();
        this.slicingWindowOperatorMap = new HashMap<>();
    }

    public SlicingWindowOperator<InputType> initWindowOperator() {
        SlicingWindowOperator<InputType> slicingWindowOperator = new SlicingWindowOperator<>(this.stateFactory);
        Iterator<Window> it = this.windows.iterator();
        while (it.hasNext()) {
            slicingWindowOperator.addWindowAssigner(it.next());
        }
        slicingWindowOperator.addAggregation(this.windowFunction);
        slicingWindowOperator.setMaxLateness(this.allowedLateness);
        return slicingWindowOperator;
    }

    public Key getKey(KeyedProcessFunction<Key, InputType, AggregateWindow<FinalAggregateType>>.Context context) {
        return (Key) context.getCurrentKey();
    }

    public void processElement(InputType inputtype, KeyedProcessFunction<Key, InputType, AggregateWindow<FinalAggregateType>>.Context context, Collector<AggregateWindow<FinalAggregateType>> collector) throws Exception {
        Key key = getKey(context);
        if (!this.slicingWindowOperatorMap.containsKey(key)) {
            this.slicingWindowOperatorMap.put(key, initWindowOperator());
        }
        this.slicingWindowOperatorMap.get(key).processElement(inputtype, getTimestamp(context));
        processWatermark(context, collector);
    }

    private long getTimestamp(KeyedProcessFunction<Key, InputType, AggregateWindow<FinalAggregateType>>.Context context) {
        return context.timestamp() != null ? context.timestamp().longValue() : context.timerService().currentProcessingTime();
    }

    private void processWatermark(KeyedProcessFunction<Key, InputType, AggregateWindow<FinalAggregateType>>.Context context, Collector<AggregateWindow<FinalAggregateType>> collector) {
        long timestamp = context.timerService().currentWatermark() < 0 ? getTimestamp(context) : context.timerService().currentWatermark();
        if (timestamp > this.lastWatermark) {
            Iterator<SlicingWindowOperator<InputType>> it = this.slicingWindowOperatorMap.values().iterator();
            while (it.hasNext()) {
                for (AggregateWindow aggregateWindow : it.next().processWatermark(timestamp)) {
                    if (aggregateWindow.hasValue()) {
                        collector.collect(aggregateWindow);
                    }
                }
            }
            this.lastWatermark = timestamp;
        }
    }

    public KeyedScottyWindowOperator addWindow(Window window) {
        this.windows.add(window);
        return this;
    }

    public KeyedScottyWindowOperator allowedLateness(Time time) {
        this.allowedLateness = time.toMilliseconds();
        return this;
    }
}
