package org.nustaq.kontraktor.services.rlclient;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.services.ServiceActor;
import org.nustaq.kontraktor.services.rlclient.DataClient;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.actors.ShardedTable;
import org.nustaq.reallive.impl.tablespace.ClusteredTableSpaceClient;
import org.nustaq.reallive.impl.tablespace.TableSpaceActor;
import org.nustaq.reallive.impl.tablespace.TableSpaceSharding;
import org.nustaq.serialization.FSTConfiguration;

/* loaded from: input_file:org/nustaq/kontraktor/services/rlclient/DataClient.class */
/**
 * Table-space client that works on a fixed set of shard actors.
 *
 * <p>{@link #connect} stores the configuration and shards, creates or loads every table of the
 * configured schema and keeps the resulting table handles in {@link #syncTableAccess}, from where
 * {@link #getTable(String)} / {@link #tbl(String)} hand them out by name.
 *
 * <p>{@link #export(String)} dumps all configured tables into one file per table.
 */
public class DataClient<T extends DataClient> extends ClusteredTableSpaceClient<T> {

    /** Marker bytes written in front of every exported record. */
    private static final byte[] RECORD_MAGIC = {31, 32, 33, 34};
    /** How long {@link #export(String)} waits for a shard to hand out a table. */
    private static final long TABLE_LOOKUP_TIMEOUT_MILLIS = 60000L;
    /** How long {@link #export(String)} waits for all shards of one table to finish streaming. */
    private static final long TABLE_EXPORT_TIMEOUT_MINUTES = 5L;

    DataCfg config;
    ServiceActor hostingService;
    TableSpaceActor[] shards;
    /** Table handles by table name, filled by {@link #initTable}. */
    HashMap<String, RealLiveTable> syncTableAccess;

    /**
     * Stores configuration, shards and hosting service, initialises the sharding and then creates
     * or loads every table listed in {@code dataCfg.getSchema()}.
     *
     * @param dataCfg            configuration providing the table schema
     * @param tableSpaceActorArr the shard actors to work on
     * @param serviceActor       the service hosting this client
     * @return the promise produced by {@code all(..)} over the per-table initialisations
     */
    public IPromise connect(DataCfg dataCfg, TableSpaceActor[] tableSpaceActorArr, ServiceActor serviceActor) {
        this.config = dataCfg;
        this.hostingService = serviceActor;
        this.shards = tableSpaceActorArr;
        this.syncTableAccess = new HashMap<>();
        this.tableSpaceSharding = new TableSpaceSharding(tableSpaceActorArr);
        this.tableSpaceSharding.init().await();
        TableDescription[] schema = dataCfg.getSchema();
        return all(schema.length, num -> initTable(schema[num.intValue()]));
    }

    /**
     * Creates or loads a single table and, on success, registers it under its name.
     *
     * @param tableDescription description of the table to create or load
     * @return promise completed with the same result/error pair the table space reported
     */
    private IPromise<Object> initTable(TableDescription tableDescription) {
        Promise<Object> promise = new Promise<>();
        this.tableSpaceSharding.createOrLoadTable(tableDescription).then((realLiveTable, error) -> {
            if (realLiveTable != null) {
                this.syncTableAccess.put(tableDescription.getName(), realLiveTable);
            }
            promise.complete(realLiveTable, error);
        });
        return promise;
    }

    /**
     * Looks up a table handle registered by {@link #connect}.
     *
     * @param str table name
     * @return the table, or {@code null} if no table is registered under that name
     */
    @CallerSideMethod
    public RealLiveTable getTable(String str) {
        // T is bounded by the raw DataClient type, so the map lookup is typed Object here.
        return (RealLiveTable) getActor().syncTableAccess.get(str);
    }

    /**
     * Short alias for {@link #getTable(String)}.
     *
     * @param str table name
     * @return the table, or {@code null} if no table is registered under that name
     */
    @CallerSideMethod
    public RealLiveTable tbl(String str) {
        return getTable(str);
    }

