package events.dewdrop.streamstore.eventstore;

import com.eventstore.dbclient.AppendToStreamOptions;
import com.eventstore.dbclient.EventStoreDBClient;
import com.eventstore.dbclient.ReadResult;
import com.eventstore.dbclient.ReadStreamOptions;
import com.eventstore.dbclient.StreamNotFoundException;
import com.eventstore.dbclient.SubscribeToStreamOptions;
import com.eventstore.dbclient.SubscriptionListener;
import events.dewdrop.structure.NoStreamException;
import events.dewdrop.structure.datastore.StreamStore;
import events.dewdrop.structure.events.StreamReadResults;
import events.dewdrop.structure.events.WriteEventData;
import events.dewdrop.structure.read.ReadRequest;
import events.dewdrop.structure.subscribe.SubscribeRequest;
import events.dewdrop.structure.write.WriteRequest;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:events/dewdrop/streamstore/eventstore/EventStore.class */
/**
 * {@link StreamStore} implementation backed by an {@link EventStoreDBClient}.
 *
 * <p>All client calls are asynchronous; this class blocks on the returned futures so that
 * callers get a synchronous read / append / subscribe API.
 */
public class EventStore implements StreamStore {

    @Generated
    private static final Logger log = LogManager.getLogger(EventStore.class);

    /** Maximum number of events sent to the client in a single append call. */
    public static final int BATCH_SIZE = 500;

    private final EventStoreDBClient client;

    /**
     * Creates a store that delegates to the given client.
     *
     * @param eventStoreDBClient the client used for every read, append and subscribe call
     */
    public EventStore(EventStoreDBClient eventStoreDBClient) {
        this.client = eventStoreDBClient;
    }

    /**
     * Reads events for the given request.
     *
     * @param readRequest describes the stream and range to read
     * @return the read results; see {@link #readFromStream(ReadRequest)} for how a missing stream
     *     and read failures are represented
     * @throws NoStreamException declared by the interface; this implementation converts a missing
     *     stream into {@link StreamReadResults#noStream()} instead of throwing
     */
    @Override // events.dewdrop.structure.datastore.StreamStore
    public StreamReadResults read(ReadRequest readRequest) throws NoStreamException {
        return readFromStream(readRequest);
    }

    /**
     * Subscribes to the stream named in the request, starting from the request's last checkpoint
     * and resolving link events. If the first attempt reports failure, exactly one more attempt is
     * made with the same listener and options.
     *
     * @param subscribeRequest stream name, last checkpoint and event consumer
     * @return {@code true} if either attempt succeeded, {@code false} otherwise
     * @throws NoStreamException declared by the interface
     * @throws RuntimeException if the client fails for a reason other than the stream not existing
     */
    @Override // events.dewdrop.structure.datastore.StreamStore
    public boolean subscribeToStream(SubscribeRequest subscribeRequest) throws NoStreamException {
        SubscriptionListener listener = EventStoreUtils.createListener(subscribeRequest.getConsumeEvent());
        SubscribeToStreamOptions options = SubscribeToStreamOptions.get();
        options.fromRevision(subscribeRequest.getLastCheckpoint().longValue());
        options.resolveLinkTos();

        String streamName = subscribeRequest.getStreamName();
        // Short-circuit: the second (retry) attempt only runs when the first one returned false.
        return subscribeTo(streamName, listener, options) || subscribeTo(streamName, listener, options);
    }

