package org.nustaq.reallive.impl.actors;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.ChangeReceiver;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLNoQueryPredicate;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.FilterProcessor;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.QueryDoneMessage;
import org.nustaq.reallive.records.RecordWrapper;

/* loaded from: input_file:org/nustaq/reallive/impl/actors/ShardedTable.class */
public class ShardedTable implements RealLiveTable {
    /** Number of hash slots the key space is split into; keys are routed by slot. */
    static final int NUM_SLOTS = 10000;
    /** Table description supplied at construction time. */
    private TableDescription description;
    /** For each subscriber registered across the shards, the per-shard subscribers created on its behalf. */
    final ConcurrentHashMap<Subscriber, List<Subscriber>> subsMap = new ConcurrentHashMap<>();
    /** Slot number to the shard tables responsible for that slot. */
    private HashMap<Integer, RealLiveTable[]> shardMap = new HashMap<>();
    /** The distinct shard tables currently attached to this table. */
    private Set<RealLiveTable> shards = new HashSet<>();
    /** Filter processor that client subscriptions are registered with and that receives forwarded changes. */
    private FilterProcessor proc = new FilterProcessor(this);
    /** Becomes true once the global listener has seen its first done message. */
    AtomicBoolean globalListenReady = new AtomicBoolean(false);

    /**
     * Creates a sharded table facade over the given shard tables.
     *
     * <p>The slot range is divided evenly between the shards (the last shard also takes the
     * remainder). An error is logged if some slot ends up without a shard. Finally a single
     * global listener is subscribed on all shards; it forwards changes to the filter processor.
     *
     * @param realLiveTableArr the shard tables, one per partition of the slot range
     * @param tableDescription description reported by {@code getDescription()}
     */
    public ShardedTable(RealLiveTable[] realLiveTableArr, TableDescription tableDescription) {
        this.description = tableDescription;
        for (int i = 0; i < realLiveTableArr.length; i++) {
            addNode(createSlots(realLiveTableArr.length, i), realLiveTableArr[i]);
        }
        if (!isComplete()) {
            Log.Error(this, "incomplete key coverage");
        }
        // The global listener only needs live changes, never an initial query result.
        // Marking the filter as RLNoQueryPredicate matches how this lambda was compiled
        // (see the RLNoQueryPredicate case in $deserializeLambda$); without the cast it
        // is an ordinary query predicate.
        realSubs((RLNoQueryPredicate<Record>) rec -> true, change -> globalListen(change));
    }

    /**
     * Builds a subscriber from the given filter and receiver and registers it on all shards.
     *
     * @param rLPredicate    filter for the new subscriber
     * @param changeReceiver receiver for the new subscriber
     * @return the newly created subscriber
     */
    private Subscriber realSubs(RLPredicate<Record> rLPredicate, ChangeReceiver changeReceiver) {
        final Subscriber created = new Subscriber(rLPredicate, changeReceiver);
        realSubscribe(created);
        return created;
    }

    /**
     * Subscribes the given subscriber on every shard.
     *
     * <p>One subscriber per shard is created with the same filter. Ordinary changes are passed
     * straight to the original receiver; query-done messages are counted and only the last one
     * (after every shard has reported) is forwarded. The per-shard subscribers are remembered in
     * {@code subsMap} so they can be unsubscribed later.
     *
     * @param subscriber the subscriber to fan out to all shards
     */
    private void realSubscribe(Subscriber subscriber) {
        final AtomicInteger pendingDone = new AtomicInteger(this.shards.size());
        final ChangeReceiver target = subscriber.getReceiver();
        final List<Subscriber> perShard = new ArrayList<>(this.shards.size());
        for (RealLiveTable shard : this.shards) {
            Subscriber shardSubscriber = new Subscriber(subscriber.getFilter(), change -> {
                // Forward everything except query-done; forward query-done only for the last shard.
                if (change.getType() != ChangeMessage.QUERYDONE || pendingDone.decrementAndGet() == 0) {
                    target.receive(change);
                }
            });
            perShard.add(shardSubscriber);
            shard.subscribe(shardSubscriber);
        }
        this.subsMap.put(subscriber, perShard);
    }

    /**
     * Receiver of the global all-shards subscription.
     *
     * <p>Until the first done message has been seen, incoming messages are dropped; that done
     * message only flips {@code globalListenReady}. Afterwards every message is handed to the
     * filter processor.
     *
     * @param changeMessage message delivered by a shard
     */
    private void globalListen(ChangeMessage changeMessage) {
        if (this.globalListenReady.get()) {
            this.proc.receive(changeMessage);
            return;
        }
        if (changeMessage.isDoneMsg()) {
            this.globalListenReady.set(true);
        }
    }

