package org.occurrent.eventstore.mongodb.spring.blocking;

import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import io.cloudevents.CloudEvent;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.occurrent.cloudevents.OccurrentExtensionGetter;
import org.occurrent.eventstore.api.LongConditionEvaluator;
import org.occurrent.eventstore.api.SortBy;
import org.occurrent.eventstore.api.WriteCondition;
import org.occurrent.eventstore.api.WriteConditionNotFulfilledException;
import org.occurrent.eventstore.api.WriteResult;
import org.occurrent.eventstore.api.blocking.EventStore;
import org.occurrent.eventstore.api.blocking.EventStoreOperations;
import org.occurrent.eventstore.api.blocking.EventStoreQueries;
import org.occurrent.eventstore.api.blocking.EventStream;
import org.occurrent.eventstore.mongodb.internal.MongoExceptionTranslator;
import org.occurrent.eventstore.mongodb.internal.OccurrentCloudEventMongoDocumentMapper;
import org.occurrent.eventstore.mongodb.internal.StreamVersionDiff;
import org.occurrent.filter.Filter;
import org.occurrent.functionalsupport.internal.FunctionalSupport;
import org.occurrent.mongodb.spring.filterqueryconversion.internal.FilterConverter;
import org.occurrent.mongodb.spring.sortconversion.internal.SortConverter;
import org.occurrent.mongodb.timerepresentation.TimeRepresentation;
import org.springframework.dao.DataAccessException;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

/* loaded from: input_file:org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStore.class */
public class SpringMongoEventStore implements EventStore, EventStoreOperations, EventStoreQueries {
    private static final String ID = "_id";
    private final MongoTemplate mongoTemplate;
    private final String eventStoreCollectionName;
    private final TimeRepresentation timeRepresentation;
    private final TransactionTemplate transactionTemplate;
    private final Function<Query, Query> queryOptions;
    private final Function<Query, Query> readOptions;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStore$EventStreamImpl.class */
    public static class EventStreamImpl<T> implements EventStream<T> {
        private String _id;
        private long version;
        private Stream<T> events;

        EventStreamImpl() {
        }

        EventStreamImpl(String str, long j, Stream<T> stream) {
            this._id = str;
            this.version = j;
            this.events = stream;
        }

        public String id() {
            return this._id;
        }

        public long version() {
            return this.version;
        }

        public Stream<T> events() {
            return this.events;
        }

        public void set_id(String str) {
            this._id = str;
        }

        public void setVersion(long j) {
            this.version = j;
        }

        public void setEvents(Stream<T> stream) {
            this.events = stream;
        }
    }

    public SpringMongoEventStore(MongoTemplate mongoTemplate, EventStoreConfig eventStoreConfig) {
        Objects.requireNonNull(mongoTemplate, MongoTemplate.class.getSimpleName() + " cannot be null");
        Objects.requireNonNull(mongoTemplate, EventStoreConfig.class.getSimpleName() + " cannot be null");
        this.mongoTemplate = mongoTemplate;
        this.eventStoreCollectionName = eventStoreConfig.eventStoreCollectionName;
        this.transactionTemplate = eventStoreConfig.transactionTemplate;
        this.timeRepresentation = eventStoreConfig.timeRepresentation;
        this.queryOptions = eventStoreConfig.queryOptions;
        this.readOptions = eventStoreConfig.readOptions;
        initializeEventStore(this.eventStoreCollectionName, mongoTemplate);
    }

    public EventStream<CloudEvent> read(String str, int i, int i2) {
        return ((EventStream) Objects.requireNonNull(readEventStream(str, i, i2))).map(document -> {
            return OccurrentCloudEventMongoDocumentMapper.convertToCloudEvent(this.timeRepresentation, document);
        });
    }

