package org.nustaq.reallive.impl.storage;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.offheap.FSTAsciiStringOffheapMap;
import org.nustaq.offheap.FSTSerializedOffheapMap;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.RecordStorage;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.simpleapi.DefaultCoder;
import org.nustaq.serialization.simpleapi.FSTCoder;
import org.nustaq.serialization.util.FSTUtil;

/* loaded from: input_file:org/nustaq/reallive/impl/storage/OffHeapRecordStorage.class */
/**
 * {@link RecordStorage} implementation that keeps its records in an
 * {@link FSTSerializedOffheapMap} keyed by {@code String}.
 *
 * <p>The backing map is created either purely in memory or from a file, depending on which
 * constructor is used. If {@link #protocol} is non-null, every {@code _put} and {@code remove}
 * is additionally encoded to that stream before the map is modified.
 *
 * <p>Threading: {@link #get(String)} (and therefore {@link #remove(String)}) remembers the first
 * calling thread and throws if a different thread calls later. The other methods perform no
 * such check.
 */
public class OffHeapRecordStorage implements RecordStorage {
    private static final boolean DEBUG = false;

    /** Number of bytes in one megabyte; {@code long} so size computations cannot overflow int. */
    private static final long BYTES_PER_MB = 1048576L;

    /** Optional stream receiving an encoded entry for each put/remove; {@code null} disables it. */
    OutputStream protocol;
    /** Coder handed to the backing map; created in {@code init}. */
    FSTCoder coder;
    /** Backing off-heap map. */
    FSTSerializedOffheapMap<String, Record> store;
    /** Key length the backing map was created with. */
    int keyLen;
    /** Thread that first called {@link #checkThread()}; {@code null} until then. */
    Thread _t;

    /** Creates an uninitialized instance; subclasses are expected to call {@code init} themselves. */
    protected OffHeapRecordStorage() {
    }

    /**
     * Creates a storage backed by an in-memory map.
     *
     * @param i  key length passed to the backing map
     * @param i2 size of the backing map in megabytes
     * @param i3 passed through as the element-count argument of the backing map
     *           (presumably the expected number of records -- confirm against the FST API)
     */
    public OffHeapRecordStorage(int i, int i2, int i3) {
        this.keyLen = i;
        init(null, i2, i3, i, false, Record.class);
    }

    /**
     * Creates a storage backed by a persistent map.
     *
     * @param str handed to the persistent map constructor (presumably the path of the backing
     *            file -- confirm against the FST API)
     * @param i   key length passed to the backing map
     * @param i2  size of the backing map in megabytes
     * @param i3  passed through as the element-count argument of the backing map
     */
    public OffHeapRecordStorage(String str, int i, int i2, int i3) {
        this.keyLen = i;
        init(str, i2, i3, i, true, Record.class);
    }

    /**
     * Sets up the coder and creates the backing map.
     *
     * @param str    argument for the persistent map; unused when {@code z} is {@code false}
     * @param i      size of the backing map in megabytes
     * @param i2     passed through as the element-count argument of the backing map
     * @param i3     key length
     * @param z      {@code true} to create a persistent map, {@code false} for an in-memory map
     * @param clsArr classes to register with the coder's configuration; may be {@code null}
     */
    protected void init(String str, int i, int i2, int i3, boolean z, Class... clsArr) {
        this.keyLen = i3;
        this.coder = new DefaultCoder();
        if (clsArr != null) {
            this.coder.getConf().registerClass(clsArr);
        }
        if (!z) {
            this.store = createMemMap(i, i2, i3);
            return;
        }
        try {
            this.store = createPersistentMap(str, i, i2, i3);
        } catch (Exception e) {
            // NOTE(review): FSTUtil.rethrow presumably rethrows e unchecked; if it ever
            // returned normally, store would stay null here.
            FSTUtil.rethrow(e);
        }
    }

    /**
     * Creates the in-memory backing map.
     *
     * @param i  size in megabytes
     * @param i2 passed through as the element-count argument
     * @param i3 key length
     */
    protected FSTSerializedOffheapMap<String, Record> createMemMap(int i, int i2, int i3) {
        // long arithmetic: an int product would overflow for sizes of 2048 MB and above
        return new FSTAsciiStringOffheapMap<>(i3, BYTES_PER_MB * i, i2, this.coder);
    }

    /**
     * Creates the persistent backing map.
     *
     * @param str handed to the map constructor as its first argument
     * @param i   size in megabytes
     * @param i2  passed through as the element-count argument
     * @param i3  key length
     * @throws Exception whatever the map constructor throws
     */
    protected FSTSerializedOffheapMap<String, Record> createPersistentMap(String str, int i, int i2, int i3) throws Exception {
        // long arithmetic: an int product would overflow for sizes of 2048 MB and above
        return new FSTAsciiStringOffheapMap<>(str, i3, BYTES_PER_MB * i, i2, this.coder);
    }

