package cc.renken.pipeio.async.impl;

import cc.renken.pipeio.async.IAsyncFaucet;
import cc.renken.pipeio.async.IAsyncSink;
import cc.renken.pipeio.async.IAsyncTube;
import cc.renken.pipeio.core.IListener;
import cc.renken.pipeio.core.IPipeline;
import cc.renken.pipeio.core.impl.EventNotifier;
import cc.renken.pipeio.core.impl.Scheduler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cc/renken/pipeio/async/impl/AsyncPipeline.class */
public class AsyncPipeline<RECV, PUSH> implements IPipeline<RECV, PUSH> {
    private static final Logger logger;
    private final String id;
    private final EventNotifier notifier;
    private final AsyncFaucetContainer<RECV, PUSH> asyncFaucetContainer;
    private final LinkedList<AsyncTubeContainer<?, ?, ?, ?>> partHandlers;
    private final AsyncSinkContainer<?, ?> asyncSinkContainer;
    private final Scheduler pipeScheduler;
    private boolean isDeactivated;
    private IPipeline.State lastState;
    static final /* synthetic */ boolean $assertionsDisabled;

    public AsyncPipeline(IAsyncFaucet<RECV, PUSH> iAsyncFaucet, LinkedHashSet<IAsyncTube<?, ?, ?, ?>> linkedHashSet, IAsyncSink<?, ?> iAsyncSink) {
        this(null, iAsyncFaucet, linkedHashSet, iAsyncSink);
    }

    public AsyncPipeline(String str, IAsyncFaucet<RECV, PUSH> iAsyncFaucet, LinkedHashSet<IAsyncTube<?, ?, ?, ?>> linkedHashSet, IAsyncSink<?, ?> iAsyncSink) {
        this.partHandlers = new LinkedList<>();
        this.pipeScheduler = new Scheduler("Pipelinescheduler", id(), this::exceptionEncounteredNotifyListeners);
        this.isDeactivated = true;
        this.lastState = IPipeline.State.DEACTIVATED;
        this.id = str;
        this.notifier = new EventNotifier(str, this::exceptionEncounteredNotifyListeners);
        this.asyncSinkContainer = new AsyncSinkContainer<>(scheduler(), iAsyncSink, this::exceptionEncounteredNotifyListeners);
        AAsyncComponentContainer aAsyncComponentContainer = this.asyncSinkContainer;
        ArrayList arrayList = new ArrayList(linkedHashSet);
        Collections.reverse(arrayList);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            AsyncTubeContainer<?, ?, ?, ?> asyncTubeContainer = new AsyncTubeContainer<>(scheduler(), (IAsyncTube) it.next(), aAsyncComponentContainer, this::exceptionEncounteredNotifyListeners);
            this.partHandlers.addFirst(asyncTubeContainer);
            aAsyncComponentContainer = asyncTubeContainer;
        }
        this.asyncFaucetContainer = new AsyncFaucetContainer<>(this, iAsyncFaucet, !this.partHandlers.isEmpty() ? this.partHandlers.getFirst() : this.asyncSinkContainer, this::exceptionEncounteredNotifyListeners);
    }

    public String id() {
        return this.id;
    }

    public void activate() throws IOException {
        try {
            scheduler().activate();
            scheduler().waitForExec(() -> {
                if (state() != IPipeline.State.DEACTIVATED) {
                    return;
                }
                this.asyncFaucetContainer.activateFaucet();
                setDeactivated(false);
            });
        } catch (ExecutionException | TimeoutException e) {
            Exception exc = e;
            if (exc instanceof ExecutionException) {
                exc = (Exception) ((ExecutionException) exc).getCause();
            }
            exceptionEncounteredNotifyListeners(exc);
            logger.error("Could not activate pipeline {}", id() != null ? id() : "", exc);
            throw new IOException(exc);
        }
    }

    public IPipeline.State getState() throws IOException {
        try {
            return state();
        } catch (ExecutionException | TimeoutException e) {
            Exception exc = e;
            if (exc instanceof ExecutionException) {
                exc = (Exception) ((ExecutionException) exc).getCause();
            }
            exceptionEncounteredNotifyListeners(exc);
            logger.error("Could not get state from pipeline {}", id() != null ? id() : "", exc);
            throw new IOException(exc);
        }
    }

    private IPipeline.State state() throws ExecutionException, TimeoutException {
        Callable callable = () -> {
            if ($assertionsDisabled || !this.isDeactivated || (this.isDeactivated && !this.asyncFaucetContainer.faucetIsActive())) {
                return this.isDeactivated ? IPipeline.State.DEACTIVATED : this.asyncFaucetContainer.faucetIsActive() ? IPipeline.State.ACTIVE : IPipeline.State.INACTIVE;
            }
            throw new AssertionError();
        };
        if (scheduler().isActive()) {
            return (IPipeline.State) scheduler().waitForExec(callable);
        }
        try {
            return (IPipeline.State) callable.call();
        } catch (Exception e) {
            exceptionEncounteredNotifyListeners(e);
            throw new ExecutionException(e);
        }
    }

    public void deactivate() throws IOException {
        try {
            scheduler().waitForExec(() -> {
                if (state() == IPipeline.State.DEACTIVATED) {
                    return;
                }
                this.asyncFaucetContainer.deactivateFaucet();
                setDeactivated(true);
            });
        } catch (ExecutionException | TimeoutException e) {
            Exception exc = e;
            if (exc instanceof ExecutionException) {
                exc = (Exception) ((ExecutionException) exc).getCause();
            }
            exceptionEncounteredNotifyListeners(exc);
            logger.error("Could not get state from pipeline {}", id() != null ? id() : "", exc);
            throw new IOException(exc);
        }
    }

    private void setDeactivated(boolean z) throws ExecutionException, TimeoutException {
        scheduler().waitForExec(() -> {
            this.isDeactivated = z;
            stateChangedNotifyListeners();
            if (this.isDeactivated) {
                scheduler().deactivate();
            } else {
                scheduler().activate();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void stateChangedNotifyListeners() throws ExecutionException, TimeoutException {
        if (!$assertionsDisabled && !scheduler().isMe()) {
            throw new AssertionError();
        }
        IPipeline.State state = state();
        if (this.lastState == state) {
            return;
        }
        this.lastState = state;
        this.notifier.handleEvent(IListener.EventType.STATE_CHANGED, state);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void receivedNotifyListeners(RECV recv) {
        this.notifier.handleEvent(IListener.EventType.DATA_RECEIVED, recv);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void sendNotifyListeners(PUSH push) {
        this.notifier.handleEvent(IListener.EventType.DATA_SEND, push);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void exceptionEncounteredNotifyListeners(Exception exc) {
        this.notifier.handleEvent(IListener.EventType.EXCEPTION_OCCURRED, exc);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Scheduler scheduler() {
        return this.pipeScheduler;
    }

    public void addListener(IListener iListener) {
        this.notifier.addListener(iListener);
    }

    public void removeListener(IListener iListener) {
        this.notifier.removeListener(iListener);
    }

    static {
        $assertionsDisabled = !AsyncPipeline.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(AsyncPipeline.class);
    }
}
