package cc.redberry.pipe.blocks;

import cc.redberry.pipe.InputPort;
import cc.redberry.pipe.OutputPort;
import cc.redberry.pipe.OutputPortCloseable;
import cc.redberry.pipe.util.ExceptionHandler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:cc/redberry/pipe/blocks/Merger.class */
/**
 * Merges the elements of several {@link OutputPort}s into one output port.
 *
 * <p>Sources are registered with {@link #merge(OutputPort)} or
 * {@link #merge(OutputPort, ExecutorService)} and then pumped, after {@link #start()},
 * by one {@code O2ITransmitter} per source into an input port created by the shared
 * {@link Buffer}. Each transmitter runs either on a dedicated {@link Thread} or on the
 * executor supplied for that source. Consumers read the merged elements with
 * {@link #take()}.
 *
 * <p>The first {@link RuntimeException} reported by a transmitter is recorded, the merger
 * is closed, and the exception is rethrown once from {@link #take()} after the buffer
 * has returned {@code null}.
 *
 * @param <E> type of the merged elements
 */
public final class Merger<E> implements OutputPortCloseable<E> {
    /** Buffer that collects the elements of all sources and backs {@link #take()}. */
    final Buffer<E> buffer;
    /** Registered sources, index-aligned with {@link #targetExecutors} until started. */
    final List<OutputPort<? extends E>> inputPorts = new ArrayList<>();
    /** Futures of the transmitters that were submitted to an executor. */
    final List<Future<?>> futures = new ArrayList<>();
    /** Dedicated threads of the transmitters that were registered without an executor. */
    final List<Thread> threads = new ArrayList<>();
    /** All transmitters created by {@link #start()}. */
    final List<O2ITransmitter> transmitters = new ArrayList<>();
    /** First failure reported by a transmitter and not yet rethrown from {@link #take()}. */
    private final AtomicReference<RuntimeException> thrown = new AtomicReference<>();
    /** Makes {@link #close()} run its body at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Records the first transmitter failure and shuts the merger down. */
    private final ExceptionHandler<O2ITransmitter> exceptionsHandler = new ExceptionHandler<O2ITransmitter>() {
        @Override // cc.redberry.pipe.util.ExceptionHandler
        public void handle(RuntimeException runtimeException, O2ITransmitter o2ITransmitter) {
            // Atomic "first failure wins": concurrent failures cannot overwrite each other.
            Merger.this.thrown.compareAndSet(null, runtimeException);
            Merger.this.close();
        }
    };
    /**
     * Executor for each registered source ({@code null} entry = dedicated thread).
     * Set to {@code null} by {@link #start()}, which is how "already started" is detected.
     * Guarded by {@code this}.
     */
    List<ExecutorService> targetExecutors = new ArrayList<>();
    /** Volatile because {@link #close()} may read it on a transmitter thread. */
    private volatile boolean failSafe = false;

    /** Creates a merger whose buffer has the size {@code Buffer.DEFAULT_SIZE}. */
    public Merger() {
        this(Buffer.DEFAULT_SIZE);
    }

    /**
     * Creates a merger backed by a new {@link Buffer}.
     *
     * @param i size argument passed to the {@link Buffer} constructor
     */
    public Merger(int i) {
        this.buffer = new Buffer<>(i);
    }

    /**
     * Sets the fail-safe flag.
     *
     * @param z when {@code true}, {@link #close()} still closes the sources and stops the
     *          transmitters but does not close the buffer itself
     */
    public void setFailSafe(boolean z) {
        this.failSafe = z;
    }

    /**
     * Registers a source whose transmitter will be submitted to the given executor.
     *
     * @param outputPort      source of elements
     * @param executorService executor to run the transmitter on, or {@code null} to use a
     *                        dedicated thread
     * @throws IllegalStateException if {@link #start()} has already been called
     */
    public synchronized void merge(OutputPort<E> outputPort, ExecutorService executorService) {
        ensureNotStarted();
        this.inputPorts.add(outputPort);
        this.targetExecutors.add(executorService);
    }

