package cz.seznam.euphoria.core.executor.greduce;

import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.core.client.accumulators.Counter;
import cz.seznam.euphoria.core.client.accumulators.Histogram;
import cz.seznam.euphoria.core.client.accumulators.Timer;
import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.BinaryFunction;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateContext;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.operator.state.Storage;
import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.core.client.triggers.TriggerContext;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/* loaded from: input_file:cz/seznam/euphoria/core/executor/greduce/GroupReducer.class */
public class GroupReducer<WID extends Window, KEY, I> {
    private final StateFactory<I, ?, State<I, ?>> stateFactory;
    private final StateMerger<I, ?, State<I, ?>> stateCombiner;
    private final WindowedElementFactory<WID, Object> elementFactory;
    private final StateContext stateContext;
    private final Collector<WindowedElement<?, Pair<KEY, ?>>> collector;
    private final Windowing windowing;
    private final Trigger trigger;
    private final AccumulatorProvider accumulators;
    final TriggerStorage triggerStorage;
    final TimerSupport<WID> clock = new TimerSupport<>();
    final HashMap<WID, State> states = new HashMap<>();
    KEY key;

    @FunctionalInterface
    /* loaded from: input_file:cz/seznam/euphoria/core/executor/greduce/GroupReducer$Collector.class */
    public interface Collector<T> {
        void collect(T t);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/seznam/euphoria/core/executor/greduce/GroupReducer$ElementCollector.class */
    public class ElementCollector<T> implements Context, cz.seznam.euphoria.core.client.io.Collector<T> {
        final Collector<WindowedElement<WID, Pair<KEY, T>>> out;
        final WID window;

        ElementCollector(Collector<WindowedElement<WID, Pair<KEY, T>>> collector, WID wid) {
            this.out = collector;
            this.window = wid;
        }

        @Override // cz.seznam.euphoria.core.client.io.Collector
        public void collect(T t) {
            this.out.collect(GroupReducer.this.elementFactory.create(this.window, this.window.maxTimestamp() - 1, Pair.of(GroupReducer.this.key, t)));
        }

        @Override // cz.seznam.euphoria.core.client.io.Collector
        public Context asContext() {
            return this;
        }

        @Override // cz.seznam.euphoria.core.client.io.Environment
        public Window<?> getWindow() {
            return this.window;
        }

        @Override // cz.seznam.euphoria.core.client.io.Environment
        public Counter getCounter(String str) {
            return GroupReducer.this.accumulators.getCounter(str);
        }

        @Override // cz.seznam.euphoria.core.client.io.Environment
        public Histogram getHistogram(String str) {
            return GroupReducer.this.accumulators.getHistogram(str);
        }

