package org.nustaq.reallive.impl.actors;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.interfaces.ChangeMessage;
import org.nustaq.reallive.interfaces.ChangeReceiver;
import org.nustaq.reallive.interfaces.Mutation;
import org.nustaq.reallive.interfaces.RLConsumer;
import org.nustaq.reallive.interfaces.RLPredicate;
import org.nustaq.reallive.interfaces.RealLiveTable;
import org.nustaq.reallive.interfaces.Record;
import org.nustaq.reallive.interfaces.Subscriber;
import org.nustaq.reallive.interfaces.TableDescription;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.records.RecordWrapper;

/* loaded from: input_file:org/nustaq/reallive/impl/actors/TableSharding.class */
public class TableSharding<K> implements RealLiveTable<K> {
    ShardFunc<K> func;
    RealLiveTable<K>[] shards;
    final ConcurrentHashMap<Subscriber, List<Subscriber>> subsMap = new ConcurrentHashMap<>();
    private TableDescription description;

    /* loaded from: input_file:org/nustaq/reallive/impl/actors/TableSharding$ShardMutation.class */
    protected class ShardMutation implements Mutation<K> {
        protected ShardMutation() {
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public IPromise<Boolean> putCAS(RLPredicate<Record<K>> rLPredicate, K k, Object... objArr) {
            return TableSharding.this.shards[TableSharding.this.func.apply(k)].getMutation().putCAS(rLPredicate, k, objArr);
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void put(K k, Object... objArr) {
            TableSharding.this.shards[TableSharding.this.func.apply(k)].receive(new PutMessage(RLUtil.get().record(k, objArr)));
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void atomic(K k, RLConsumer rLConsumer) {
            TableSharding.this.shards[TableSharding.this.func.apply(k)].getMutation().atomic(k, rLConsumer);
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void addOrUpdate(K k, Object... objArr) {
            TableSharding.this.shards[TableSharding.this.func.apply(k)].receive(RLUtil.get().addOrUpdate(k, objArr));
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void add(K k, Object... objArr) {
            TableSharding.this.shards[TableSharding.this.func.apply(k)].receive(RLUtil.get().add(k, objArr));
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void add(Record<K> record) {
            if (record instanceof RecordWrapper) {
                record = ((RecordWrapper) record).getRecord();
            }
            TableSharding.this.shards[TableSharding.this.func.apply(record.getKey())].receive(new AddMessage(record));
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void addOrUpdateRec(Record<K> record) {
            if (record instanceof RecordWrapper) {
                record = ((RecordWrapper) record).getRecord();
            }
            TableSharding.this.shards[TableSharding.this.func.apply(record.getKey())].receive(new AddMessage(true, record));
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void put(Record<K> record) {
            if (record instanceof RecordWrapper) {
                record = ((RecordWrapper) record).getRecord();
            }
            TableSharding.this.shards[TableSharding.this.func.apply(record.getKey())].receive(new PutMessage(record));
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void update(K k, Object... objArr) {
            TableSharding.this.shards[TableSharding.this.func.apply(k)].receive(RLUtil.get().update(k, objArr));
        }

        @Override // org.nustaq.reallive.interfaces.Mutation
        public void remove(K k) {
            TableSharding.this.shards[TableSharding.this.func.apply(k)].receive(RLUtil.get().remove(k));
        }
    }

    public TableSharding(ShardFunc<K> shardFunc, RealLiveTable<K>[] realLiveTableArr, TableDescription tableDescription) {
        this.func = shardFunc;
        this.shards = realLiveTableArr;
        this.description = tableDescription;
    }

    @Override // org.nustaq.reallive.interfaces.ChangeReceiver
    public void receive(ChangeMessage<K> changeMessage) {
        if (changeMessage.getType() != 3) {
            this.shards[this.func.apply(changeMessage.getKey())].receive(changeMessage);
        }
    }

    @Override // org.nustaq.reallive.interfaces.ChangeStream
    public void subscribe(Subscriber<K> subscriber) {
        AtomicInteger atomicInteger = new AtomicInteger(this.shards.length);
        ChangeReceiver<K> receiver = subscriber.getReceiver();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.shards.length; i++) {
            RealLiveTable<K> realLiveTable = this.shards[i];
            Subscriber<K> subscriber2 = new Subscriber<>(subscriber.getFilter(), changeMessage -> {
                if (changeMessage.getType() != 3) {
                    receiver.receive(changeMessage);
                } else if (atomicInteger.decrementAndGet() == 0) {
                    receiver.receive(changeMessage);
                }
            });
            arrayList.add(subscriber2);
            realLiveTable.subscribe(subscriber2);
        }
        this.subsMap.put(subscriber, arrayList);
    }

    @Override // org.nustaq.reallive.interfaces.ChangeStream
    public void unsubscribe(Subscriber<K> subscriber) {
        if (subscriber == null) {
            Log.Warn(this, "unsubscribed is null");
            return;
        }
        List<Subscriber> list = this.subsMap.get(subscriber);
        if (list == null) {
            Log.Warn(this, "unknown subscriber to unsubscribe " + subscriber);
            return;
        }
        for (int i = 0; i < list.size(); i++) {
            this.shards[i].unsubscribe(list.get(i));
        }
        this.subsMap.remove(subscriber);
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public IPromise<Boolean> putCAS(RLPredicate<Record<K>> rLPredicate, K k, Object... objArr) {
        return this.shards[this.func.apply(k)].getMutation().putCAS(rLPredicate, k, objArr);
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public void atomic(K k, RLConsumer<Record<K>> rLConsumer) {
        this.shards[this.func.apply(k)].getMutation().atomic(k, rLConsumer);
    }

    @Override // org.nustaq.reallive.interfaces.Mutatable
    public Mutation<K> getMutation() {
        return new ShardMutation();
    }

    @Override // org.nustaq.reallive.interfaces.RecordIterable
    public <T> void forEach(Spore<Record<K>, T> spore) {
        spore.setExpectedFinishCount(this.shards.length);
        for (int i = 0; i < this.shards.length; i++) {
            this.shards[i].forEach(spore);
        }
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public IPromise ping() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.shards.length; i++) {
            arrayList.add(this.shards[i].ping());
        }
        return Actors.all(arrayList);
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public IPromise<TableDescription> getDescription() {
        return new Promise(this.description);
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public void stop() {
        for (int i = 0; i < this.shards.length; i++) {
            this.shards[i].stop();
        }
    }

    @Override // org.nustaq.reallive.interfaces.RealLiveTable
    public IPromise<StorageStats> getStats() {
        IPromise[] iPromiseArr = (IPromise[]) Actors.all(this.shards.length, num -> {
            return this.shards[num.intValue()].getStats();
        }).await();
        StorageStats storageStats = new StorageStats();
        for (IPromise iPromise : iPromiseArr) {
            ((StorageStats) iPromise.get()).addTo(storageStats);
        }
        return new Promise(storageStats);
    }

    @Override // org.nustaq.reallive.interfaces.AsyncKV
    public IPromise<Record<K>> get(K k) {
        if (k == null) {
            return null;
        }
        return this.shards[this.func.apply(k)].get(k);
    }

    @Override // org.nustaq.reallive.interfaces.AsyncKV
    public IPromise<Long> size() {
        Promise promise = new Promise();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.shards.length; i++) {
            arrayList.add(this.shards[i].size());
        }
        Actors.all(arrayList).then(list -> {
            promise.resolve(Long.valueOf(list.stream().mapToLong(iPromise -> {
                return ((Long) iPromise.get()).longValue();
            }).sum()));
        });
        return promise;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1573890687:
                if (implMethodName.equals("lambda$subscribe$7f74e169$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case ChangeMessage.ADD /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/reallive/interfaces/ChangeReceiver") && serializedLambda.getFunctionalInterfaceMethodName().equals("receive") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/nustaq/reallive/interfaces/ChangeMessage;)V") && serializedLambda.getImplClass().equals("org/nustaq/reallive/impl/actors/TableSharding") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/nustaq/reallive/interfaces/ChangeReceiver;Lorg/nustaq/reallive/interfaces/ChangeMessage;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    ChangeReceiver changeReceiver = (ChangeReceiver) serializedLambda.getCapturedArg(1);
                    return changeMessage -> {
                        if (changeMessage.getType() != 3) {
                            changeReceiver.receive(changeMessage);
                        } else if (atomicInteger.decrementAndGet() == 0) {
                            changeReceiver.receive(changeMessage);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