    /**
     * Performs a single subscribe attempt and waits for the client to acknowledge it.
     *
     * @param str the stream name
     * @param subscriptionListener listener that receives the events
     * @param subscribeToStreamOptions subscription options
     * @return {@code true} when the subscription was established; {@code false} when the wait was
     *     interrupted or the client reported that the stream was not found
     * @throws NoStreamException declared for callers; not thrown by this method body
     * @throws RuntimeException wrapping the {@link ExecutionException} for any other client failure
     */
    boolean subscribeTo(String str, SubscriptionListener subscriptionListener, SubscribeToStreamOptions subscribeToStreamOptions) throws NoStreamException {
        try {
            this.client.subscribeToStream(str, subscriptionListener, subscribeToStreamOptions).get();
            return true;
        } catch (InterruptedException e) {
            log.error("Stream was interrupted - name:{}", str, e);
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof StreamNotFoundException) {
                return false;
            }
            log.error("There was an execution exception for streamName:{}, Is EventStore up?", str, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Appends the request's events to its stream, blocking until the client has confirmed the
     * write. Requests with fewer than {@link #BATCH_SIZE} events are written in one call; larger
     * requests are split into batches of at most {@link #BATCH_SIZE} events that are written one
     * after another.
     *
     * <p>Failures are logged and not rethrown. When a batch fails, the remaining batches are not
     * attempted.
     *
     * @param writeRequest stream name, expected version and the events to write
     */
    @Override // events.dewdrop.structure.datastore.StreamStore
    public void appendToStream(WriteRequest writeRequest) {
        AppendToStreamOptions options = AppendToStreamOptions.get().expectedRevision(writeRequest.getExpectedVersion().longValue());
        String streamName = writeRequest.getStreamName();
        List<WriteEventData> events = writeRequest.getEvents();
        var eventData = events.stream().map(EventStoreUtils::toEventData).collect(Collectors.toList());
        try {
            if (events.size() < BATCH_SIZE) {
                log.info("Appending {} events to stream {}, events:{}", events.size(), streamName,
                                events.stream().map(WriteEventData::getEventType).collect(Collectors.joining(",")));
                this.client.appendToStream(streamName, options, eventData.listIterator()).get();
            } else {
                for (var batch : ListUtils.partition(eventData, BATCH_SIZE)) {
                    // Wait for each batch before sending the next: this keeps the batches ordered
                    // and lets a failed batch reach the catch block below instead of being lost.
                    // NOTE(review): every batch reuses the same expected revision; if that is a
                    // concrete revision number the second batch will presumably be rejected.
                    // Confirm against the client's WriteResult API and advance it if needed.
                    this.client.appendToStream(streamName, options, batch.listIterator()).get();
                }
            }
        } catch (InterruptedException e) {
            log.error("Append was interrupted", e);
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            log.error("Append had an issue", e);
        }
    }

    /**
     * Reads from the stream and converts the client result into {@link StreamReadResults}.
     *
     * @param readRequest describes the stream and range to read
     * @return {@link StreamReadResults#noStream()} when the stream does not exist,
     *     {@link StreamReadResults#empty()} when {@link #performRead(ReadRequest)} produced no
     *     result (interrupted or failed read), otherwise the converted events
     */
    StreamReadResults readFromStream(ReadRequest readRequest) {
        try {
            Optional<ReadResult> performRead = performRead(readRequest);
            if (performRead.isEmpty()) {
                log.debug("Request had not results for Stream:{} - request:{}", readRequest.getStreamName(), readRequest);
                return StreamReadResults.empty();
            }
            ReadResult readResult = performRead.get();
            log.debug("Read {} messages from stream:{}", readResult.getEvents().size(), readRequest.getStreamName());
            return EventStoreUtils.toStreamReadResults(readRequest, readResult);
        } catch (NoStreamException e) {
            return StreamReadResults.noStream();
        }
    }

    /**
     * Issues the read against the client and waits for it to complete.
     *
     * @param readRequest describes the stream and range to read
     * @return the client's result, or an empty {@link Optional} when the wait was interrupted or
     *     the read failed for a reason other than a missing stream (the failure is logged)
     * @throws NoStreamException when the client reports that the stream was not found
     */
    Optional<ReadResult> performRead(ReadRequest readRequest) throws NoStreamException {
        String streamName = readRequest.getStreamName();
        ReadStreamOptions options = EventStoreUtils.options(readRequest);
        try {
            log.debug("ReadRequest: {}", readRequest);
            return Optional.of(this.client.readStream(streamName, options).get());
        } catch (InterruptedException e) {
            log.error("Stream interrupted", e);
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof StreamNotFoundException) {
                throw new NoStreamException(streamName);
            }
            log.error("There was an issue reading from stream: {}", streamName, e);
            return Optional.empty();
        }
    }
}
