package org.apache.flink.cep.operator;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.class */
public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
    private static final long serialVersionUID = -7234999752950159178L;
    private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
    private static final String PRIORIRY_QUEUE_STATE_NAME = "priorityQueueStateName";
    private final KeySelector<IN, KEY> keySelector;
    private final TypeSerializer<KEY> keySerializer;
    private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory;
    private final NFACompiler.NFAFactory<IN> nfaFactory;
    private transient Set<KEY> keys;
    private transient ValueState<NFA<IN>> nfaOperatorState;
    private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator$PriorityQueueFactory.class */
    public interface PriorityQueueFactory<T> extends Serializable {
        PriorityQueue<T> createPriorityQueue();
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator$PriorityQueueSerializer.class */
    private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
        private static final long serialVersionUID = -231980397616187715L;
        private final TypeSerializer<T> elementSerializer;
        private final PriorityQueueFactory<T> factory;

        public PriorityQueueSerializer(TypeSerializer<T> typeSerializer, PriorityQueueFactory<T> priorityQueueFactory) {
            this.elementSerializer = typeSerializer;
            this.factory = priorityQueueFactory;
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<PriorityQueue<T>> duplicate() {
            return new PriorityQueueSerializer(this.elementSerializer.duplicate(), this.factory);
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        public PriorityQueue<T> m7createInstance() {
            return this.factory.createPriorityQueue();
        }

        /* JADX WARN: Multi-variable type inference failed */
        public PriorityQueue<T> copy(PriorityQueue<T> priorityQueue) {
            PriorityQueue<T> createPriorityQueue = this.factory.createPriorityQueue();
            Iterator<T> it = priorityQueue.iterator();
            while (it.hasNext()) {
                createPriorityQueue.offer(this.elementSerializer.copy(it.next()));
            }
            return createPriorityQueue;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public PriorityQueue<T> copy(PriorityQueue<T> priorityQueue, PriorityQueue<T> priorityQueue2) {
            priorityQueue2.clear();
            Iterator<T> it = priorityQueue.iterator();
            while (it.hasNext()) {
                priorityQueue2.offer(this.elementSerializer.copy(it.next()));
            }
            return priorityQueue2;
        }

        public int getLength() {
            return 0;
        }

        public void serialize(PriorityQueue<T> priorityQueue, DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeInt(priorityQueue.size());
            Iterator<T> it = priorityQueue.iterator();
            while (it.hasNext()) {
                this.elementSerializer.serialize(it.next(), dataOutputView);
            }
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public PriorityQueue<T> m6deserialize(DataInputView dataInputView) throws IOException {
            return deserialize((PriorityQueue) this.factory.createPriorityQueue(), dataInputView);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public PriorityQueue<T> deserialize(PriorityQueue<T> priorityQueue, DataInputView dataInputView) throws IOException {
            priorityQueue.clear();
            int readInt = dataInputView.readInt();
            for (int i = 0; i < readInt; i++) {
                priorityQueue.offer(this.elementSerializer.deserialize(dataInputView));
            }
            return priorityQueue;
        }

        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof PriorityQueueSerializer)) {
                return false;
            }
            PriorityQueueSerializer priorityQueueSerializer = (PriorityQueueSerializer) obj;
            return this.factory.equals(priorityQueueSerializer.factory) && this.elementSerializer.equals(priorityQueueSerializer.elementSerializer);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof PriorityQueueSerializer;
        }

        public int hashCode() {
            return Objects.hash(this.factory, this.elementSerializer);
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator$PriorityQueueStreamRecordFactory.class */
    private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
        private static final long serialVersionUID = 1254766984454616593L;

        private PriorityQueueStreamRecordFactory() {
        }

        @Override // org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.PriorityQueueFactory
        public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
            return new PriorityQueue<>(11, new StreamRecordComparator());
        }

        public boolean equals(Object obj) {
            return obj instanceof PriorityQueueStreamRecordFactory;
        }

        public int hashCode() {
            return getClass().hashCode();
        }
    }

    public AbstractKeyedCEPPatternOperator(TypeSerializer<IN> typeSerializer, boolean z, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> typeSerializer2, NFACompiler.NFAFactory<IN> nFAFactory) {
        super(typeSerializer, z);
        this.priorityQueueFactory = new PriorityQueueStreamRecordFactory();
        this.keySelector = keySelector;
        this.keySerializer = typeSerializer2;
        this.nfaFactory = nFAFactory;
    }

    public void open() throws Exception {
        super.open();
        if (this.keys == null) {
            this.keys = new HashSet();
        }
        if (this.nfaOperatorState == null) {
            this.nfaOperatorState = getPartitionedState(new ValueStateDescriptor(NFA_OPERATOR_STATE_NAME, new NFA.Serializer()));
        }
        StreamElementSerializer streamElementSerializer = new StreamElementSerializer(getInputSerializer());
        if (this.priorityQueueOperatorState == null) {
            this.priorityQueueOperatorState = getPartitionedState(new ValueStateDescriptor(PRIORIRY_QUEUE_STATE_NAME, new PriorityQueueSerializer(streamElementSerializer, new PriorityQueueStreamRecordFactory())));
        }
    }

    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected NFA<IN> getNFA() throws IOException {
        NFA<IN> nfa = (NFA) this.nfaOperatorState.value();
        if (nfa == null) {
            nfa = this.nfaFactory.createNFA();
            this.nfaOperatorState.update(nfa);
        }
        return nfa;
    }

    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected void updateNFA(NFA<IN> nfa) throws IOException {
        this.nfaOperatorState.update(nfa);
    }

    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
        PriorityQueue<StreamRecord<IN>> priorityQueue = (PriorityQueue) this.priorityQueueOperatorState.value();
        if (priorityQueue == null) {
            priorityQueue = this.priorityQueueFactory.createPriorityQueue();
            this.priorityQueueOperatorState.update(priorityQueue);
        }
        return priorityQueue;
    }

    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> priorityQueue) throws IOException {
        this.priorityQueueOperatorState.update(priorityQueue);
    }

    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        this.keys.add(this.keySelector.getKey(streamRecord.getValue()));
        super.processElement(streamRecord);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processWatermark(Watermark watermark) throws Exception {
        Iterator<KEY> it = this.keys.iterator();
        while (it.hasNext()) {
            setCurrentKey(it.next());
            PriorityQueue priorityQueue = getPriorityQueue();
            NFA nfa = getNFA();
            if (priorityQueue.isEmpty()) {
                advanceTime(nfa, watermark.getTimestamp());
            } else {
                while (!priorityQueue.isEmpty() && ((StreamRecord) priorityQueue.peek()).getTimestamp() <= watermark.getTimestamp()) {
                    StreamRecord streamRecord = (StreamRecord) priorityQueue.poll();
                    processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
                }
            }
            updateNFA(nfa);
            updatePriorityQueue(priorityQueue);
        }
        this.output.emitWatermark(watermark);
    }

    public void snapshotState(FSDataOutputStream fSDataOutputStream, long j, long j2) throws Exception {
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(fSDataOutputStream);
        dataOutputViewStreamWrapper.writeInt(this.keys.size());
        Iterator<KEY> it = this.keys.iterator();
        while (it.hasNext()) {
            this.keySerializer.serialize(it.next(), dataOutputViewStreamWrapper);
        }
    }

    public void restoreState(FSDataInputStream fSDataInputStream) throws Exception {
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(fSDataInputStream);
        if (this.keys == null) {
            this.keys = new HashSet();
        }
        int readInt = dataInputViewStreamWrapper.readInt();
        for (int i = 0; i < readInt; i++) {
            this.keys.add(this.keySerializer.deserialize(dataInputViewStreamWrapper));
        }
    }
}
