package de.skiptag.roadrunner.disruptor.processor.eventsourcing;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.lmax.disruptor.EventHandler;
import de.skiptag.roadrunner.disruptor.RoadrunnerDisruptor;
import de.skiptag.roadrunner.disruptor.event.RoadrunnerEvent;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import journal.io.api.ClosedJournalException;
import journal.io.api.CompactedDataFileException;
import journal.io.api.Journal;
import journal.io.api.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/skiptag/roadrunner/disruptor/processor/eventsourcing/EventSourceProcessor.class */
public class EventSourceProcessor implements EventHandler<RoadrunnerEvent> {
    private static final Logger logger = LoggerFactory.getLogger(EventSourceProcessor.class);
    private RoadrunnerDisruptor roadrunner;

    /* renamed from: journal, reason: collision with root package name */
    private Journal f0journal = new Journal();
    private Optional<Location> currentLocation = Optional.absent();
    private int snapshotCount = 100;
    private int messageCount = 0;

    public EventSourceProcessor(File file, RoadrunnerDisruptor roadrunnerDisruptor) throws IOException {
        this.f0journal.setDirectory(file);
        this.f0journal.open();
        this.roadrunner = roadrunnerDisruptor;
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(RoadrunnerEvent roadrunnerEvent, long j, boolean z) throws ClosedJournalException, IOException {
        if (roadrunnerEvent.isFromHistory()) {
            return;
        }
        logger.trace("storing event: " + roadrunnerEvent);
        Location write = this.f0journal.write(roadrunnerEvent.toString().getBytes(), Journal.WriteType.SYNC);
        this.f0journal.sync();
        this.currentLocation = Optional.of(write);
        this.messageCount++;
        if (this.messageCount > this.snapshotCount) {
            this.roadrunner.snapshot();
            this.messageCount = 0;
        }
    }

    public void restore() throws ClosedJournalException, CompactedDataFileException, IOException, RuntimeException {
        Iterator<Location> it = (this.currentLocation.isPresent() ? this.f0journal.redo(this.currentLocation.get()) : this.f0journal.redo()).iterator();
        while (it.hasNext()) {
            RoadrunnerEvent roadrunnerEvent = new RoadrunnerEvent(new String(this.f0journal.read(it.next(), Journal.ReadType.SYNC)));
            roadrunnerEvent.setFromHistory(true);
            Preconditions.checkArgument(roadrunnerEvent.has(RoadrunnerEvent.TYPE), "No type defined in Event");
            this.roadrunner.handleEvent(roadrunnerEvent);
        }
    }

    public Optional<Location> getCurrentLocation() {
        return this.currentLocation;
    }

    public void setCurrentLocation(Location location) {
        this.currentLocation = Optional.fromNullable(location);
    }
}