    /** Logs an error line for every slot that has no entry in the shard map. */
    private void dumpMisses() {
        int slot = 0;
        while (slot < NUM_SLOTS) {
            boolean covered = this.shardMap.containsKey(Integer.valueOf(slot));
            if (!covered) {
                Log.Error(this, "   missing bucket " + slot);
            }
            slot++;
        }
    }

    /**
     * Checks whether every slot has an entry in the shard map.
     *
     * @return {@code true} if all {@code NUM_SLOTS} slots are present, {@code false} otherwise
     */
    private boolean isComplete() {
        int slot = 0;
        // Advance while slots are covered; stopping early means a gap was found.
        while (slot < NUM_SLOTS && this.shardMap.containsKey(Integer.valueOf(slot))) {
            slot++;
        }
        return slot == NUM_SLOTS;
    }

    /**
     * Computes the contiguous slot numbers assigned to one shard.
     *
     * <p>Each shard gets {@code NUM_SLOTS / i} consecutive slots; the last shard
     * ({@code i2 == i - 1}) additionally takes everything up to {@code NUM_SLOTS}.
     *
     * @param i  total number of shards
     * @param i2 zero-based index of the shard
     * @return the slot numbers for that shard, in ascending order
     */
    private int[] createSlots(int i, int i2) {
        final int width = NUM_SLOTS / i;
        final int first = width * i2;
        final int end = (i2 == i - 1) ? NUM_SLOTS : width * (i2 + 1);
        final int[] slots = new int[end - first];
        for (int offset = 0; offset < slots.length; offset++) {
            slots[offset] = first + offset;
        }
        return slots;
    }

    /**
     * Attaches a shard table and makes it responsible for the given slots.
     *
     * @param iArr          slot numbers the shard should serve
     * @param realLiveTable the shard table to attach
     */
    public void addNode(int[] iArr, RealLiveTable realLiveTable) {
        this.shards.add(realLiveTable);
        for (int idx = 0; idx < iArr.length; idx++) {
            addSlot(realLiveTable, iArr[idx]);
        }
    }

    /**
     * Detaches a shard table: removes it from every slot it serves and from the shard set.
     *
     * <p>Shards are matched by reference identity. A slot whose only shard is removed keeps an
     * empty array; {@code hashAny}/{@code hashAll} treat an empty array as an unmapped slot.
     *
     * <p>The previous implementation built the filtered array but never stored it, so the removed
     * shard stayed in the slot map.
     *
     * @param realLiveTable the shard table to detach
     */
    public void removeTableShard(RealLiveTable realLiveTable) {
        this.shardMap.replaceAll((slot, tables) -> {
            List<RealLiveTable> remaining = new ArrayList<>(tables.length);
            for (RealLiveTable candidate : tables) {
                if (candidate != realLiveTable) {
                    remaining.add(candidate);
                }
            }
            // Leave untouched slots as they are; only replace arrays that actually contained the shard.
            if (remaining.size() == tables.length) {
                return tables;
            }
            return remaining.toArray(new RealLiveTable[0]);
        });
        this.shards.remove(realLiveTable);
    }

    /**
     * Registers a shard table as serving the given slot.
     *
     * <p>If the slot has no shards yet, a one-element array is stored; otherwise the shard is
     * appended to a copy of the existing array.
     *
     * @param realLiveTable the shard table to register
     * @param i             the slot number
     */
    private void addSlot(RealLiveTable realLiveTable, int i) {
        final Integer slot = Integer.valueOf(i);
        final RealLiveTable[] existing = this.shardMap.get(slot);
        if (existing == null) {
            this.shardMap.put(slot, new RealLiveTable[]{realLiveTable});
            return;
        }
        final RealLiveTable[] grown = new RealLiveTable[existing.length + 1];
        // Copy exactly the existing elements; copying length + 1 reads past the source array
        // and throws ArrayIndexOutOfBoundsException.
        System.arraycopy(existing, 0, grown, 0, existing.length);
        grown[existing.length] = realLiveTable;
        this.shardMap.put(slot, grown);
    }

