package org.nustaq.kontraktor.services.rlserver.mongodb;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.util.concurrent.atomic.AtomicInteger;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.services.rlserver.mongodb.SubscriberHelpers;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.storage.RecordPersistance;
import org.nustaq.reallive.impl.storage.StorageStats;

/* loaded from: input_file:org/nustaq/kontraktor/services/rlserver/mongodb/MongoPersistance.class */
/**
 * {@link RecordPersistance} implementation that stores records as documents in a
 * MongoDB collection accessed through the reactive-streams driver.
 *
 * <p>All driver calls are asynchronous: write operations ({@link #put}, {@link #_put},
 * {@link #remove}) return immediately after subscribing to the driver's publisher, and
 * failures are reported through {@link Log} rather than thrown to the caller.
 *
 * <p>Records are identified in the collection by their {@code "key"} field.
 */
public class MongoPersistance implements RecordPersistance {
    private final TableDescription description;
    MongoCollection collection;
    ReplaceOptions upsert = new ReplaceOptions().upsert(true);

    /**
     * @param mongoCollection  collection that holds the documents of this table
     * @param tableDescription description of the table; its name is used for stats and logging
     */
    public MongoPersistance(MongoCollection mongoCollection, TableDescription tableDescription) {
        this.collection = mongoCollection;
        this.description = tableDescription;
    }

    /**
     * Asynchronously deletes the document whose {@code "key"} field equals the given key.
     *
     * @param str key of the record to delete, must not be {@code null}
     * @return always {@code null}; the delete completes asynchronously, so the removed
     *         record is not available to return
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    public Record remove(String str) {
        if (str == null) {
            // A null filter value would match documents lacking a key instead of a specific record.
            throw new IllegalArgumentException("key must not be null");
        }
        // Use the same "key" filter as _put, and subscribe: the driver's publishers are lazy,
        // so without a subscriber the delete would never be sent to the server.
        this.collection.deleteOne(Filters.eq("key", str)).subscribe(writeSubscriber("remove", str));
        return null;
    }

    /**
     * @return stats object carrying only the name {@code "mongo:" + table name}
     */
    public StorageStats getStats() {
        return new StorageStats().name("mongo:" + this.description.getName());
    }

    /**
     * Streams every document of the collection, converted to a {@link Record}, into the
     * given spore. On normal completion the number of processed documents and the elapsed
     * time are logged and {@code spore.finish()} is called; a driver error is forwarded via
     * {@code spore.complete(null, error)}.
     *
     * @param spore receiver of the records
     */
    public <T> void forEachWithSpore(final Spore<Record, T> spore) {
        final AtomicInteger processed = new AtomicInteger();
        final long startMillis = System.currentTimeMillis();
        this.collection.find().subscribe(new SubscriberHelpers.OperationSubscriber() {
            @Override
            public void onNext(Object obj) {
                // NOTE(review): the base subscriber's onNext is invoked for every document;
                // if it retains the items, a full-table scan holds them all - confirm.
                super.onNext(obj);
                processed.incrementAndGet();
                try {
                    spore.remote(MongoUtil.get().toRecord((Document) obj));
                } catch (Throwable th) {
                    // Log with context, then propagate so the failure is not silently dropped.
                    Log.Warn(this, th, "exception in spore " + spore);
                    throw th;
                }
            }

            @Override
            public void onError(Throwable th) {
                spore.complete((Object) null, th);
            }

            @Override
            public void onComplete() {
                Log.Info(this, "" + MongoPersistance.this.description + " count forEach " + processed + " time:" + (System.currentTimeMillis() - startMillis));
                spore.finish();
            }
        });
    }

    /**
     * Stamps the record with the current time as last-modified and stores it.
     *
     * @param str    key under which the record is stored
     * @param record record to store; its last-modified timestamp and key are overwritten
     * @return this instance
     */
    public RecordPersistance put(String str, Record record) {
        record.internal_setLastModified(System.currentTimeMillis());
        return _put(str, record);
    }

    /**
     * Stores the record without touching its last-modified timestamp. The record's key is
     * set to {@code str} and the matching document (by {@code "key"}) is replaced, or
     * inserted if none exists. The write is asynchronous; failures are logged.
     *
     * @param str    key under which the record is stored
     * @param record record to store
     * @return this instance
     */
    public RecordPersistance _put(String str, Record record) {
        record.key(str);
        this.collection
            .replaceOne(Filters.eq("key", str), MongoUtil.get().fromRecord(record), this.upsert)
            .subscribe(writeSubscriber("put", str));
        return this;
    }

    /**
     * Creates the subscriber used for fire-and-forget write operations: completion is
     * ignored, errors are logged together with the operation name, key and table name.
     */
    private SubscriberHelpers.OperationSubscriber writeSubscriber(final String operation, final String key) {
        return new SubscriberHelpers.OperationSubscriber() {
            @Override
            public void onError(Throwable th) {
                Log.Warn(MongoPersistance.this, th, "mongo " + operation + " failed for key '" + key + "' in table " + MongoPersistance.this.description.getName());
            }

            @Override
            public void onComplete() {
                // nothing to do: writes are fire-and-forget
            }
        };
    }
}
