package de.skiptag.roadrunner.disruptor;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.dsl.Disruptor;
import de.skiptag.roadrunner.authorization.Authorization;
import de.skiptag.roadrunner.disruptor.event.RoadrunnerEvent;
import de.skiptag.roadrunner.disruptor.event.RoadrunnerEventTranslator;
import de.skiptag.roadrunner.disruptor.processor.authorization.AuthorizationProcessor;
import de.skiptag.roadrunner.disruptor.processor.distribution.DistributionProcessor;
import de.skiptag.roadrunner.disruptor.processor.eventsourcing.EventSourceProcessor;
import de.skiptag.roadrunner.disruptor.processor.persistence.PersistenceProcessor;
import de.skiptag.roadrunner.messaging.RoadrunnerEndpoint;
import de.skiptag.roadrunner.persistence.Persistence;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.Executors;
import journal.io.api.ClosedJournalException;
import journal.io.api.CompactedDataFileException;
import journal.io.api.Journal;
import journal.io.api.Location;
import org.json.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/skiptag/roadrunner/disruptor/RoadrunnerDisruptor.class */
public class RoadrunnerDisruptor implements ExceptionHandler {
    public static final EventFactory<RoadrunnerEvent> EVENT_FACTORY = new EventFactory<RoadrunnerEvent>() { // from class: de.skiptag.roadrunner.disruptor.RoadrunnerDisruptor.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.lmax.disruptor.EventFactory
        public RoadrunnerEvent newInstance() {
            return new RoadrunnerEvent();
        }
    };
    private static final Logger logger = LoggerFactory.getLogger(RoadrunnerDisruptor.class);
    private static final int RING_SIZE = 256;
    private EventSourceProcessor eventSourceProcessor;
    private PersistenceProcessor persistenceProcessor;
    private DistributionProcessor distributionProcessor;
    private Disruptor<RoadrunnerEvent> disruptor;
    private AuthorizationProcessor authorizationProcessor;
    private Persistence persistence;
    private Optional<Journal> snapshotJournal = Optional.absent();
    private long currentSequence;

    public Disruptor<RoadrunnerEvent> getDisruptor() {
        return this.disruptor;
    }

    public RoadrunnerDisruptor(File file, Optional<File> optional, Persistence persistence, Authorization authorization) throws IOException {
        this.persistence = persistence;
        initDisruptor(file, persistence, authorization);
        if (optional.isPresent()) {
            restoreFromSnapshot(optional.get(), persistence);
        } else {
            restoreFromJournal();
        }
    }

    private void restoreFromJournal() throws ClosedJournalException, CompactedDataFileException, IOException {
        this.eventSourceProcessor.restore();
    }

    private void restoreFromSnapshot(File file, Persistence persistence) throws IOException, ClosedJournalException, CompactedDataFileException {
        Journal journal2 = new Journal();
        journal2.setDirectory(file);
        this.snapshotJournal = Optional.fromNullable(journal2);
        journal2.open();
        Iterator<Location> it = journal2.undo().iterator();
        if (it.hasNext()) {
            Node node = new Node(new String(journal2.read(it.next(), Journal.ReadType.SYNC)));
            int i = node.getInt("currentEventLogPointer");
            int i2 = node.getInt("currentEventLogDataFileId");
            node.getNode(RoadrunnerEvent.PAYLOAD);
            Node node2 = new Node();
            node2.populate(null, node2);
            persistence.restoreSnapshot(node2);
            this.eventSourceProcessor.setCurrentLocation(new Location(i2, i));
        }
        restoreFromJournal();
    }

    private void initDisruptor(File file, Persistence persistence, Authorization authorization) throws IOException {
        this.disruptor = new Disruptor<>(EVENT_FACTORY, RING_SIZE, Executors.newCachedThreadPool());
        this.authorizationProcessor = new AuthorizationProcessor(authorization, persistence);
        this.eventSourceProcessor = new EventSourceProcessor(file, this);
        this.persistenceProcessor = new PersistenceProcessor(persistence);
        this.distributionProcessor = new DistributionProcessor();
        this.disruptor.handleExceptionsWith(this);
        this.disruptor.handleEventsWith(this.authorizationProcessor).then(this.eventSourceProcessor).then(this.persistenceProcessor).then(this.distributionProcessor);
        this.disruptor.start();
    }

    public void handleEvent(RoadrunnerEvent roadrunnerEvent) {
        Preconditions.checkArgument(roadrunnerEvent.has(RoadrunnerEvent.TYPE), "No type defined in Event");
        RoadrunnerEventTranslator roadrunnerEventTranslator = new RoadrunnerEventTranslator(roadrunnerEvent);
        logger.trace("handling event: " + roadrunnerEvent + "(" + roadrunnerEvent.length() + ")");
        this.disruptor.publishEvent(roadrunnerEventTranslator);
        this.currentSequence = roadrunnerEventTranslator.getSequence();
    }

    public void snapshot() throws IOException, RuntimeException {
        Optional<Location> currentLocation = this.eventSourceProcessor.getCurrentLocation();
        if (this.snapshotJournal.isPresent() || currentLocation.isPresent()) {
            Location location = currentLocation.get();
            Node dumpSnapshot = this.persistence.dumpSnapshot();
            Node node = new Node();
            node.put("currentEventLogPointer", location.getPointer());
            node.put("currentEventLogDataFileId", location.getDataFileId());
            node.put(RoadrunnerEvent.PAYLOAD, dumpSnapshot);
            this.snapshotJournal.get().open();
            this.snapshotJournal.get().write(node.toString().getBytes(), Journal.WriteType.SYNC);
            this.snapshotJournal.get().close();
        }
    }

    public void shutdown() {
        this.disruptor.shutdown();
    }

    public void addEndpoint(RoadrunnerEndpoint roadrunnerEndpoint) {
        this.distributionProcessor.addHandler(roadrunnerEndpoint);
    }

    public void removeEndpoint(RoadrunnerEndpoint roadrunnerEndpoint) {
        this.distributionProcessor.removeHandler(roadrunnerEndpoint);
    }

    public DistributionProcessor getDistributor() {
        return this.distributionProcessor;
    }

    public boolean hasBacklog() {
        return this.currentSequence != this.distributionProcessor.getSequence();
    }

    @Override // com.lmax.disruptor.ExceptionHandler
    public void handleEventException(Throwable th, long j, Object obj) {
        logger.error("Event Exception (msg: " + th.getMessage() + ", sequence: +" + j + ", event: " + obj + ")", th);
    }

    @Override // com.lmax.disruptor.ExceptionHandler
    public void handleOnStartException(Throwable th) {
        logger.error("OnStart Exception (msg: " + th.getMessage() + ")", th);
    }

    @Override // com.lmax.disruptor.ExceptionHandler
    public void handleOnShutdownException(Throwable th) {
        logger.error("OnShutdown Exception (msg: " + th.getMessage() + ")", th);
    }
}
