package stream.runtime;

import java.util.HashMap;
import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.Process;
import stream.io.LinkedListSink;
import stream.io.Stream;
import stream.io.multi.LazySeqMultiStream;
import stream.io.multi.MultiStream;
import stream.runtime.setup.factory.ObjectFactory;
import stream.runtime.setup.factory.ProcessorFactory;
import stream.runtime.setup.handler.DProcessElementHandler;
import stream.util.Variables;

/* loaded from: input_file:stream/runtime/BatchTask.class */
/**
 * Spark flat-map function that runs a single configured process against one
 * input stream and returns the items the process emitted.
 *
 * <p>For every stream ID passed to {@link #call(String)}, a process container
 * is built from the XML document held by the {@link DProcessContext}, reduced
 * to the one process whose {@code id} property matches the context's process
 * ID, executed, and shut down again.
 */
public class BatchTask implements FlatMapFunction<String, Data> {
    private static final long serialVersionUID = 6227552219561519374L;
    private static final Logger log = LoggerFactory.getLogger(BatchTask.class);
    private final DProcessContext context;

    /**
     * @param dProcessContext context providing the XML document, the ID of the
     *                        process to run and the worker-ID key
     */
    public BatchTask(DProcessContext dProcessContext) {
        this.context = dProcessContext;
    }

    /**
     * Runs the configured process for the given stream and returns its output.
     *
     * @param str ID of the stream this invocation should consume
     * @return iterator over the items the process wrote to its output sink
     * @throws Exception if the container cannot be prepared or execution fails
     */
    public Iterator<Data> call(String str) throws Exception {
        log.info("\n\n==== Worker started ====\nProcess ID: {}\nStream ID: {}\n\n=========================================\n", this.context.getProcessId(), str);
        // A fresh sink per invocation: the same function instance may be called
        // for several stream IDs, and results of one call must not show up in
        // the iterator returned by the next.
        LinkedListSink sink = new LinkedListSink();
        ProcessContainer container = prepareContainer(str, sink);
        try {
            container.execute();
        } finally {
            // Shut the container down even when execution fails.
            container.shutdown();
        }
        return sink.iterator();
    }

    /**
     * Builds a container that holds exactly one process (the one identified by
     * the context's process ID), wired to read from the requested stream and
     * to write into the given sink.
     *
     * @param str            ID of the stream to read; ignored when the
     *                       process input is not a {@link MultiStream}
     * @param linkedListSink sink that receives the process output
     * @return the prepared container, ready to be executed
     * @throws IllegalArgumentException if no process with the configured ID exists
     * @throws Exception if the requested stream is not part of the process
     *                   input, or container setup fails
     */
    public ProcessContainer prepareContainer(String str, LinkedListSink linkedListSink) throws Exception {
        Variables userProperties = StreamRuntime.loadUserProperties();
        ObjectFactory objectFactory = ObjectFactory.newInstance();
        // Raw map: the handler value type expected by ProcessContainer is not
        // imported in this file.
        HashMap handlers = new HashMap();
        handlers.put(DProcessElementHandler.ELEMENT_TAG, new DProcessElementHandler(objectFactory, new ProcessorFactory(objectFactory), true));
        ProcessContainer processContainer = new ProcessContainer(this.context.getXmlDoc(), handlers, userProperties);
        // NOTE(review): presumably un-registers this worker-local container
        // from a global registry - confirm against ProcessContainer.
        ProcessContainer.container.remove(processContainer);

        Process process = null;
        for (Object element : processContainer.getProcesses()) {
            Process candidate = (Process) element;
            // Null-safe: a process without an "id" property is simply skipped.
            Object id = candidate.getProperties().get("id");
            if (id != null && id.equals(this.context.getProcessId())) {
                process = candidate;
                break;
            }
        }
        if (process == null) {
            throw new IllegalArgumentException("Could not find process with ID " + this.context.getProcessId());
        }

        // Only a multi-stream input can be narrowed down to a single sub-stream.
        String streamId = (process.getInput() instanceof MultiStream) ? str : null;

        LazySeqMultiStream workerStream = new LazySeqMultiStream() {
            @Override // stream.io.multi.LazySeqMultiStream
            public Data readNext() throws Exception {
                try {
                    Data item = super.readNext();
                    if (item == null) {
                        return null;
                    }
                    // Tag every item with the worker source it was read from.
                    item.put(BatchTask.this.context.getWorkerIdKey(), "WorkerSource" + item.get(getSourceKey()));
                    return item;
                } catch (IllegalMonitorStateException e) {
                    // Treated as end-of-stream rather than as a failure.
                    BatchTask.log.warn("Underlying stream threw IllegalMonitorStateException. I ignore this and just end reading the stream", e);
                    return null;
                }
            }
        };
        if (streamId != null) {
            Stream selected = (Stream) process.getInput().getStreams().get(streamId);
            if (selected == null) {
                // Fail here with a clear message instead of registering a null stream.
                throw new Exception("Desired stream with id " + streamId + " not found among " + process.getInput().getStreams().size() + " streams.");
            }
            workerStream.addStream(streamId, selected);
        }
        process.setInput(workerStream);
        process.setOutput(linkedListSink);

        // Reduce the container to the single stream and process of this task.
        processContainer.streams.clear();
        processContainer.streams.put(workerStream.getId(), workerStream);
        processContainer.getProcesses().clear();
        processContainer.getProcesses().add(process);
        return processContainer;
    }
}