    public WriteResult write(String str, WriteCondition writeCondition, Stream<CloudEvent> stream) {
        BiFunction biFunction;
        StreamVersionDiff streamVersionDiff;
        if (writeCondition == null) {
            throw new IllegalArgumentException(WriteCondition.class.getSimpleName() + " cannot be null");
        }
        if (writeCondition.isAnyStreamVersion()) {
            List<CloudEvent> list = stream.toList();
            biFunction = (stream2, l) -> {
                return convertCloudEventsToDocuments(str, list.stream(), l.longValue());
            };
        } else {
            biFunction = (stream3, l2) -> {
                return convertCloudEventsToDocuments(str, stream3, l2.longValue());
            };
        }
        BiFunction biFunction2 = biFunction;
        TransactionCallback transactionCallback = transactionStatus -> {
            long j;
            long currentStreamVersion = currentStreamVersion(str);
            if (!isFulfilled(currentStreamVersion, writeCondition)) {
                throw new WriteConditionNotFulfilledException(str, currentStreamVersion, writeCondition, String.format("%s was not fulfilled. Expected version %s but was %s.", WriteCondition.class.getSimpleName(), writeCondition, Long.valueOf(currentStreamVersion)));
            }
            List<Document> list2 = (List) biFunction2.apply(stream, Long.valueOf(currentStreamVersion));
            if (list2.isEmpty()) {
                j = currentStreamVersion;
            } else {
                insertAll(str, currentStreamVersion, writeCondition, list2);
                j = list2.get(list2.size() - 1).getLong("streamversion").longValue();
            }
            return new StreamVersionDiff(currentStreamVersion, j);
        };
        try {
            streamVersionDiff = (StreamVersionDiff) this.transactionTemplate.execute(transactionCallback);
        } catch (WriteConditionNotFulfilledException e) {
            if (!writeCondition.isAnyStreamVersion()) {
                throw e;
            }
            streamVersionDiff = (StreamVersionDiff) this.transactionTemplate.execute(transactionCallback);
        }
        return new WriteResult(str, streamVersionDiff.oldStreamVersion, streamVersionDiff.newStreamVersion);
    }

    public WriteResult write(String str, Stream<CloudEvent> stream) {
        return write(str, WriteCondition.StreamVersionWriteCondition.any(), stream);
    }

    public boolean exists(String str) {
        return this.mongoTemplate.exists(this.queryOptions.apply(streamIdEqualTo(str)), this.eventStoreCollectionName);
    }

    public boolean exists(Filter filter) {
        Objects.requireNonNull(filter, "Filter cannot be null");
        if (filter instanceof Filter.All) {
            return count() > 0;
        }
        return this.mongoTemplate.exists(this.queryOptions.apply(FilterConverter.convertFilterToQuery(this.timeRepresentation, filter)), this.eventStoreCollectionName);
    }

    public void deleteEventStream(String str) {
        Objects.requireNonNull(str, "Stream id cannot be null");
        this.transactionTemplate.executeWithoutResult(transactionStatus -> {
            this.mongoTemplate.remove(Query.query(streamIdEqualToCriteria(str)), this.eventStoreCollectionName);
        });
    }

    public void deleteEvent(String str, URI uri) {
        Objects.requireNonNull(str, "Cloud event id cannot be null");
        Objects.requireNonNull(uri, "Cloud event source cannot be null");
        this.mongoTemplate.remove(cloudEventIdEqualTo(str, uri), this.eventStoreCollectionName);
    }

    public void delete(Filter filter) {
        Objects.requireNonNull(filter, "Filter cannot be null");
        this.mongoTemplate.remove(FilterConverter.convertFilterToQuery(this.timeRepresentation, filter), this.eventStoreCollectionName);
    }

    public Optional<CloudEvent> updateEvent(String str, URI uri, Function<CloudEvent, CloudEvent> function) {
        Function function2 = function3 -> {
            Query cloudEventIdEqualTo = cloudEventIdEqualTo(str, uri);
            Document document = (Document) this.mongoTemplate.findOne(cloudEventIdEqualTo, Document.class, this.eventStoreCollectionName);
            if (document == null) {
                return Optional.empty();
            }
            CloudEvent convertToCloudEvent = OccurrentCloudEventMongoDocumentMapper.convertToCloudEvent(this.timeRepresentation, document);
            CloudEvent cloudEvent = (CloudEvent) function3.apply(convertToCloudEvent);
            if (cloudEvent == null) {
                throw new IllegalArgumentException("Cloud event update function is not allowed to return null");
            }
            if (!Objects.equals(cloudEvent, convertToCloudEvent)) {
                Document convertToDocument = OccurrentCloudEventMongoDocumentMapper.convertToDocument(this.timeRepresentation, OccurrentExtensionGetter.getStreamId(convertToCloudEvent), OccurrentExtensionGetter.getStreamVersion(convertToCloudEvent), cloudEvent);
                convertToDocument.put(ID, document.get(ID));
                this.mongoTemplate.findAndReplace(cloudEventIdEqualTo, convertToDocument, this.eventStoreCollectionName);
            }
            return Optional.of(cloudEvent);
        };
        return (Optional) this.transactionTemplate.execute(transactionStatus -> {
            return (Optional) function2.apply(function);
        });
    }

    public Stream<CloudEvent> query(Filter filter, int i, int i2, SortBy sortBy) {
        Objects.requireNonNull(filter, Filter.class.getSimpleName() + " cannot be null");
        Objects.requireNonNull(sortBy, SortBy.class.getSimpleName() + " cannot be null");
        return readCloudEvents(this.queryOptions.apply(FilterConverter.convertFilterToQuery(this.timeRepresentation, filter)), i, i2, sortBy).map(document -> {
            return OccurrentCloudEventMongoDocumentMapper.convertToCloudEvent(this.timeRepresentation, document);
        });
    }

