package stream.runtime;

import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.io.RddSink;
import stream.io.Sink;
import stream.io.multi.MultiStream;
import streams.spark.StreamsSparkContext;

/* loaded from: input_file:stream/runtime/StreamingExecution.class */
/**
 * {@link DProcessExecution} that runs a process on top of Spark Streaming.
 *
 * <p>{@link #init} distributes the names of the input streams over a number of
 * {@link StreamingReceiver}s and unions the resulting receiver streams into a single
 * {@code JavaDStream}. {@link #execute} registers a per-batch callback that forwards each
 * batch to the configured {@link Sink}, starts the streaming context and blocks until the
 * context terminates or the {@link StreamingMonitor} reports that the process was stopped.
 *
 * <p>The per-batch callback is a lambda that captures {@code this}; note that
 * {@code dataStream} and {@code output} are {@code transient} fields.
 */
public class StreamingExecution implements DProcessExecution {
    private static final Logger log = LoggerFactory.getLogger(StreamingExecution.class);

    /** Interval, in milliseconds, at which {@link #execute} polls for termination or a stop request. */
    private static final long TERMINATION_POLL_MILLIS = 250L;

    private DProcessContext context;
    private transient JavaDStream<Data> dataStream;
    private transient Sink output;

    /**
     * Creates the receivers for the given input streams and prepares the unioned data stream.
     *
     * <p>The number of receivers is taken from {@code StreamsSparkContext.getNumReceivers()};
     * if that value is below 1, the configured {@code spark.executor.instances} is used instead.
     * The result is capped at the number of input streams and never drops below 1.
     *
     * @param dProcessContext context of the process being executed
     * @param multiStream     provider of the named input streams
     * @param sink            sink that receives every processed batch; may be {@code null}
     * @throws IllegalArgumentException if {@code multiStream} contains no streams
     * @throws Exception                declared by the {@link DProcessExecution} contract
     */
    @Override // stream.runtime.DProcessExecution
    public void init(DProcessContext dProcessContext, MultiStream multiStream, Sink sink) throws Exception {
        this.context = dProcessContext;

        int executorCount = StreamsSparkContext.sc().getConf().getInt("spark.executor.instances", 1);
        int coresPerExecutor = StreamsSparkContext.sc().getConf().getInt("spark.executor.cores", 2);
        int streamCount = multiStream.getStreams().size();
        log.info("There are {} executor(s) with {} core(s) each and {} input streams",
                executorCount, coresPerExecutor, streamCount);

        int numReceivers = StreamsSparkContext.getInstance().getNumReceivers();
        if (numReceivers < 1) {
            numReceivers = executorCount;
        } else {
            log.info("Runtime argument 'numreceivers' is set to {}", numReceivers);
        }

        if (streamCount == 0) {
            // Without this check the union below would fail with an unexplained NoSuchElementException.
            throw new IllegalArgumentException("Cannot initialize streaming execution: no input streams available");
        }

        // Clamp to >= 1: a configured executor count of 0 would otherwise cause a modulo-by-zero.
        int receiverCount = Math.max(1, Math.min(numReceivers, streamCount));
        log.info("Setting numReceivers to {}", receiverCount);

        List<List<String>> streamGroups = distributeRoundRobin(multiStream.getStreams().keySet(), receiverCount);

        LinkedList<JavaDStream<Data>> receiverStreams = new LinkedList<>();
        for (List<String> streamGroup : streamGroups) {
            receiverStreams.add(
                    StreamsSparkContext.ssc().receiverStream(new StreamingReceiver(this.context, streamGroup)));
        }

        this.dataStream = StreamsSparkContext.ssc().union(
                receiverStreams.getFirst(), receiverStreams.subList(1, receiverStreams.size()));
        this.output = sink;
    }

    /**
     * Distributes the given stream names over {@code groupCount} groups in round-robin order.
     * Groups are created lazily, so fewer than {@code groupCount} groups are returned when
     * there are fewer names than groups.
     *
     * @param streamIds  names of the input streams, in iteration order
     * @param groupCount number of groups to distribute over; must be at least 1
     * @return the non-empty groups, in creation order
     */
    private static List<List<String>> distributeRoundRobin(Iterable<String> streamIds, int groupCount) {
        List<List<String>> groups = new LinkedList<>();
        int index = 0;
        for (String streamId : streamIds) {
            int slot = index % groupCount;
            if (groups.size() <= slot) {
                groups.add(new LinkedList<>());
            }
            groups.get(slot).add(streamId);
            index++;
        }
        return groups;
    }

    /**
     * Starts the streaming context and blocks until it terminates or the process is stopped.
     *
     * <p>For every batch:
     * <ul>
     *   <li>if the monitor reports that the process is not running, the batch is only logged;</li>
     *   <li>if the sink is an {@link RddSink}, the RDD itself is handed to the sink;</li>
     *   <li>otherwise the RDD is collected and, if a sink is set and the batch is non-empty,
     *       the collected items are written to the sink.</li>
     * </ul>
     *
     * @throws Exception if the streaming context or the sink fails
     */
    @Override // stream.runtime.DProcessExecution
    public void execute() throws Exception {
        StreamingMonitor monitor = new StreamingMonitor(this.context);
        StreamsSparkContext.ssc().addStreamingListener(monitor);

        this.dataStream.foreachRDD((rdd, time) -> {
            if (!monitor.isProcessRunning()) {
                // The item count triggers a Spark job, so it is only computed when debugging.
                log.info("{} received batch without running ({} partitions{}).",
                        this.context.getProcessId(),
                        rdd.partitions().size(),
                        log.isDebugEnabled() ? ", " + rdd.count() + " items" : "");
                return;
            }

            if (this.output instanceof RddSink) {
                ((RddSink) this.output).write(rdd);
                log.info("{} forwarded RDD with {} partitions{} to {}.",
                        this.context.getProcessId(),
                        rdd.partitions().size(),
                        log.isDebugEnabled() ? " and " + rdd.count() + " items " : "",
                        this.output.getId());
                return;
            }

            // No RDD-aware sink: collect the batch before (optionally) writing it.
            List<Data> items = rdd.collect();
            rdd.unpersist();
            log.info("{} collected {} items from {} partitions.",
                    this.context.getProcessId(), items.size(), rdd.partitions().size());
            if (this.output != null && !items.isEmpty()) {
                this.output.write(items);
            }
        });

        StreamsSparkContext.ssc().start();
        monitor.onProcessStarting();

        // Poll so that a stop request seen by the monitor is noticed while the context is still running.
        while (!StreamsSparkContext.ssc().awaitTerminationOrTimeout(TERMINATION_POLL_MILLIS)) {
            if (monitor.isProcessStopped()) {
                log.info("{} STOPPING.", this.context.getProcessId());
                // stop(stopSparkContext = false, stopGracefully = true)
                StreamsSparkContext.ssc().stop(false, true);
                break;
            }
        }
        monitor.onProcessFinished();
    }
}