    /** Returns a snapshot of the backing map's file name, capacity, memory usage and size. */
    @Override // org.nustaq.reallive.api.RecordStorage
    public StorageStats getStats() {
        return new StorageStats()
                .name(this.store.getFileName())
                .capacity(this.store.getCapacityMB())
                .freeMem(this.store.getFreeMem())
                .usedMem(this.store.getUsedMem())
                .numElems(this.store.getSize());
    }

    /**
     * Updates the record's last-modified marker and stores it under {@code str}.
     *
     * @return this storage
     */
    @Override // org.nustaq.reallive.api.RecordStorage
    public RecordStorage put(String str, Record record) {
        record.internal_updateLastModified();
        return _put(str, record);
    }

    /**
     * Stores the record under {@code str} without touching its last-modified marker.
     * If a protocol stream is set, a {@code "putRecord"} entry is written first.
     *
     * @return this storage
     */
    public RecordStorage _put(String str, Record record) {
        writeProtocolEntry(new Object[]{"putRecord", str, record});
        this.store.put(str, record);
        return this;
    }

    /**
     * Encodes one entry to the protocol stream and flushes it; does nothing when no protocol
     * stream is set. Failures are logged and otherwise ignored so that a broken protocol stream
     * does not prevent the store operation itself.
     */
    private void writeProtocolEntry(Object[] entry) {
        if (this.protocol == null) {
            return;
        }
        try {
            FSTConfiguration.getDefaultConfiguration().encodeToStream(this.protocol, entry);
            this.protocol.flush();
        } catch (IOException e) {
            Log.Warn(this, e, "failed to write protocol entry");
        }
    }

    /**
     * Pins this storage to the first thread that calls this method.
     *
     * @throws RuntimeException if called from any other thread afterwards
     */
    void checkThread() {
        if (this._t == null) {
            this._t = Thread.currentThread();
        } else if (this._t != Thread.currentThread()) {
            throw new RuntimeException("Unexpected MultiThreading");
        }
    }

    /**
     * Looks up the record stored under {@code str}.
     *
     * @return whatever the backing map returns for the key
     * @throws RuntimeException if called from a thread other than the first caller's
     */
    @Override // org.nustaq.reallive.api.RecordStorage
    public Record get(String str) {
        checkThread();
        return (Record) this.store.get(str);
    }

    /**
     * Removes the record stored under {@code str}. If a protocol stream is set, a
     * {@code "remove"} entry is written first, regardless of whether the key exists.
     *
     * @return the removed record with its last-modified marker updated, or {@code null} if the
     *         lookup returned {@code null}
     */
    @Override // org.nustaq.reallive.api.RecordStorage
    public Record remove(String str) {
        writeProtocolEntry(new Object[]{"remove", str});
        Record record = get(str);
        if (record != null) {
            this.store.remove(str);
            record.internal_updateLastModified();
        }
        return record;
    }

    /** Returns the number of elements reported by the backing map. */
    @Override // org.nustaq.reallive.api.RecordStorage
    public long size() {
        return this.store.getSize();
    }

    /** Returns a sequential stream over the values of the backing map. */
    @Override // org.nustaq.reallive.api.RecordStorage
    public Stream<Record> stream() {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(this.store.values(), Spliterator.IMMUTABLE), false);
    }

    /**
     * Doubles the backing store if the used fraction of its memory has reached {@code d}.
     *
     * @param d load factor threshold, compared against used / (used + free)
     * @param j passed to the store's resize call as its second argument
     *          (presumably an upper bound on growth in bytes -- confirm against the FST API)
     */
    @Override // org.nustaq.reallive.api.RecordStorage
    public void resizeIfLoadFactorLarger(double d, long j) {
        // Compute the ratio in floating point; integral division would truncate it to 0
        // for every store that still has free memory.
        double usedMem = this.store.getUsedMem();
        double freeMem = this.store.getFreeMem();
        double loadFactor = usedMem / (usedMem + freeMem);
        if (loadFactor >= d) {
            // long literals keep the byte count from overflowing int for capacities >= 1024 MB
            this.store.resizeStore(this.store.getCapacityMB() * 1024L * 1024L * 2L, j);
        }
    }

    /**
     * Feeds every stored record to the spore until the values are exhausted or the spore reports
     * that it is finished, then calls {@code spore.finish()}.
     *
     * <p>If the spore throws, the exception is logged and rethrown, and {@code finish()} is not
     * called.
     */
    @Override // org.nustaq.reallive.api.RealLiveStreamActor
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        Iterator<?> values = this.store.values();
        while (values.hasNext()) {
            try {
                spore.remote((Record) values.next());
                if (spore.isFinished()) {
                    break;
                }
            } catch (Throwable th) {
                Log.Warn(this, th, "exception in spore " + spore);
                throw th;
            }
        }
        spore.finish();
    }
}
