package org.nustaq.reallive.impl.actors;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.KeySetSubscriber;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLNoQueryPredicate;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.RecordStorage;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.FilterProcessor;
import org.nustaq.reallive.impl.FilterSpore;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.StorageDriver;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.records.RecordWrapper;

/* loaded from: input_file:org/nustaq/reallive/impl/actors/RealLiveTableActor.class */
public class RealLiveTableActor extends Actor<RealLiveTableActor> implements RealLiveTable {
    public static int MAX_QUERY_BATCH_SIZE = 10;
    public static boolean DUMP_QUERY_TIME = false;
    StorageDriver storageDriver;
    FilterProcessor filterProcessor;
    TableDescription description;
    HashMap<String, Subscriber> receiverSideSubsMap = new HashMap<>();
    ArrayList<QueryQEntry> queuedSpores = new ArrayList<>();
    int taCount = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/nustaq/reallive/impl/actors/RealLiveTableActor$QueryQEntry.class */
    public static class QueryQEntry {
        Spore spore;
        Runnable onFin;

        public QueryQEntry(Spore spore, Runnable runnable) {
            this.spore = spore;
            this.onFin = runnable;
        }
    }

    @Local
    public IPromise init(Function<TableDescription, RecordStorage> function, TableDescription tableDescription) {
        this.description = tableDescription;
        Thread.currentThread().setName("Table " + (tableDescription == null ? "NULL" : tableDescription.getName()) + " main");
        this.storageDriver = new StorageDriver(function.apply(tableDescription));
        this.filterProcessor = new FilterProcessor(this);
        this.storageDriver.setListener(this.filterProcessor);
        return resolve();
    }

    @Override // org.nustaq.reallive.api.ChangeReceiver
    public void receive(ChangeMessage changeMessage) {
        checkThread();
        try {
            this.storageDriver.receive(changeMessage);
        } catch (Exception e) {
            Log.Error(this, e);
        }
    }

    public <T> void forEachDirect(Spore<Record, T> spore) {
        checkThread();
        try {
            this.storageDriver.getStore().forEachWithSpore(spore);
        } catch (Exception e) {
            spore.complete((Object) null, e);
        }
    }