    /**
     * Resolves all shard tables serving the slot of the given key.
     *
     * @param str the record key
     * @return the shard tables for the key's slot, or {@code null} (after logging a warning) if
     *         the slot has no shards
     */
    protected RealLiveTable[] hashAll(String str) {
        // Reduce first, then take the absolute value: Math.abs(Integer.MIN_VALUE) is still
        // negative, which would yield a negative slot. For every other hash code this produces
        // the same slot as Math.abs(hash) % NUM_SLOTS, so existing key placement is unchanged.
        int slot = Math.abs(str.hashCode() % NUM_SLOTS);
        RealLiveTable[] tables = this.shardMap.get(Integer.valueOf(slot));
        if (tables != null && tables.length != 0) {
            return tables;
        }
        Log.Warn(this, "cannot map keyHash " + slot);
        return null;
    }

    /**
     * Resolves one shard table (the first registered) serving the slot of the given key.
     *
     * @param str the record key
     * @return the first shard table for the key's slot, or {@code null} (after logging a
     *         warning) if the slot has no shards
     */
    protected RealLiveTable hashAny(String str) {
        // Reduce first, then take the absolute value: Math.abs(Integer.MIN_VALUE) is still
        // negative, which would yield a negative slot. For every other hash code this produces
        // the same slot as Math.abs(hash) % NUM_SLOTS, so existing key placement is unchanged.
        int slot = Math.abs(str.hashCode() % NUM_SLOTS);
        RealLiveTable[] tables = this.shardMap.get(Integer.valueOf(slot));
        if (tables != null && tables.length != 0) {
            return tables[0];
        }
        Log.Warn(this, "cannot map keyHash " + slot);
        return null;
    }

    /**
     * Routes a change message to the shard responsible for the message key.
     * Query-done messages are ignored.
     *
     * @param changeMessage the change to apply
     */
    @Override // org.nustaq.reallive.api.ChangeReceiver
    public void receive(ChangeMessage changeMessage) {
        if (changeMessage.getType() == ChangeMessage.QUERYDONE) {
            return;
        }
        RealLiveTable target = hashAny(changeMessage.getKey());
        target.receive(changeMessage);
    }

    /**
     * Forwards the resize request to every shard.
     *
     * @param d passed through unchanged to each shard
     * @param j passed through unchanged to each shard
     * @return the promise produced by {@code Actors.all} over the per-shard promises
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise resizeIfLoadFactorLarger(double d, long j) {
        ArrayList pending = new ArrayList();
        for (RealLiveTable shard : this.shards) {
            pending.add(shard.resizeIfLoadFactorLarger(d, j));
        }
        return Actors.all(pending);
    }

    /**
     * Registers a subscriber with the filter processor.
     *
     * <p>For an {@code RLNoQueryPredicate} filter the receiver gets a query-done message
     * immediately and is then registered for live changes. For any other filter the subscriber
     * is registered first and then the current contents are streamed via {@code forEach}: each
     * result becomes an add message and completion becomes a query-done message.
     *
     * @param subscriber the subscriber to register
     */
    @Override // org.nustaq.reallive.api.ChangeStream
    public void subscribe(Subscriber subscriber) {
        final boolean listenOnly = subscriber.getFilter() instanceof RLNoQueryPredicate;
        if (listenOnly) {
            subscriber.getReceiver().receive(new QueryDoneMessage());
            this.proc.startListening(subscriber);
            return;
        }
        this.proc.startListening(subscriber);
        forEach(subscriber.getFilter(), (rec, status) -> {
            if (Actors.isResult(status)) {
                subscriber.getReceiver().receive(new AddMessage(0, rec));
            } else if (Actors.isComplete(status)) {
                subscriber.getReceiver().receive(new QueryDoneMessage());
            }
        });
    }

    /**
     * Removes a subscriber from the filter processor.
     *
     * @param subscriber the subscriber to remove
     */
    @Override // org.nustaq.reallive.api.ChangeStream
    public void unsubscribe(Subscriber subscriber) {
        proc.unsubscribe(subscriber);
    }

    /**
     * Unsubscribes a subscriber that was registered across the shards.
     *
     * <p>Every per-shard subscriber recorded for it is unsubscribed from every shard, then the
     * bookkeeping entry is removed. A {@code null} or unknown subscriber only logs a warning.
     *
     * @param subscriber the subscriber originally passed to the shard-wide subscription
     */
    public void realUnsubscribe(Subscriber subscriber) {
        if (subscriber == null) {
            Log.Warn(this, "unsubscribed is null");
            return;
        }
        final List<Subscriber> perShard = this.subsMap.get(subscriber);
        if (perShard == null) {
            Log.Warn(this, "unknown subscriber to unsubscribe " + subscriber);
            return;
        }
        for (Subscriber shardSubscriber : perShard) {
            for (RealLiveTable shard : this.shards) {
                shard.unsubscribe(shardSubscriber);
            }
        }
        this.subsMap.remove(subscriber);
    }