        @Override // cz.seznam.euphoria.core.client.io.Environment
        public Timer getTimer(String str) {
            return GroupReducer.this.accumulators.getTimer(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/seznam/euphoria/core/executor/greduce/GroupReducer$ElementTriggerContext.class */
    public class ElementTriggerContext implements TriggerContext {
        protected final Window window;

        ElementTriggerContext(Window window) {
            this.window = (Window) Objects.requireNonNull(window);
        }

        @Override // cz.seznam.euphoria.core.client.triggers.TriggerContext
        public boolean registerTimer(long j, Window window) {
            GroupReducer.this.clock.registerTimer(j, window);
            return true;
        }

        @Override // cz.seznam.euphoria.core.client.triggers.TriggerContext
        public void deleteTimer(long j, Window window) {
            GroupReducer.this.clock.deleteTimer(j, window);
        }

        @Override // cz.seznam.euphoria.core.client.triggers.TriggerContext
        public long getCurrentTimestamp() {
            return GroupReducer.this.clock.getStamp();
        }

        @Override // cz.seznam.euphoria.core.client.operator.state.StorageProvider
        public <T> ValueStorage<T> getValueStorage(ValueStorageDescriptor<T> valueStorageDescriptor) {
            return GroupReducer.this.triggerStorage.getValueStorage(this.window, valueStorageDescriptor);
        }

        @Override // cz.seznam.euphoria.core.client.operator.state.StorageProvider
        public <T> ListStorage<T> getListStorage(ListStorageDescriptor<T> listStorageDescriptor) {
            return GroupReducer.this.triggerStorage.getListStorage(this.window, listStorageDescriptor);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/seznam/euphoria/core/executor/greduce/GroupReducer$MergingTriggerContext.class */
    public class MergingTriggerContext extends GroupReducer<WID, KEY, I>.ElementTriggerContext implements TriggerContext.TriggerMergeContext {
        private Collection<? extends Window> sources;

        MergingTriggerContext(Collection<? extends Window> collection, Window window) {
            super(window);
            this.sources = collection;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v29, types: [cz.seznam.euphoria.core.client.operator.state.ValueStorage] */
        /* JADX WARN: Type inference failed for: r0v3, types: [cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor, java.lang.Object] */
        /* JADX WARN: Type inference failed for: r2v1, types: [java.lang.StringBuilder] */
        /* JADX WARN: Type inference failed for: r6v0, types: [cz.seznam.euphoria.core.client.operator.state.StorageDescriptor] */
        @Override // cz.seznam.euphoria.core.client.triggers.TriggerContext.TriggerMergeContext
        public void mergeStoredState(StorageDescriptor storageDescriptor) {
            ListStorage listStorage;
            if (!(storageDescriptor instanceof MergingStorageDescriptor)) {
                throw new IllegalStateException("Storage descriptor must support merging!");
            }
            ?? r0 = (MergingStorageDescriptor) storageDescriptor;
            BinaryFunction merger = r0.getMerger();
            if (r0 instanceof ValueStorageDescriptor) {
                listStorage = getValueStorage((ValueStorageDescriptor) r0);
            } else {
                if (!(r0 instanceof ListStorageDescriptor)) {
                    throw new IllegalStateException("Cannot merge states for " + r0);
                }
                listStorage = getListStorage((ListStorageDescriptor) r0);
            }
            Iterator<? extends Window> it = this.sources.iterator();
            while (it.hasNext()) {
                Storage<?> storage = GroupReducer.this.triggerStorage.getStorage(it.next(), storageDescriptor);
                if (storage != null) {
                    merger.apply(listStorage, storage);
                }
            }
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:cz/seznam/euphoria/core/executor/greduce/GroupReducer$WindowedElementFactory.class */
    public interface WindowedElementFactory<W extends Window, T> {
        WindowedElement<W, T> create(W w, long j, T t);
    }

    public GroupReducer(StateFactory<I, ?, State<I, ?>> stateFactory, StateMerger<I, ?, State<I, ?>> stateMerger, StateContext stateContext, WindowedElementFactory<WID, Object> windowedElementFactory, Windowing windowing, Trigger trigger, Collector<WindowedElement<?, Pair<KEY, ?>>> collector, AccumulatorProvider accumulatorProvider) {
        this.stateFactory = (StateFactory) Objects.requireNonNull(stateFactory);
        this.elementFactory = (WindowedElementFactory) Objects.requireNonNull(windowedElementFactory);
        this.stateCombiner = (StateMerger) Objects.requireNonNull(stateMerger);
        this.stateContext = (StateContext) Objects.requireNonNull(stateContext);
        this.collector = (Collector) Objects.requireNonNull(collector);
        this.windowing = (Windowing) Objects.requireNonNull(windowing);
        this.trigger = (Trigger) Objects.requireNonNull(trigger);
        this.accumulators = (AccumulatorProvider) Objects.requireNonNull(accumulatorProvider);
        this.triggerStorage = new TriggerStorage(stateContext.getStorageProvider());
    }

    public void process(WindowedElement<WID, Pair<KEY, I>> windowedElement) {
        updateKey(windowedElement);
        this.clock.updateStamp(windowedElement.getTimestamp(), this::onTimerCallback);
        WID window = windowedElement.getWindow();
        if (this.windowing instanceof MergingWindowing) {
            window = mergeWindows(window);
        }
        getStateForUpdate(window).add(windowedElement.getElement().getSecond());
        GroupReducer<WID, KEY, I>.ElementTriggerContext elementTriggerContext = new ElementTriggerContext(window);
        processTriggerResult(window, elementTriggerContext, this.trigger.onElement(windowedElement.getTimestamp(), window, elementTriggerContext));
    }

    private State getStateForUpdate(WID wid) {
        return this.states.computeIfAbsent(wid, window -> {
            return this.stateFactory.createState(this.stateContext, null);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void close() {
        this.clock.updateStamp(Long.MAX_VALUE, this::onTimerCallback);
        Iterator it = new ArrayList(this.states.keySet()).iterator();
        while (it.hasNext()) {
            Window window = (Window) it.next();
            processTriggerResult(window, new ElementTriggerContext(window), Trigger.TriggerResult.FLUSH_AND_PURGE);
        }
    }

    private void onTimerCallback(long j, WID wid) {
        GroupReducer<WID, KEY, I>.ElementTriggerContext elementTriggerContext = new ElementTriggerContext(wid);
        processTriggerResult(wid, elementTriggerContext, this.trigger.onTimer(j, wid, elementTriggerContext));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private WID mergeWindows(WID wid) {
        if (this.states.containsKey(wid)) {
            return wid;
        }
        for (Pair pair : ((MergingWindowing) this.windowing).mergeWindows(getActivesWindowsPlus(wid))) {
            Collection<Window> collection = (Collection) pair.getFirst();
            Window window = (Window) pair.getSecond();
            if (collection.contains(wid)) {
                wid = window;
            }
            collection.remove(window);
            if (!collection.isEmpty()) {
                this.stateCombiner.merge(getStateForUpdate(window), removeStatesForMerging(collection));
                this.trigger.onMerge(window, new MergingTriggerContext(collection, window));
                for (Window window2 : collection) {
                    if (!window2.equals(wid)) {
                        this.trigger.onClear(window2, new ElementTriggerContext(window2));
                    }
                }
            }
        }
        return wid;
    }

    private List<WID> getActivesWindowsPlus(WID wid) {
        ArrayList arrayList = new ArrayList(this.states.keySet().size() + 1);
        arrayList.addAll(this.states.keySet());
        arrayList.add(wid);
        return arrayList;
    }

    private List<State> removeStatesForMerging(Collection<WID> collection) {
        ArrayList arrayList = new ArrayList(collection.size());
        Iterator<WID> it = collection.iterator();
        while (it.hasNext()) {
            State remove = this.states.remove(it.next());
            if (remove != null) {
                arrayList.add(remove);
            }
        }
        return arrayList;
    }

    private void updateKey(WindowedElement<WID, Pair<KEY, I>> windowedElement) {
        if (this.key == null) {
            this.key = windowedElement.getElement().getFirst();
        } else {
            Preconditions.checkState(this.key.equals(windowedElement.getElement().getFirst()));
        }
    }

    private void processTriggerResult(WID wid, GroupReducer<WID, KEY, I>.ElementTriggerContext elementTriggerContext, Trigger.TriggerResult triggerResult) {
        if (triggerResult.isFlush() && triggerResult.isPurge()) {
            State remove = this.states.remove(wid);
            if (remove != null) {
                remove.flush(new ElementCollector(this.collector, wid));
                remove.close();
            }
            this.trigger.onClear(wid, elementTriggerContext);
        }
    }
}
