package org.nustaq.reallive.client;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.ChangeReceiver;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLLimitedPredicate;
import org.nustaq.reallive.api.RLNoQueryPredicate;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.QueryDoneMessage;
import org.nustaq.reallive.records.RecordWrapper;
import org.nustaq.reallive.server.FilterProcessor;
import org.nustaq.reallive.server.FilterSpore;
import org.nustaq.reallive.server.RLUtil;
import org.nustaq.reallive.server.RemoveLog;
import org.nustaq.reallive.server.storage.StorageStats;

/* loaded from: input_file:org/nustaq/reallive/client/ShardedTable.class */
public class ShardedTable implements RealLiveTable {
    public static boolean DUMP_IN_PROC_CHANGES = false;
    protected TableDescription description;
    final ConcurrentHashMap<Subscriber, List<Subscriber>> subsMap = new ConcurrentHashMap<>();
    protected HashMap<Integer, RealLiveTable> tableShardMap = new HashMap<>();
    protected Set<RealLiveTable> shards = new HashSet();
    protected FilterProcessor proc = new FilterProcessor(this);
    protected AtomicBoolean globalListenReady = new AtomicBoolean(false);

    public ShardedTable(RealLiveTable[] realLiveTableArr, TableDescription tableDescription) {
        this.description = tableDescription;
        for (RealLiveTable realLiveTable : realLiveTableArr) {
            addNode(realLiveTable);
        }
        long currentTimeMillis = System.currentTimeMillis() - 8;
        realSubs(record -> {
            return record.getLastModified() > currentTimeMillis;
        }, changeMessage -> {
            globalListen(changeMessage);
        });
    }

    private Subscriber realSubs(RLPredicate<Record> rLPredicate, ChangeReceiver changeReceiver) {
        Subscriber subscriber = new Subscriber(rLPredicate, changeReceiver);
        realSubscribe(subscriber);
        return subscriber;
    }

    private void realSubscribe(Subscriber subscriber) {
        AtomicInteger atomicInteger = new AtomicInteger(this.shards.size());
        ChangeReceiver receiver = subscriber.getReceiver();
        ArrayList arrayList = new ArrayList();
        this.shards.forEach(realLiveTable -> {
            Subscriber subscriber2 = new Subscriber(subscriber.getFilter(), changeMessage -> {
                if (changeMessage.getType() != 3) {
                    receiver.receive(changeMessage);
                } else if (atomicInteger.decrementAndGet() == 0) {
                    receiver.receive(changeMessage);
                }
            });
            arrayList.add(subscriber2);
            realLiveTable.subscribe(subscriber2);
        });
        this.subsMap.put(subscriber, arrayList);
    }

    private void globalListen(ChangeMessage changeMessage) {
        boolean z = this.globalListenReady.get();
        if (!z) {
            if (changeMessage.isDoneMsg()) {
                if (DUMP_IN_PROC_CHANGES) {
                    Log.Info(this, "Global Listen Ready");
                }
                this.globalListenReady.set(true);
                return;
            }
            return;
        }
        if (!z) {
            Log.Error(this, "Unexpected change routing:" + changeMessage);
            return;
        }
        if (DUMP_IN_PROC_CHANGES) {
            Log.Info(this, "Listen Receive:" + changeMessage);
        }
        this.proc.receive(changeMessage);
    }

    public void addNode(RealLiveTable realLiveTable) {
        this.shards.add(realLiveTable);
    }

    public void removeTableShard(RealLiveTable realLiveTable) {
        for (Map.Entry<Integer, RealLiveTable> entry : this.tableShardMap.entrySet()) {
            if (entry.getValue() == realLiveTable) {
                this.tableShardMap.remove(entry.getKey());
                this.shards.remove(realLiveTable);
                return;
            }
        }
    }