    /**
     * Registers a source whose transmitter will run on its own dedicated thread.
     *
     * @param outputPort source of elements
     * @throws IllegalStateException if {@link #start()} has already been called
     */
    public synchronized void merge(OutputPort<E> outputPort) {
        merge(outputPort, null);
    }

    /**
     * Creates one transmitter per registered source and launches it on a new thread or on
     * the executor registered for that source. After this call no more sources can be added.
     *
     * @throws IllegalStateException if the merger has already been started
     */
    public synchronized void start() {
        ensureNotStarted();
        final int portCount = this.inputPorts.size();
        // All buffer input ports are created before any transmitter is launched, exactly as
        // in the original two-pass order.
        // NOTE(review): presumably the buffer tracks its open input ports, so creating them
        // lazily could let an early-finishing transmitter end the stream - confirm in Buffer.
        InputPort[] bufferInputs = new InputPort[portCount];
        for (int index = 0; index < portCount; index++) {
            bufferInputs[index] = this.buffer.createInputPort();
        }
        for (int index = 0; index < portCount; index++) {
            O2ITransmitter transmitter =
                    new O2ITransmitter(this.inputPorts.get(index), bufferInputs[index], this.exceptionsHandler);
            this.transmitters.add(transmitter);
            ExecutorService executor = this.targetExecutors.get(index);
            if (executor == null) {
                Thread worker = new Thread(transmitter);
                this.threads.add(worker);
                worker.start();
            } else {
                this.futures.add(executor.submit(transmitter));
            }
        }
        // Marks the merger as started; merge()/start() reject further calls from now on.
        this.targetExecutors = null;
    }

    /**
     * Waits until every transmitter launched by {@link #start()} has finished, first the
     * executor-submitted ones and then the dedicated threads.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting, or
     *                              its interrupt flag is set once all waits have completed
     * @throws RuntimeException     wrapping an {@link ExecutionException} raised by a
     *                              submitted transmitter
     */
    public void join() throws InterruptedException {
        try {
            for (Future<?> future : this.futures) {
                future.get();
            }
            for (Thread worker : this.threads) {
                worker.join();
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        } catch (ExecutionException e) {
            throw new RuntimeException("This expression should not be thrown.", e);
        }
    }

    /**
     * Takes the next merged element from the buffer.
     *
     * @return the next element, or {@code null} when the buffer returned {@code null} and
     *         no transmitter failure is pending
     * @throws RuntimeException the recorded transmitter failure; it is thrown once, after
     *                          the buffer has returned {@code null}, and then cleared
     */
    @Override // cc.redberry.pipe.OutputPort
    public E take() {
        E element = this.buffer.take();
        if (element != null) {
            return element;
        }
        // Atomically claim the pending failure so that only one caller rethrows it.
        RuntimeException failure = this.thrown.getAndSet(null);
        if (failure != null) {
            throw failure;
        }
        return null;
    }

    /**
     * Shuts the merger down; only the first call has an effect. Closes every source that is
     * an {@link OutputPortCloseable}, stops all transmitters and, unless the fail-safe flag
     * is set, closes the buffer.
     *
     * <p>NOTE(review): this method is not synchronized with {@link #start()}, so a close
     * triggered while {@code start()} is still adding transmitters iterates a list that is
     * being modified - confirm whether that can happen in practice.
     */
    @Override // cc.redberry.pipe.OutputPortCloseable, java.lang.AutoCloseable
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        for (OutputPort<? extends E> source : this.inputPorts) {
            if (source instanceof OutputPortCloseable) {
                ((OutputPortCloseable<?>) source).close();
            }
        }
        for (O2ITransmitter transmitter : this.transmitters) {
            transmitter.stop();
        }
        if (!this.failSafe) {
            this.buffer.close();
        }
    }

    /** Returns the string representation of the underlying buffer. */
    @Override
    public String toString() {
        return this.buffer.toString();
    }

    /** Rejects configuration calls once {@link #start()} has cleared the executor list. */
    private void ensureNotStarted() {
        if (this.targetExecutors == null) {
            throw new IllegalStateException("This merger is already started.");
        }
    }
}
