package org.nustaq.reallive.impl.actors;

import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.function.Supplier;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.impl.FilterProcessorImpl;
import org.nustaq.reallive.impl.Mutator;
import org.nustaq.reallive.impl.StorageDriver;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.interfaces.ChangeMessage;
import org.nustaq.reallive.interfaces.FilterProcessor;
import org.nustaq.reallive.interfaces.Mutatable;
import org.nustaq.reallive.interfaces.Mutation;
import org.nustaq.reallive.interfaces.RLPredicate;
import org.nustaq.reallive.interfaces.RealLiveTable;
import org.nustaq.reallive.interfaces.Record;
import org.nustaq.reallive.interfaces.RecordStorage;
import org.nustaq.reallive.interfaces.Subscriber;
import org.nustaq.reallive.interfaces.TableDescription;

/* loaded from: input_file:org/nustaq/reallive/impl/actors/RealLiveStreamActor.class */
public class RealLiveStreamActor<K> extends Actor<RealLiveStreamActor<K>> implements RealLiveTable<K>, Mutatable<K> {
    StorageDriver<K> storageDriver;
    FilterProcessor<K> filterProcessor;
    HashMap<String, Subscriber> receiverSideSubsMap = new HashMap<>();
    TableDescription description;

    @Local
    public void init(Supplier<RecordStorage<K>> supplier, TableDescription tableDescription) {
        this.description = tableDescription;
        this.storageDriver = new StorageDriver<>(supplier.get());
        this.filterProcessor = new FilterProcessorImpl(this);
        this.storageDriver.setListener(this.filterProcessor);
    }

    @Override // org.nustaq.reallive.interfaces.ChangeReceiver
    public void receive(ChangeMessage<K> changeMessage) {
        checkThread();
        try {
            this.storageDriver.receive(changeMessage);
        } catch (Exception e) {
            Log.Error(this, e);
        }
    }

    @Override // org.nustaq.reallive.interfaces.RecordIterable
    public <T> void forEach(Spore<Record<K>, T> spore) {
        checkThread();
        try {
            this.storageDriver.getStore().forEach(spore);
        } catch (Exception e) {
            spore.complete((Object) null, e);
        }
    }

    protected void hasStopped() {
    }

    @Override // org.nustaq.reallive.interfaces.ChangeStream
    @CallerSideMethod
    public void subscribe(Subscriber<K> subscriber) {
        _subscribe(subscriber.getFilter(), (obj, obj2) -> {
            if (Actors.isResult(obj2)) {
                subscriber.getReceiver().receive((ChangeMessage) obj);
            }
        }, subscriber.getId());
    }

    public void _subscribe(RLPredicate rLPredicate, Callback callback, int i) {
        checkThread();
        Subscriber<K> subscriber = new Subscriber<>(rLPredicate, changeMessage -> {
            callback.stream(changeMessage);
        });
        this.receiverSideSubsMap.put(addChannelIdIfPresent(callback, "" + i), subscriber);
        this.filterProcessor.subscribe(subscriber);
    }

    protected String addChannelIdIfPresent(Callback callback, String str) {
        if ((callback instanceof CallbackWrapper) && ((CallbackWrapper) callback).isRemote()) {
            str = str + "#" + ((CallbackWrapper) callback).getRealCallback().getChanId();
        }
        return str;
    }

    @Override // org.nustaq.reallive.interfaces.ChangeStream
    @CallerSideMethod
    public void unsubscribe(Subscriber<K> subscriber) {
        _unsubscribe((obj, obj2) -> {
        }, subscriber.getId());
    }

    public void _unsubscribe(Callback callback, int i) {
        checkThread();
        String addChannelIdIfPresent = addChannelIdIfPresent(callback, "" + i);
        this.filterProcessor.unsubscribe(this.receiverSideSubsMap.get(addChannelIdIfPresent));
        this.receiverSideSubsMap.remove(addChannelIdIfPresent);
    }

    @Override // org.nustaq.reallive.interfaces.AsyncKV
    public IPromise<Record<K>> get(K k) {
        return resolve(this.storageDriver.getStore().get(k));
    }

    @Override // org.nustaq.reallive.interfaces.AsyncKV
    public IPromise<Long> size() {
        return resolve(Long.valueOf(this.storageDriver.getStore().size()));
    }

    @Override // org.nustaq.reallive.interfaces.Mutatable
    @CallerSideMethod
    public Mutation<K> getMutation() {
        return new Mutator(self());
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public IPromise<TableDescription> getDescription() {
        return resolve(this.description);
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public IPromise<StorageStats> getStats() {
        return resolve(this.storageDriver.getStore().getStats());
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public IPromise<Boolean> putCAS(RLPredicate<Record<K>> rLPredicate, K k, Object[] objArr) {
        return this.storageDriver.putCAS(rLPredicate, k, objArr);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1431287007:
                if (implMethodName.equals("lambda$unsubscribe$9b206036$1")) {
                    z = false;
                    break;
                }
                break;
            case -1089918212:
                if (implMethodName.equals("lambda$_subscribe$13b49503$1")) {
                    z = true;
                    break;
                }
                break;
            case 1633187999:
                if (implMethodName.equals("lambda$subscribe$ca966df8$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case ChangeMessage.ADD /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveStreamActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V")) {
                    return (obj, obj2) -> {
                    };
                }
                break;
            case ChangeMessage.REMOVE /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/interfaces/ChangeReceiver") && serializedLambda.getFunctionalInterfaceMethodName().equals("receive") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/nustaq/reallive/interfaces/ChangeMessage;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveStreamActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/kontraktor/Callback;Lorg/nustaq/reallive/interfaces/ChangeMessage;)V")) {
                    Callback callback = (Callback) serializedLambda.getCapturedArg(0);
                    return changeMessage -> {
                        callback.stream(changeMessage);
                    };
                }
                break;
            case ChangeMessage.UPDATE /* 2 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveStreamActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/reallive/interfaces/Subscriber;Ljava/lang/Object;Ljava/lang/Object;)V")) {
                    Subscriber subscriber = (Subscriber) serializedLambda.getCapturedArg(0);
                    return (obj3, obj22) -> {
                        if (Actors.isResult(obj22)) {
                            subscriber.getReceiver().receive((ChangeMessage) obj3);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