    /**
     * Runs the given function on the shard responsible for the key.
     *
     * @param str        the record key
     * @param rLFunction function handed to the shard's {@code atomic}
     * @return the promise returned by the shard
     */
    @Override // org.nustaq.reallive.api.RealLiveTable
    public IPromise atomic(String str, RLFunction<Record, Object> rLFunction) {
        RealLiveTable shard = hashAny(str);
        return shard.atomic(str, rLFunction);
    }

    /**
     * Forwards the atomic update to every shard.
     *
     * @param rLPredicate filter passed to each shard
     * @param rLFunction  update function passed to each shard
     */
    @Override // org.nustaq.reallive.api.RealLiveTable
    public void atomicUpdate(RLPredicate<Record> rLPredicate, RLFunction<Record, Boolean> rLFunction) {
        for (RealLiveTable shard : this.shards) {
            shard.atomicUpdate(rLPredicate, rLFunction);
        }
    }

    /**
     * Sends a put message, built from the key and values, to the shard responsible for the key.
     *
     * @param i      passed as first argument to the {@code PutMessage}
     * @param str    the record key
     * @param objArr values used to build the record
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void put(int i, String str, Object... objArr) {
        RealLiveTable shard = hashAny(str);
        shard.receive(new PutMessage(i, RLUtil.get().record(str, objArr)));
    }

    /**
     * Sends an add-or-update message to the shard responsible for the key.
     *
     * @param i      passed as first argument to {@code RLUtil.addOrUpdate}
     * @param str    the record key
     * @param objArr values passed to {@code RLUtil.addOrUpdate}
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void merge(int i, String str, Object... objArr) {
        RealLiveTable shard = hashAny(str);
        shard.receive(RLUtil.get().addOrUpdate(i, str, objArr));
    }

    /**
     * Delegates an add to the shard responsible for the key.
     *
     * @param i      passed through to the shard
     * @param str    the record key
     * @param objArr values passed through to the shard
     * @return the promise returned by the shard
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Boolean> add(int i, String str, Object... objArr) {
        RealLiveTable shard = hashAny(str);
        return shard.add(i, str, objArr);
    }

    /**
     * Delegates adding a record to the shard responsible for the record's key.
     * A {@code RecordWrapper} is unwrapped first.
     *
     * @param i      passed through to the shard
     * @param record the record to add
     * @return the promise returned by the shard
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Boolean> addRecord(int i, Record record) {
        final Record plain = (record instanceof RecordWrapper)
                ? ((RecordWrapper) record).getRecord()
                : record;
        return hashAny(plain.getKey()).addRecord(i, plain);
    }

    /**
     * Sends an {@code AddMessage} (constructed with the flag {@code true}) for the record to the
     * shard responsible for the record's key. A {@code RecordWrapper} is unwrapped first.
     *
     * @param i      passed as first argument to the {@code AddMessage}
     * @param record the record to merge
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void mergeRecord(int i, Record record) {
        final Record plain = (record instanceof RecordWrapper)
                ? ((RecordWrapper) record).getRecord()
                : record;
        hashAny(plain.getKey()).receive(new AddMessage(i, true, plain));
    }

    /**
     * Sends a {@code PutMessage} for the record to the shard responsible for the record's key.
     * A {@code RecordWrapper} is unwrapped first.
     *
     * @param i      passed as first argument to the {@code PutMessage}
     * @param record the record to store
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void setRecord(int i, Record record) {
        final Record plain = (record instanceof RecordWrapper)
                ? ((RecordWrapper) record).getRecord()
                : record;
        hashAny(plain.getKey()).receive(new PutMessage(i, plain));
    }

    /**
     * Sends an update message to the shard responsible for the key.
     *
     * @param i      passed as first argument to {@code RLUtil.update}
     * @param str    the record key
     * @param objArr values passed to {@code RLUtil.update}
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void update(int i, String str, Object... objArr) {
        RealLiveTable shard = hashAny(str);
        shard.receive(RLUtil.get().update(i, str, objArr));
    }

    /**
     * Delegates {@code take} to the shard responsible for the key.
     *
     * @param i   passed through to the shard
     * @param str the record key
     * @return the promise returned by the shard
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Record> take(int i, String str) {
        RealLiveTable shard = hashAny(str);
        return shard.take(i, str);
    }

    /**
     * Sends a remove message to the shard responsible for the key.
     *
     * @param i   passed as first argument to {@code RLUtil.remove}
     * @param str the record key
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void remove(int i, String str) {
        RealLiveTable shard = hashAny(str);
        shard.receive(RLUtil.get().remove(i, str));
    }

    /**
     * Runs the spore on every shard after telling it how many finish signals to expect
     * (one per shard).
     *
     * @param spore the spore to execute on each shard
     * @param <T>   the spore's result type
     */
    @Override // org.nustaq.reallive.api.RealLiveStreamActor
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        spore.setExpectedFinishCount(this.shards.size());
        for (RealLiveTable shard : this.shards) {
            shard.forEachWithSpore(spore);
        }
    }

    /**
     * Pings every shard.
     *
     * @return the promise produced by {@code Actors.all} over the per-shard ping promises
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise ping() {
        // The unused ArrayList allocation that preceded this call was dead code and is gone.
        return Actors.all((List) this.shards.stream()
                .map(shard -> shard.ping())
                .collect(Collectors.toList()));
    }

    /**
     * Returns the table description supplied at construction time.
     *
     * @return a promise created directly from the stored description
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<TableDescription> getDescription() {
        final TableDescription current = this.description;
        return new Promise<>(current);
    }

    /** Calls {@code stop()} on every shard. */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public void stop() {
        for (RealLiveTable shard : this.shards) {
            shard.stop();
        }
    }

    /**
     * Collects the storage statistics of all shards and folds them into one result.
     *
     * <p>Each shard's stats are added to a fresh {@code StorageStats} via {@code addTo}. If the
     * combined promise delivers no result list, the returned promise is rejected with the
     * accompanying error. Exceptions thrown while setting up the request are logged and also
     * reject the returned promise.
     *
     * @return a promise for the aggregated statistics
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<StorageStats> getStats() {
        final Promise<StorageStats> result = new Promise<>();
        try {
            List<IPromise<StorageStats>> pending = this.shards.stream()
                    .map(shard -> shard.getStats())
                    .collect(Collectors.toList());
            Actors.all(pending).then((shardResults, error) -> {
                if (shardResults == null) {
                    result.reject(error);
                    return;
                }
                StorageStats total = new StorageStats();
                for (IPromise<StorageStats> shardResult : shardResults) {
                    shardResult.get().addTo(total);
                }
                result.resolve(total);
            });
        } catch (Exception e) {
            Log.Warn(this, e);
            result.reject(e);
        }
        return result;
    }

    /**
     * Looks up a record on the shard responsible for the key.
     *
     * @param str the record key
     * @return the promise returned by the shard, or {@code null} (not a promise) when the key is
     *         {@code null}
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Record> get(String str) {
        return str == null ? null : hashAny(str).get(str);
    }

    /**
     * Sums the sizes reported by all shards.
     *
     * @return a promise resolved with the total once the combined promise delivers its results
     */
    @Override // org.nustaq.reallive.api.SafeRealLiveTable
    public IPromise<Long> size() {
        final Promise<Long> total = new Promise<>();
        List<IPromise<Long>> pending = this.shards.stream()
                .map(shard -> shard.size())
                .collect(Collectors.toList());
        Actors.all(pending).then(shardSizes -> {
            long sum = 0L;
            for (IPromise<Long> shardSize : shardSizes) {
                sum += shardSize.get().longValue();
            }
            total.resolve(Long.valueOf(sum));
        });
        return total;
    }

    /**
     * Detaches every shard whose client connection's facade proxy has the same actor reference
     * as the given actor.
     *
     * @param actor the actor whose shards should be removed
     */
    public void removeNode(Actor actor) {
        // Collect first: removeTableShard mutates the shard set being iterated.
        final List<RealLiveTable> matching = new ArrayList<>();
        for (RealLiveTable shard : this.shards) {
            Actor shardRef = ((Actor) shard).__clientConnection.getFacadeProxy().getActorRef();
            if (shardRef == actor.getActorRef()) {
                matching.add(shard);
            }
        }
        for (RealLiveTable shard : matching) {
            removeTableShard(shard);
        }
    }

    /**
     * Lambda deserialization hook, marked synthetic; it appears here as decompiler output.
     *
     * <p>It first maps the serialized lambda's implementation method name to an index, then, if
     * the remaining metadata (method kind, functional interface, signatures, implementing class)
     * matches, rebuilds the corresponding lambda from the captured arguments. Anything that does
     * not match ends in an {@code IllegalArgumentException}.
     *
     * <p>NOTE(review): this is not valid hand-written Java ({@code boolean z = -1}, a
     * {@code switch} on a boolean, an instance call from a static context). The
     * {@code ChangeMessage.*} case labels are just the integers 0-4 rendered by the decompiler
     * and have nothing to do with change message types here. Presumably this method should be
     * dropped when recompiling from this source - confirm before doing so.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: translate the implementation method name into an index.
        switch (implMethodName.hashCode()) {
            case -2016390939:
                if (implMethodName.equals("lambda$new$28280fd9$1")) {
                    z = true;
                    break;
                }
                break;
            case -433838800:
                if (implMethodName.equals("lambda$null$fa1317f$1")) {
                    z = false;
                    break;
                }
                break;
            case 503737328:
                if (implMethodName.equals("lambda$subscribe$92f55504$1")) {
                    z = 2;
                    break;
                }
                break;
            case 583111225:
                if (implMethodName.equals("lambda$new$eaf2bb03$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1502044240:
                if (implMethodName.equals("lambda$getStats$718ebbdb$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Step 2: verify the lambda's metadata and rebuild it from its captured arguments.
        switch (z) {
            // Per-shard receiver created in realSubscribe (captures the counter and the target receiver).
            case ChangeMessage.ADD /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/api/ChangeReceiver") && serializedLambda.getFunctionalInterfaceMethodName().equals("receive") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/nustaq/reallive/api/ChangeMessage;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/nustaq/reallive/api/ChangeReceiver;Lorg/nustaq/reallive/api/ChangeMessage;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    ChangeReceiver changeReceiver = (ChangeReceiver) serializedLambda.getCapturedArg(1);
                    return changeMessage -> {
                        if (changeMessage.getType() != 3) {
                            changeReceiver.receive(changeMessage);
                        } else if (atomicInteger.decrementAndGet() == 0) {
                            changeReceiver.receive(changeMessage);
                        }
                    };
                }
                break;
            // Constructor's global-listener receiver (captures the ShardedTable instance).
            case ChangeMessage.REMOVE /* 1 */:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/api/ChangeReceiver") && serializedLambda.getFunctionalInterfaceMethodName().equals("receive") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/nustaq/reallive/api/ChangeMessage;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/reallive/api/ChangeMessage;)V")) {
                    ShardedTable shardedTable = (ShardedTable) serializedLambda.getCapturedArg(0);
                    return changeMessage2 -> {
                        globalListen(changeMessage2);
                    };
                }
                break;
            // Callback used by subscribe() to turn query results into add / query-done messages.
            case ChangeMessage.UPDATE /* 2 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/reallive/api/Subscriber;Lorg/nustaq/reallive/api/Record;Ljava/lang/Object;)V")) {
                    Subscriber subscriber = (Subscriber) serializedLambda.getCapturedArg(0);
                    return (record, obj) -> {
                        if (Actors.isResult(obj)) {
                            subscriber.getReceiver().receive(new AddMessage(0, record));
                        } else if (Actors.isComplete(obj)) {
                            subscriber.getReceiver().receive(new QueryDoneMessage());
                        }
                    };
                }
                break;
            // Callback used by getStats() to aggregate the per-shard statistics.
            case ChangeMessage.QUERYDONE /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/kontraktor/Promise;Ljava/util/List;Ljava/lang/Object;)V")) {
                    Promise promise = (Promise) serializedLambda.getCapturedArg(0);
                    return (list, obj2) -> {
                        if (list == null) {
                            promise.reject(obj2);
                            return;
                        }
                        StorageStats storageStats = new StorageStats();
                        list.stream().map(iPromise -> {
                            return (StorageStats) iPromise.get();
                        }).forEach(storageStats2 -> {
                            storageStats2.addTo(storageStats);
                        });
                        promise.resolve(storageStats);
                    };
                }
                break;
            // Constructor's always-true filter, typed as RLNoQueryPredicate.
            case ChangeMessage.PUT /* 4 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/api/RLNoQueryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/ShardedTable") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return obj3 -> {
                        return true;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
