package de.otto.synapse.messagestore;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.message.TextMessage;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.dizitart.no2.Cursor;
import org.dizitart.no2.Document;
import org.dizitart.no2.IndexOptions;
import org.dizitart.no2.IndexType;
import org.dizitart.no2.Nitrite;
import org.dizitart.no2.NitriteCollection;
import org.dizitart.no2.filters.Filters;

/* loaded from: input_file:de/otto/synapse/messagestore/OffHeapIndexingMessageStore.class */
/**
 * A {@link MessageStore} that keeps its entries as documents in a Nitrite collection.
 *
 * <p>Every stored document carries the channel name, the text message and one
 * {@code _idx_<index name>} field per filter value produced by the configured {@link Indexer}.
 * A non-unique Nitrite index is created for each {@link Index} the indexer reports, which is
 * what {@link #stream(Index, String)} queries against.
 *
 * <p>The Nitrite database is opened with {@code Nitrite.builder().openOrCreate()} and no further
 * configuration; it is released by {@link #close()}.
 */
public class OffHeapIndexingMessageStore implements MessageStore {
    /** Prefix of the document field that holds the filter value of an {@link Index}. */
    private static final String INDEX_FIELD_PREFIX = "_idx_";
    /** Document field holding the channel name of an entry. */
    private static final String CHANNEL_NAME_FIELD = "channelName";
    /** Document field holding the {@link TextMessage} of an entry. */
    private static final String MESSAGE_FIELD = "message";
    /** Suffix appended to the store name to build the Nitrite collection name. */
    private static final String COLLECTION_SUFFIX = "-messages";

    private final Nitrite nitrite;
    private final NitriteCollection messages;
    private final ChannelPositions channelPositions;
    private final Indexer indexer;

    /**
     * Creates a store that uses {@link Indexers#noOpIndexer()}.
     *
     * @param str name used as prefix of the underlying Nitrite collection
     */
    public OffHeapIndexingMessageStore(String str) {
        this(str, Indexers.noOpIndexer());
    }

    /**
     * Creates a store backed by the Nitrite collection {@code str + "-messages"} and creates a
     * non-unique index for every {@link Index} reported by {@code indexer}.
     *
     * @param str name used as prefix of the underlying Nitrite collection
     * @param indexer the indexer applied to every added entry
     * @throws NullPointerException if {@code indexer} is {@code null}
     */
    public OffHeapIndexingMessageStore(String str, Indexer indexer) {
        // Fail before the database is opened so that a bad argument does not leave an
        // unclosed Nitrite instance behind.
        this.indexer = Objects.requireNonNull(indexer, "indexer");
        this.channelPositions = new ChannelPositions();
        this.nitrite = Nitrite.builder().openOrCreate();
        this.messages = this.nitrite.getCollection(str + COLLECTION_SUFFIX);
        for (Index index : indexer.getIndexes()) {
            this.messages.createIndex(indexField(index), IndexOptions.indexOptions(IndexType.NonUnique));
        }
    }

    /**
     * Returns the channel names tracked by this store's channel positions.
     *
     * @return the channel names reported by the internal {@code ChannelPositions}
     */
    @Override
    public Set<String> getChannelNames() {
        return this.channelPositions.getChannelNames();
    }

    /**
     * Returns the indexes of the configured {@link Indexer}.
     *
     * @return the indexer's indexes
     */
    @Override
    public ImmutableSet<Index> getIndexes() {
        return this.indexer.getIndexes();
    }

    /**
     * Returns the latest channel position recorded for the given channel.
     *
     * @param str the channel name
     * @return the position reported by the internal {@code ChannelPositions}
     */
    @Override
    public ChannelPosition getLatestChannelPosition(String str) {
        return this.channelPositions.getLatestChannelPosition(str);
    }

    /**
     * Streams all entries of the underlying collection.
     *
     * @return a stream over every stored document, converted to {@link MessageStoreEntry}
     */
    @Override
    public Stream<MessageStoreEntry> stream() {
        return toEntryStream(this.messages.find());
    }

    /**
     * Streams the entries whose filter value for {@code index} equals {@code str}.
     *
     * @param index the index to query
     * @param str the filter value to match
     * @return a stream over the matching documents, converted to {@link MessageStoreEntry}
     */
    @Override
    public Stream<MessageStoreEntry> stream(Index index, String str) {
        return toEntryStream(this.messages.find(Filters.eq(indexField(index), str)));
    }

    /**
     * Indexes the entry, inserts it as a new document and updates the channel positions.
     *
     * <p>The channel name and filter values are taken from the entry returned by the
     * {@link Indexer}; the text message and the channel position update use the entry as
     * passed in.
     *
     * @param messageStoreEntry the entry to store
     */
    @Override
    public void add(@Nonnull MessageStoreEntry messageStoreEntry) {
        MessageStoreEntry indexedEntry = this.indexer.index(messageStoreEntry);
        Document document = Document.createDocument(CHANNEL_NAME_FIELD, indexedEntry.getChannelName());
        indexedEntry.getFilterValues().forEach((index, value) -> document.put(indexField(index), value));
        document.put(MESSAGE_FIELD, messageStoreEntry.getTextMessage());
        this.messages.insert(document);
        this.channelPositions.updateFrom(messageStoreEntry);
    }

    /**
     * Returns the number of documents in the underlying collection.
     *
     * @return the collection size
     */
    @Override
    public long size() {
        return this.messages.size();
    }

    /** Closes the underlying Nitrite database. */
    @Override
    public void close() {
        this.nitrite.close();
    }

    /** Converts every document of the cursor into a {@link MessageStoreEntry}. */
    @Nonnull
    private Stream<MessageStoreEntry> toEntryStream(Cursor cursor) {
        return Streams.stream(cursor).map(this::toEntry);
    }

    /**
     * Rebuilds a {@link MessageStoreEntry} from a stored document. Only indexes of the
     * configured indexer that have a non-null value in the document end up in the filter values.
     */
    private MessageStoreEntry toEntry(Document document) {
        ImmutableMap.Builder<Index, String> filterValues = ImmutableMap.builder();
        for (Index index : this.indexer.getIndexes()) {
            String value = document.get(indexField(index), String.class);
            if (value != null) {
                filterValues.put(index, value);
            }
        }
        return MessageStoreEntry.of(
                document.get(CHANNEL_NAME_FIELD, String.class),
                filterValues.build(),
                document.get(MESSAGE_FIELD, TextMessage.class));
    }

    /** Returns the name of the document field that stores the filter value of {@code index}. */
    private static String indexField(Index index) {
        return INDEX_FIELD_PREFIX + index.getName();
    }
}