    public long count(Filter filter) {
        Objects.requireNonNull(filter, "Filter cannot be null");
        if (filter instanceof Filter.All) {
            return ((Long) this.mongoTemplate.execute(this.eventStoreCollectionName, (v0) -> {
                return v0.estimatedDocumentCount();
            })).longValue();
        }
        return this.mongoTemplate.count(this.queryOptions.apply(FilterConverter.convertFilterToQuery(this.timeRepresentation, filter)), this.eventStoreCollectionName);
    }

    private List<Document> convertCloudEventsToDocuments(String str, Stream<CloudEvent> stream, long j) {
        return FunctionalSupport.mapWithIndex(stream, j, pair -> {
            return OccurrentCloudEventMongoDocumentMapper.convertToDocument(this.timeRepresentation, str, ((Long) pair.t1).longValue(), (CloudEvent) pair.t2);
        }).toList();
    }

    private void insertAll(String str, long j, WriteCondition writeCondition, List<Document> list) {
        try {
            this.mongoTemplate.insert(list, this.eventStoreCollectionName);
        } catch (DataAccessException e) {
            MongoException rootCause = e.getRootCause();
            if (!(rootCause instanceof MongoException)) {
                throw e;
            }
            throw MongoExceptionTranslator.translateException(new MongoExceptionTranslator.WriteContext(str, j, writeCondition), rootCause);
        }
    }

    private static boolean isFulfilled(long j, WriteCondition writeCondition) {
        if (writeCondition.isAnyStreamVersion()) {
            return true;
        }
        if (writeCondition instanceof WriteCondition.StreamVersionWriteCondition) {
            return LongConditionEvaluator.evaluate(((WriteCondition.StreamVersionWriteCondition) writeCondition).condition(), j);
        }
        throw new IllegalArgumentException("Invalid " + WriteCondition.class.getSimpleName() + ": " + writeCondition);
    }

    private static Query streamIdEqualTo(String str) {
        return Query.query(streamIdEqualToCriteria(str));
    }

    private static Criteria streamIdEqualToCriteria(String str) {
        return Criteria.where("streamid").is(str);
    }

    private EventStreamImpl<Document> readEventStream(String str, int i, int i2) {
        long currentStreamVersion = currentStreamVersion(str);
        return currentStreamVersion == 0 ? new EventStreamImpl<>(str, 0L, Stream.empty()) : new EventStreamImpl<>(str, currentStreamVersion, readCloudEvents(this.readOptions.apply(Query.query(streamIdEqualToCriteria(str).and("streamversion").lte(Long.valueOf(currentStreamVersion)))), i, i2, SortBy.streamVersion(SortBy.SortDirection.ASCENDING)));
    }

    private long currentStreamVersion(String str) {
        Query apply = this.readOptions.apply(streamIdEqualTo(str));
        apply.fields().include("streamversion");
        Document document = (Document) this.mongoTemplate.findOne(this.queryOptions.apply(apply.with(Sort.by(Sort.Direction.DESC, new String[]{"streamversion"})).limit(1)), Document.class, this.eventStoreCollectionName);
        return document == null ? 0L : document.getLong("streamversion").longValue();
    }

    private Stream<Document> readCloudEvents(Query query, int i, int i2, SortBy sortBy) {
        if (i != 0 || i2 != Integer.MAX_VALUE) {
            query.skip(i).limit(i2);
        }
        return this.mongoTemplate.stream(query.with(SortConverter.convertToSpringSort(sortBy)), Document.class, this.eventStoreCollectionName);
    }

    private static void initializeEventStore(String str, MongoTemplate mongoTemplate) {
        if (!mongoTemplate.collectionExists(str)) {
            mongoTemplate.createCollection(str);
        }
        MongoCollection collection = mongoTemplate.getCollection(str);
        collection.createIndex(Indexes.compoundIndex(new Bson[]{Indexes.ascending(new String[]{"id"}), Indexes.ascending(new String[]{"source"})}), new IndexOptions().unique(true));
        collection.createIndex(Indexes.compoundIndex(new Bson[]{Indexes.ascending(new String[]{"streamid"}), Indexes.ascending(new String[]{"streamversion"})}), new IndexOptions().unique(true));
        mongoTemplate.setSessionSynchronization(SessionSynchronization.ALWAYS);
    }

    private static Query cloudEventIdEqualTo(String str, URI uri) {
        return Query.query(Criteria.where("id").is(str).and("source").is(uri));
    }
}