    @Override // org.nustaq.reallive.api.RealLiveStreamActor
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        if (!(spore instanceof FilterSpore) || ((FilterSpore) spore).getFilter().getRecordLimit() <= 0) {
            forEachQueued(spore, () -> {
            });
            return;
        }
        FilterSpore filterSpore = new FilterSpore(((FilterSpore) spore).getFilter());
        ArrayList arrayList = new ArrayList();
        filterSpore.onFinish(() -> {
            delayedSend(arrayList, ((FilterSpore) spore).getFilter().getRecordLimit(), spore);
        });
        filterSpore.setForEach((record, obj) -> {
            if (Actors.isResult(obj)) {
                arrayList.add(record.getKey());
            }
        });
        forEachDirect(filterSpore);
    }

    private <T> void delayedSend(List<String> list, int i, Spore<Record, T> spore) {
        int i2 = 0;
        RecordStorage store = this.storageDriver.getStore();
        for (int size = list.size() - 1; size >= 0 && i2 < i; size--) {
            Record record = store.get(list.get(size));
            if (record != null) {
                spore.remote(record);
            }
            list.remove(size);
            i2++;
        }
        if (list.size() > 0) {
            delayed(1000L, () -> {
                delayedSend(list, i, spore);
            });
        } else {
            spore.finish();
        }
    }

    protected void hasStopped() {
    }

    @Override // org.nustaq.reallive.api.ChangeStream
    @CallerSideMethod
    public void subscribe(Subscriber subscriber) {
        _subscribe(subscriber.getFilter(), (obj, obj2) -> {
            if (Actors.isResult(obj2)) {
                subscriber.getReceiver().receive((ChangeMessage) obj);
            }
        }, subscriber.getId());
    }

    public void _subscribe(RLPredicate rLPredicate, Callback callback, int i) {
        checkThread();
        Subscriber serverSideCB = new Subscriber(rLPredicate, changeMessage -> {
            callback.pipe(changeMessage);
        }).serverSideCB(callback);
        this.receiverSideSubsMap.put(addChannelIdIfPresent(callback, "" + i), serverSideCB);
        FilterSpore modifiesResult = new FilterSpore(serverSideCB.getFilter()).modifiesResult(false);
        modifiesResult.onFinish(() -> {
            serverSideCB.getReceiver().receive(RLUtil.get().done());
        });
        modifiesResult.setForEach((record, obj) -> {
            if (Actors.isResult(obj)) {
                serverSideCB.getReceiver().receive(new AddMessage(0, record));
            } else {
                serverSideCB.getReceiver().receive(RLUtil.get().done());
            }
        });
        if (rLPredicate instanceof KeySetSubscriber.KSPredicate) {
            ((KeySetSubscriber.KSPredicate) rLPredicate).getKeys().forEach(str -> {
                Record record2 = this.storageDriver.getStore().get(str);
                if (record2 != null) {
                    serverSideCB.getReceiver().receive(new AddMessage(0, record2));
                }
            });
            serverSideCB.getReceiver().receive(RLUtil.get().done());
            this.filterProcessor.startListening(serverSideCB);
        } else {
            if (rLPredicate instanceof RLNoQueryPredicate) {
                serverSideCB.getReceiver().receive(RLUtil.get().done());
            } else {
                forEachDirect(modifiesResult);
            }
            this.filterProcessor.startListening(serverSideCB);
        }
    }

    private void forEachQueued(Spore spore, Runnable runnable) {
        this.queuedSpores.add(new QueryQEntry(spore, runnable));
        ((RealLiveTableActor) self())._execQueriesOrDelay(this.queuedSpores.size(), this.taCount);
    }

    public void _execQueriesOrDelay(int i, int i2) {
        if (!(this.queuedSpores.size() == i && this.taCount == i2) && this.queuedSpores.size() <= MAX_QUERY_BATCH_SIZE) {
            _execQueriesOrDelay(this.queuedSpores.size(), this.taCount);
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.storageDriver.getStore().stream().forEach(record -> {
            for (int i3 = 0; i3 < this.queuedSpores.size(); i3++) {
                Spore spore = this.queuedSpores.get(i3).spore;
                if (!spore.isFinished()) {
                    try {
                        spore.remote(record);
                    } catch (Throwable th) {
                        Log.Warn(this, th, "exception in spore " + spore);
                        spore.complete((Object) null, th);
                    }
                }
            }
        });
        this.queuedSpores.forEach(queryQEntry -> {
            queryQEntry.spore.finish();
            queryQEntry.onFin.run();
        });
        if (DUMP_QUERY_TIME) {
            System.out.println("tim for " + this.queuedSpores.size() + " " + (System.currentTimeMillis() - currentTimeMillis));
        }
        this.queuedSpores.clear();
    }

    protected String addChannelIdIfPresent(Callback callback, String str) {
        if ((callback instanceof CallbackWrapper) && ((CallbackWrapper) callback).isRemote()) {
            str = str + "#" + ((CallbackWrapper) callback).getRealCallback().getChanId();
        }
        return str;
    }

    @Override // org.nustaq.reallive.api.ChangeStream
    @CallerSideMethod
    public void unsubscribe(Subscriber subscriber) {
        _unsubscribe((obj, obj2) -> {
        }, subscriber.getId());
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public void unsubscribeById(int i) {
        _unsubscribe((obj, obj2) -> {
        }, i);
    }

    public void _unsubscribe(Callback callback, int i) {
        checkThread();
        String addChannelIdIfPresent = addChannelIdIfPresent(callback, "" + i);
        Subscriber subscriber = this.receiverSideSubsMap.get(addChannelIdIfPresent);
        this.filterProcessor.unsubscribe(subscriber);
        this.receiverSideSubsMap.remove(addChannelIdIfPresent);
        callback.finish();
        subscriber.getServerSideCB().finish();
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Record> get(String str) {
        this.taCount++;
        return resolve(this.storageDriver.getStore().get(str));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Long> size() {
        return resolve(Long.valueOf(this.storageDriver.getStore().size()));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<TableDescription> getDescription() {
        return resolve(this.description);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<StorageStats> getStats() {
        try {
            return resolve(this.storageDriver.getStore().getStats());
        } catch (Throwable th) {
            Log.Warn(this, th);
            return reject(th.getMessage());
        }
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public IPromise atomic(int i, String str, RLFunction<Record, Object> rLFunction) {
        this.taCount++;
        return this.storageDriver.atomicQuery(i, str, rLFunction);
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public void atomicUpdate(RLPredicate<Record> rLPredicate, RLFunction<Record, Boolean> rLFunction) {
        this.taCount++;
        this.storageDriver.atomicUpdate(rLPredicate, rLFunction);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise resizeIfLoadFactorLarger(double d, long j) {
        Log.Info(this, "resizing table if lf: " + d + " maxgrow:" + j);
        long currentTimeMillis = System.currentTimeMillis();
        this.storageDriver.resizeIfLoadFactorLarger(d, j);
        Log.Info(this, "resizing duration" + (System.currentTimeMillis() - currentTimeMillis));
        return resolve();
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void put(int i, String str, Object... objArr) {
        receive(RLUtil.get().put(i, str, objArr));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void merge(int i, String str, Object... objArr) {
        if (str instanceof Record) {
            throw new RuntimeException("probably accidental method resolution fail. Use merge instead");
        }
        receive(RLUtil.get().addOrUpdate(i, str, objArr));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Boolean> add(int i, String str, Object... objArr) {
        if (this.storageDriver.getStore().get(str) != null) {
            return resolve(false);
        }
        receive(RLUtil.get().add(i, str, objArr));
        return resolve(true);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Boolean> addRecord(int i, Record record) {
        if (record instanceof RecordWrapper) {
            record = ((RecordWrapper) record).getRecord();
        }
        if (this.storageDriver.getStore().get(record.getKey()) != null) {
            return resolve(false);
        }
        receive(new AddMessage(i, record));
        return resolve(true);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void mergeRecord(int i, Record record) {
        if (record instanceof RecordWrapper) {
            record = ((RecordWrapper) record).getRecord();
        }
        receive(new AddMessage(i, true, record));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void setRecord(int i, Record record) {
        if (record instanceof RecordWrapper) {
            record = ((RecordWrapper) record).getRecord();
        }
        receive(new PutMessage(i, record));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void update(int i, String str, Object... objArr) {
        receive(RLUtil.get().update(i, str, objArr));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Record> take(int i, String str) {
        Record record = this.storageDriver.getStore().get(str);
        receive(RLUtil.get().remove(i, str));
        return resolve(record);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void remove(int i, String str) {
        receive(RLUtil.get().remove(i, str));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1987569713:
                if (implMethodName.equals("lambda$subscribe$e06a0d78$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1810446055:
                if (implMethodName.equals("lambda$forEachWithSpore$e15369d1$1")) {
                    z = 5;
                    break;
                }
                break;
            case -1351104860:
                if (implMethodName.equals("lambda$_subscribe$938149ad$1")) {
                    z = false;
                    break;
                }
                break;
            case -1072181740:
                if (implMethodName.equals("lambda$_subscribe$74208d37$1")) {
                    z = true;
                    break;
                }
                break;
            case -786189234:
                if (implMethodName.equals("lambda$unsubscribeById$3634ce52$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1660743746:
                if (implMethodName.equals("lambda$unsubscribe$beb213b2$1")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case ChangeMessage.ADD /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/api/ChangeReceiver") && serializedLambda.getFunctionalInterfaceMethodName().equals("receive") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/nustaq/reallive/api/ChangeMessage;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveTableActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/kontraktor/Callback;Lorg/nustaq/reallive/api/ChangeMessage;)V")) {
                    Callback callback = (Callback) serializedLambda.getCapturedArg(0);
                    return changeMessage -> {
                        callback.pipe(changeMessage);
                    };
                }
                break;
            case ChangeMessage.REMOVE /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveTableActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/reallive/api/Subscriber;Lorg/nustaq/reallive/api/Record;Ljava/lang/Object;)V")) {
                    Subscriber subscriber = (Subscriber) serializedLambda.getCapturedArg(0);
                    return (record, obj) -> {
                        if (Actors.isResult(obj)) {
                            subscriber.getReceiver().receive(new AddMessage(0, record));
                        } else {
                            subscriber.getReceiver().receive(RLUtil.get().done());
                        }
                    };
                }
                break;
            case ChangeMessage.UPDATE /* 2 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveTableActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/reallive/api/Subscriber;Ljava/lang/Object;Ljava/lang/Object;)V")) {
                    Subscriber subscriber2 = (Subscriber) serializedLambda.getCapturedArg(0);
                    return (obj2, obj22) -> {
                        if (Actors.isResult(obj22)) {
                            subscriber2.getReceiver().receive((ChangeMessage) obj2);
                        }
                    };
                }
                break;
            case ChangeMessage.QUERYDONE /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveTableActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V")) {
                    return (obj3, obj23) -> {
                    };
                }
                break;
            case ChangeMessage.PUT /* 4 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveTableActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V")) {
                    return (obj4, obj24) -> {
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/RealLiveTableActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lorg/nustaq/reallive/api/Record;Ljava/lang/Object;)V")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    return (record2, obj5) -> {
                        if (Actors.isResult(obj5)) {
                            list.add(record2.getKey());
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