    /**
     * Exports every table of the configured schema into the given directory, one file
     * {@code <tableName>.oos} per table. The work is submitted to {@code Actors.exec}.
     *
     * <p>Each record is written as the marker bytes {@code 31 32 33 34}, a 4-byte length
     * (as written by {@link DataOutputStream#writeInt(int)}) and the FST-serialised record.
     *
     * <p>Failures of single shards or tables are logged and do not fail the export. The promise
     * is rejected if the directory is unusable, the export thread is interrupted or an unexpected
     * runtime exception occurs.
     *
     * @param str path of the target directory; it is created if missing
     * @return promise completed once all tables have been processed
     */
    public IPromise export(String str) {
        Promise promise = new Promise();
        Actors.exec.execute(() -> {
            try {
                File directory = new File(str);
                if (directory.exists() && (!directory.isDirectory() || !directory.canWrite())) {
                    promise.reject(new RuntimeException("cannot write to " + directory + " or not a directory"));
                    return;
                }
                // mkdirs() also returns false when the directory already exists, hence the re-check.
                if (!directory.mkdirs() && !directory.isDirectory()) {
                    promise.reject(new RuntimeException("cannot create export directory " + directory));
                    return;
                }
                FSTConfiguration serializer = FSTConfiguration.createDefaultConfiguration();
                for (TableDescription tableDescription : this.config.getSchema()) {
                    exportTable(directory, serializer, tableDescription);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executor and do not report a partial export as success.
                Thread.currentThread().interrupt();
                Log.Error(this, e);
                promise.reject(e);
                return;
            } catch (RuntimeException e) {
                // Without this the promise would stay unresolved forever.
                Log.Error(this, e);
                promise.reject(e);
                return;
            }
            promise.complete();
        });
        return promise;
    }

    /**
     * Streams all records of one table from every shard into {@code <tableName>.oos} and waits
     * until each shard has reported completion (or the timeout elapsed). The file is always closed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the shards
     */
    private void exportTable(File directory, FSTConfiguration serializer, TableDescription tableDescription)
            throws InterruptedException {
        String tableName = tableDescription.getName();
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(directory, tableName + ".oos")))) {
            CountDownLatch pendingShards = new CountDownLatch(this.shards.length);
            for (int i = 0; i < this.shards.length; i++) {
                Log.Info(this, "exporting shard " + i + " table " + tableName);
                try {
                    RealLiveTable table =
                        (RealLiveTable) this.shards[i].getTableAsync(tableName).await(TABLE_LOOKUP_TIMEOUT_MILLIS);
                    table.forEach(rec -> true, (rec, error) -> {
                        if (rec != null) {
                            writeRecord(serializer, out, rec);
                        } else if (error == null) {
                            // no record and no error: this shard is done
                            pendingShards.countDown();
                        } else {
                            Log.Warn(this, "error during export " + error);
                            pendingShards.countDown();
                        }
                    });
                } catch (Exception e) {
                    Log.Error(this, "export failure " + tableName + " shard " + i);
                    Log.Error(this, e);
                    // This shard will not report completion; do not make the latch wait for it.
                    pendingShards.countDown();
                }
            }
            if (!pendingShards.await(TABLE_EXPORT_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                Log.Error(this, "export timed out on table " + tableName);
            }
        } catch (IOException e) {
            // covers failure to open the file (FileNotFoundException) as well as failure to close it
            Log.Error(this, e);
        }
    }

    /**
     * Appends one record to the export stream: marker bytes, payload length, payload.
     * I/O errors are logged, not propagated.
     */
    private void writeRecord(FSTConfiguration fSTConfiguration, DataOutputStream dataOutputStream, Record rec) {
        try {
            // Callbacks of several shards share one stream; the lock keeps each record contiguous.
            synchronized (dataOutputStream) {
                // Serialise before touching the stream so a serialisation failure leaves no dangling header.
                byte[] payload = fSTConfiguration.asByteArray(rec);
                dataOutputStream.write(RECORD_MAGIC);
                dataOutputStream.writeInt(payload.length);
                dataOutputStream.write(payload);
            }
        } catch (IOException e) {
            Log.Error(this, e);
        }
    }

    /**
     * @return promise resolved with the number of shards passed to {@link #connect}
     */
    public IPromise<Integer> getNoShards() {
        return resolve(Integer.valueOf(this.shards.length));
    }

    /**
     * Runs {@code forEach(rLPredicate, callback)} on the given table of a single shard.
     *
     * <p>If the shard index is out of range or the table cannot be obtained, the problem is
     * reported through {@code callback.complete(null, error)} instead of leaving the callback
     * without any signal.
     *
     * @param str         table name
     * @param rLPredicate filter applied to the records of the table
     * @param i           index of the shard in the array passed to {@link #connect}
     * @param callback    receiver of the matching records
     */
    public void processSharded(String str, RLPredicate<Record> rLPredicate, int i, Callback<Record> callback) {
        if (i < 0 || i >= this.shards.length) {
            callback.complete(null, "invalid shard index " + i + ", number of shards is " + this.shards.length);
            return;
        }
        this.shards[i].getTableAsync(str).then((realLiveTable, error) -> {
            if (realLiveTable != null) {
                realLiveTable.forEach(rLPredicate, callback);
            } else {
                callback.complete(null, error != null ? error : "table " + str + " not available on shard " + i);
            }
        });
    }

    /**
     * Removes the given node from every registered table.
     *
     * @param actor the node that disconnected
     */
    public void nodeDisconnected(Actor actor) {
        for (RealLiveTable realLiveTable : this.syncTableAccess.values()) {
            ((ShardedTable) realLiveTable).removeNode(actor.getActorRef());
        }
    }

    /**
     * @return the shard array passed to {@link #connect} (not a copy)
     */
    @CallerSideMethod
    public TableSpaceActor[] getShards() {
        return getActor().shards;
    }

    /**
     * Calls {@code unsubscribeById(i)} on every registered table.
     *
     * @param i id of the subscription to remove
     */
    public void unsubscribe(int i) {
        for (RealLiveTable realLiveTable : this.syncTableAccess.values()) {
            realLiveTable.unsubscribeById(i);
        }
    }
}
