package de.otto.edison.eventsourcing.compaction;

import de.otto.edison.eventsourcing.CompactingKinesisEventSource;
import de.otto.edison.eventsourcing.consumer.DefaultEventConsumer;
import de.otto.edison.eventsourcing.consumer.Event;
import de.otto.edison.eventsourcing.consumer.StreamPosition;
import de.otto.edison.eventsourcing.s3.SnapshotConsumerService;
import de.otto.edison.eventsourcing.s3.SnapshotReadService;
import de.otto.edison.eventsourcing.s3.SnapshotWriteService;
import de.otto.edison.eventsourcing.state.StateRepository;
import java.time.Instant;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.kinesis.KinesisClient;

@ConditionalOnProperty(name = {"edison.eventsourcing.compaction.enabled"}, havingValue = "true")
@Service
/* loaded from: input_file:de/otto/edison/eventsourcing/compaction/CompactionService.class */
public class CompactionService {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionService.class);
    private final SnapshotReadService snapshotReadService;
    private final SnapshotWriteService snapshotWriteService;
    private final SnapshotConsumerService snapshotConsumerService;
    private final KinesisClient kinesisClient;
    private final StateRepository<String> stateRepository;

    @Autowired
    public CompactionService(SnapshotReadService snapshotReadService, SnapshotWriteService snapshotWriteService, SnapshotConsumerService snapshotConsumerService, KinesisClient kinesisClient, StateRepository<String> stateRepository) {
        this.snapshotReadService = snapshotReadService;
        this.snapshotWriteService = snapshotWriteService;
        this.snapshotConsumerService = snapshotConsumerService;
        this.kinesisClient = kinesisClient;
        this.stateRepository = stateRepository;
    }

    public String compact(String str) {
        LOG.info("Start compacting stream {}", str);
        LOG.info(this.stateRepository.getStats());
        this.stateRepository.clear();
        LOG.info("Start loading entries into inMemoryCache from snapshot");
        CompactingKinesisEventSource compactingKinesisEventSource = new CompactingKinesisEventSource(str, String.class, this.snapshotReadService, this.snapshotConsumerService, Function.identity(), this.kinesisClient);
        try {
            StreamPosition consumeAll = compactingKinesisEventSource.consumeAll((Predicate) stopCondition(), (Consumer) new DefaultEventConsumer(str, ".*", this.stateRepository).consumerFunction());
            LOG.info("Finished updating snapshot data. StateRepository now holds {} entries.", Long.valueOf(this.stateRepository.size()));
            return this.snapshotWriteService.takeSnapshot(str, consumeAll, this.stateRepository);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Predicate<Event<String>> stopCondition() {
        Instant now = Instant.now();
        return event -> {
            if (event != null) {
                return event.arrivalTimestamp().isAfter(now);
            }
            return true;
        };
    }
}