    protected RealLiveTable getTableForKey(String str) {
        int abs = Math.abs(str.hashCode()) % this.tableShardMap.size();
        RealLiveTable realLiveTable = this.tableShardMap.get(Integer.valueOf(abs));
        if (realLiveTable != null) {
            return realLiveTable;
        }
        Log.Warn(this, "cannot map keyHash " + abs);
        return null;
    }

    @Override // org.nustaq.reallive.api.ChangeReceiver
    public void receive(ChangeMessage changeMessage) {
        if (changeMessage.getType() != 3) {
            getTableForKey(changeMessage.getKey()).receive(changeMessage);
        }
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise resizeIfLoadFactorLarger(double d, long j) {
        ArrayList arrayList = new ArrayList();
        this.shards.forEach(realLiveTable -> {
            arrayList.add(realLiveTable.resizeIfLoadFactorLarger(d, j));
        });
        return Actors.all(arrayList);
    }

    @Override // org.nustaq.reallive.api.ChangeStream
    public void subscribe(Subscriber subscriber) {
        if (subscriber.getFilter() instanceof RLNoQueryPredicate) {
            subscriber.getReceiver().receive(new QueryDoneMessage());
            this.proc.startListening(subscriber);
        } else {
            this.proc.startListening(subscriber);
            forEach(subscriber.getFilter(), (record, obj) -> {
                if (Actors.isResult(obj)) {
                    subscriber.getReceiver().receive(new AddMessage(0, record));
                } else if (Actors.isComplete(obj)) {
                    subscriber.getReceiver().receive(new QueryDoneMessage());
                }
            });
        }
    }

    protected void adjustLimitFilter(RLPredicate rLPredicate) {
        if (rLPredicate instanceof RLLimitedPredicate) {
            ((RLLimitedPredicate) rLPredicate)._setLimit(Math.max(1, ((RLLimitedPredicate) rLPredicate).getRecordLimit() / this.shards.size()));
        }
    }

    @Override // org.nustaq.reallive.api.ChangeStream
    public void unsubscribe(Subscriber subscriber) {
        this.proc.unsubscribe(subscriber);
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public void unsubscribeById(int i) {
        this.proc.unsubscribeById(i);
    }

    public void realUnsubscribe(Subscriber subscriber) {
        if (subscriber == null) {
            Log.Warn(this, "unsubscribed is null");
            return;
        }
        List<Subscriber> list = this.subsMap.get(subscriber);
        if (list == null) {
            Log.Warn(this, "unknown subscriber to unsubscribe " + subscriber);
            return;
        }
        for (int i = 0; i < list.size(); i++) {
            Subscriber subscriber2 = list.get(i);
            this.shards.forEach(realLiveTable -> {
                realLiveTable.unsubscribe(subscriber2);
            });
        }
        this.subsMap.remove(subscriber);
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public IPromise atomic(int i, String str, RLFunction<Record, Object> rLFunction) {
        return getTableForKey(str).atomic(i, str, rLFunction);
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public void atomicUpdate(RLPredicate<Record> rLPredicate, RLFunction<Record, Boolean> rLFunction) {
        this.shards.forEach(realLiveTable -> {
            realLiveTable.atomicUpdate(rLPredicate, rLFunction);
        });
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void put(int i, String str, Object... objArr) {
        getTableForKey(str).receive(new PutMessage(i, RLUtil.get().record(str, objArr)));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void merge(int i, String str, Object... objArr) {
        getTableForKey(str).receive(RLUtil.get().addOrUpdate(i, str, objArr));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void _deepMerge(int i, Record record) {
        getTableForKey(record.getKey())._deepMerge(i, record);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Boolean> add(int i, String str, Object... objArr) {
        return getTableForKey(str).add(i, str, objArr);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Boolean> addRecord(int i, Record record) {
        if (record instanceof RecordWrapper) {
            record = ((RecordWrapper) record).getRecord();
        }
        return getTableForKey(record.getKey()).addRecord(i, record);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void mergeRecord(int i, Record record) {
        if (record instanceof RecordWrapper) {
            record = ((RecordWrapper) record).getRecord();
        }
        getTableForKey(record.getKey()).receive(new AddMessage(i, true, record));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void setRecord(int i, Record record) {
        if (record instanceof RecordWrapper) {
            record = ((RecordWrapper) record).getRecord();
        }
        getTableForKey(record.getKey()).receive(new PutMessage(i, record));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void update(int i, String str, Object... objArr) {
        getTableForKey(str).receive(RLUtil.get().update(i, str, objArr));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Record> take(int i, String str) {
        return getTableForKey(str).take(i, str);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void remove(int i, String str) {
        getTableForKey(str).receive(RLUtil.get().remove(i, str));
    }

    @Override // org.nustaq.reallive.api.RealLiveStreamActor
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        if (spore instanceof FilterSpore) {
            adjustLimitFilter(((FilterSpore) spore).getFilter());
        }
        spore.setExpectedFinishCount(this.shards.size());
        this.shards.forEach(realLiveTable -> {
            realLiveTable.forEachWithSpore(spore);
        });
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise ping() {
        new ArrayList();
        return Actors.all((List) this.shards.stream().map(realLiveTable -> {
            return realLiveTable.ping();
        }).collect(Collectors.toList()));
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<TableDescription> getDescription() {
        return new Promise(this.description);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void stop() {
        this.shards.forEach(realLiveTable -> {
            realLiveTable.stop();
        });
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<StorageStats> getStats() {
        Promise promise = new Promise();
        try {
            Actors.all((List) this.shards.stream().map(realLiveTable -> {
                return realLiveTable.getStats();
            }).collect(Collectors.toList())).then((list, obj) -> {
                if (list == null) {
                    promise.reject(obj);
                    return;
                }
                StorageStats storageStats = new StorageStats();
                list.stream().map(iPromise -> {
                    return (StorageStats) iPromise.get();
                }).forEach(storageStats2 -> {
                    storageStats2.addTo(storageStats);
                });
                promise.resolve(storageStats);
            });
        } catch (Exception e) {
            Log.Warn(this, e);
            promise.reject(e);
        }
        return promise;
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Record> get(String str) {
        if (str == null) {
            return null;
        }
        return getTableForKey(str).get(str);
    }

    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Long> size() {
        Promise promise = new Promise();
        Actors.all((List) this.shards.stream().map(realLiveTable -> {
            return realLiveTable.size();
        }).collect(Collectors.toList())).then(list -> {
            promise.resolve(Long.valueOf(list.stream().mapToLong(iPromise -> {
                return ((Long) iPromise.get()).longValue();
            }).sum()));
        });
        return promise;
    }

    public void removeNode(Actor actor) {
        ((List) this.shards.stream().filter(realLiveTable -> {
            return ((Actor) realLiveTable).__clientConnection.getFacadeProxy().getActorRef() == actor.getActorRef();
        }).collect(Collectors.toList())).forEach(realLiveTable2 -> {
            removeTableShard(realLiveTable2);
        });
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public void queryRemoveLog(long j, long j2, Callback<RemoveLog.RemoveLogEntry> callback) {
        for (RealLiveTable realLiveTable : this.shards) {
            AtomicInteger atomicInteger = new AtomicInteger();
            realLiveTable.queryRemoveLog(j, j2, (removeLogEntry, obj) -> {
                if (removeLogEntry != null) {
                    callback.pipe(removeLogEntry);
                    return;
                }
                atomicInteger.incrementAndGet();
                if (atomicInteger.get() == this.shards.size()) {
                    callback.finish();
                }
            });
        }
    }

    @Override // org.nustaq.reallive.api.RealLiveTable
    public void pruneRemoveLog(long j) {
        this.shards.stream().forEach(realLiveTable -> {
            realLiveTable.pruneRemoveLog(j);
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2016390939:
                if (implMethodName.equals("lambda$new$28280fd9$1")) {
                    z = 2;
                    break;
                }
                break;
            case 503737328:
                if (implMethodName.equals("lambda$subscribe$92f55504$1")) {
                    z = 3;
                    break;
                }
                break;
            case 628154544:
                if (implMethodName.equals("lambda$realSubscribe$15465c93$1")) {
                    z = false;
                    break;
                }
                break;
            case 793076069:
                if (implMethodName.equals("lambda$getStats$3a42dd7e$1")) {
                    z = true;
                    break;
                }
                break;
            case 1211726339:
                if (implMethodName.equals("lambda$queryRemoveLog$bac526e7$1")) {
                    z = 5;
                    break;
                }
                break;
            case 2131258257:
                if (implMethodName.equals("lambda$new$b9aa6aef$1")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/api/ChangeReceiver") && serializedLambda.getFunctionalInterfaceMethodName().equals("receive") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/nustaq/reallive/api/ChangeMessage;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/client/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/nustaq/reallive/api/ChangeReceiver;Lorg/nustaq/reallive/api/ChangeMessage;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    ChangeReceiver changeReceiver = (ChangeReceiver) serializedLambda.getCapturedArg(1);
                    return changeMessage -> {
                        if (changeMessage.getType() != 3) {
                            changeReceiver.receive(changeMessage);
                        } else if (atomicInteger.decrementAndGet() == 0) {
                            changeReceiver.receive(changeMessage);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/client/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/kontraktor/Promise;Ljava/util/List;Ljava/lang/Object;)V")) {
                    Promise promise = (Promise) serializedLambda.getCapturedArg(0);
                    return (list, obj) -> {
                        if (list == null) {
                            promise.reject(obj);
                            return;
                        }
                        StorageStats storageStats = new StorageStats();
                        list.stream().map(iPromise -> {
                            return (StorageStats) iPromise.get();
                        }).forEach(storageStats2 -> {
                            storageStats2.addTo(storageStats);
                        });
                        promise.resolve(storageStats);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/api/ChangeReceiver") && serializedLambda.getFunctionalInterfaceMethodName().equals("receive") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/nustaq/reallive/api/ChangeMessage;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/client/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/reallive/api/ChangeMessage;)V")) {
                    ShardedTable shardedTable = (ShardedTable) serializedLambda.getCapturedArg(0);
                    return changeMessage2 -> {
                        globalListen(changeMessage2);
                    };
                }
                break;
            case ChangeMessage.QUERYDONE /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/client/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/reallive/api/Subscriber;Lorg/nustaq/reallive/api/Record;Ljava/lang/Object;)V")) {
                    Subscriber subscriber = (Subscriber) serializedLambda.getCapturedArg(0);
                    return (record, obj2) -> {
                        if (Actors.isResult(obj2)) {
                            subscriber.getReceiver().receive(new AddMessage(0, record));
                        } else if (Actors.isComplete(obj2)) {
                            subscriber.getReceiver().receive(new QueryDoneMessage());
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/api/RLPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/nustaq/reallive/client/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(JLorg/nustaq/reallive/api/Record;)Z")) {
                    long longValue = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    return record2 -> {
                        return record2.getLastModified() > longValue;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/client/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/nustaq/kontraktor/Callback;Lorg/nustaq/reallive/server/RemoveLog$RemoveLogEntry;Ljava/lang/Object;)V")) {
                    ShardedTable shardedTable2 = (ShardedTable) serializedLambda.getCapturedArg(0);
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(1);
                    Callback callback = (Callback) serializedLambda.getCapturedArg(2);
                    return (removeLogEntry, obj3) -> {
                        if (removeLogEntry != null) {
                            callback.pipe(removeLogEntry);
                            return;
                        }
                        atomicInteger2.incrementAndGet();
                        if (atomicInteger2.get() == this.shards.size()) {
                            callback.finish();
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
