package stream.runtime;

import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.data.Data;
import stream.data.DataFactory;
import stream.io.BlockingQueue;
import stream.io.DataStream;
import stream.io.DataStreamQueue;
import stream.runtime.setup.MonitorElementHandler;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.ProcessElementHandler;
import stream.runtime.setup.ProcessorFactory;
import stream.runtime.setup.ServiceElementHandler;
import stream.runtime.setup.ServiceInjection;
import stream.runtime.setup.ServiceReference;
import stream.runtime.setup.StreamElementHandler;
import stream.service.NamingService;
import stream.service.Service;

/* loaded from: input_file:stream/runtime/ProcessContainer.class */
public class ProcessContainer {
    // SLF4J logger shared by all container instances.
    static Logger log = LoggerFactory.getLogger(ProcessContainer.class);
    // Factory handed to the processor factory and to the built-in element handlers;
    // init() also registers the imported packages and the container properties on it.
    protected final ObjectFactory objectFactory;
    // Processor factory built on top of objectFactory; passed to the Monitor/Process handlers.
    protected final ProcessorFactory processorFactory;
    // Container name: the root element's "id" attribute, or "local" when that attribute is absent.
    protected String name;
    // Context created in the constructor from the container name and the naming service.
    protected final ContainerContext context;
    // Data streams by name (insertion-ordered); filled through setStream(), initialised in run().
    protected final Map<String, DataStream> streams;
    // Queues created by connectProcesses() for process inputs that have no defined stream;
    // dataArrived() forwards items into them.
    protected final Map<String, DataStreamQueue> listeners;
    // Processes started and supervised by run(). Nothing in this class adds to the list;
    // presumably the element handlers do so via getProcesses() - TODO confirm.
    protected final List<AbstractProcess> processes;
    // Service references handed to ServiceInjection.injectServices() at the end of init().
    protected final List<ServiceReference> serviceRefs;
    // Element handlers by key; the defaults are "Monitor", "Process", "Stream" and "Service".
    protected final Map<String, ElementHandler> elementHandler;

    /**
     * Creates a container from the XML definition at the given URL, using only
     * the built-in element handlers.
     *
     * @param url location of the container definition
     * @throws Exception if the two-argument constructor fails
     */
    public ProcessContainer(URL url) throws Exception {
        this(url, (Map<String, ElementHandler>) null);
    }

    /**
     * Creates a container from the XML definition at the given URL.
     *
     * <p>The root element must be named {@code experiment} or {@code container}
     * (case-insensitive). Its {@code id} attribute becomes the container name
     * ({@code "local"} when absent). A non-blank {@code namingService} attribute
     * is passed to the object factory to create the naming service; otherwise a
     * {@link DefaultNamingService} is used.
     *
     * <p>NOTE(review): the XML parser is used with the factory defaults. If
     * definitions may come from untrusted sources, consider disabling DTDs and
     * external entities on the {@link DocumentBuilderFactory}.
     *
     * @param url location of the container definition
     * @param map additional element handlers, may be {@code null}; entries
     *            replace the built-in handler registered under the same key
     * @throws Exception if the document cannot be read or parsed, the root
     *                   element is unexpected, the naming service cannot be
     *                   created (the original failure is attached as cause), or
     *                   the container setup fails
     */
    public ProcessContainer(URL url, Map<String, ElementHandler> map) throws Exception {
        this.objectFactory = ObjectFactory.newInstance();
        this.processorFactory = new ProcessorFactory(this.objectFactory);
        this.streams = new LinkedHashMap<String, DataStream>();
        this.listeners = new LinkedHashMap<String, DataStreamQueue>();
        this.processes = new ArrayList<AbstractProcess>();
        this.serviceRefs = new ArrayList<ServiceReference>();
        this.elementHandler = new HashMap<String, ElementHandler>();
        this.elementHandler.put("Monitor", new MonitorElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Process", new ProcessElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Stream", new StreamElementHandler(this.objectFactory));
        this.elementHandler.put("Service", new ServiceElementHandler(this.objectFactory));
        if (map != null) {
            // Caller-supplied handlers win over the defaults registered above.
            this.elementHandler.putAll(map);
        }

        // Close the stream ourselves: the parser gives no guarantee that it does.
        Document parse;
        InputStream in = url.openStream();
        try {
            parse = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
        } finally {
            in.close();
        }

        Element documentElement = parse.getDocumentElement();
        String rootName = documentElement.getNodeName();
        if (!rootName.equalsIgnoreCase("experiment") && !rootName.equalsIgnoreCase("container")) {
            throw new Exception("Expecting root element to be 'container'!");
        }
        if (documentElement.hasAttribute("id")) {
            this.name = documentElement.getAttribute("id");
        } else {
            this.name = "local";
        }

        // Typed as the interface so that a custom implementation can replace the default.
        NamingService namingService = new DefaultNamingService();
        String namingServiceClass = documentElement.getAttribute("namingService");
        if (namingServiceClass != null && !namingServiceClass.trim().isEmpty()) {
            try {
                // Raw map kept on purpose: the parameter type of ObjectFactory.create is not visible here.
                namingService = (NamingService) this.objectFactory.create(namingServiceClass, new HashMap());
            } catch (Exception e) {
                log.error("Faild to instantiate naming service '{}': {}", namingServiceClass, e.getMessage());
                throw new Exception("Faild to instantiate naming service '" + namingServiceClass + "': " + e.getMessage(), e);
            }
        }

        // Only naming-service failures are reported as such; setup errors from
        // init() propagate unchanged instead of being mislabelled.
        this.context = new ContainerContext(this.name, namingService);
        init(parse);
    }

    /**
     * Returns the name of this container.
     *
     * @return the current container name
     */
    public String getName() {
        return name;
    }

    /**
     * Replaces the name of this container.
     *
     * @param str the new container name
     */
    public void setName(String str) {
        name = str;
    }

    /**
     * Returns the context that was created for this container.
     *
     * @return the container context
     */
    public ContainerContext getContext() {
        return context;
    }

    /**
     * Returns the list of processes of this container. The internal list itself
     * is returned, not a copy.
     *
     * @return the live process list
     */
    public List<AbstractProcess> getProcesses() {
        return processes;
    }

    /**
     * Returns the service references of this container. The internal list itself
     * is returned, not a copy.
     *
     * @return the live list of service references
     */
    public List<ServiceReference> getServiceRefs() {
        return serviceRefs;
    }

    /**
     * Sets the container up from the parsed definition.
     *
     * <p>Steps, in order: register the comma-separated packages of the root
     * element's {@code import} attribute with the object factory; copy the
     * {@code property} children into the context and expose them as object
     * factory variables; install the default {@link DataFactory} named by the
     * {@code container.datafactory} property, if set; pass every child element
     * to each handler that accepts it; finally wire the processes to their
     * streams and inject the services.
     *
     * @param document the parsed container definition
     * @throws Exception if the data factory cannot be instantiated, or a
     *                   handler, the process wiring or the service injection fails
     */
    private void init(Document document) throws Exception {
        Element root = document.getDocumentElement();

        String imports = root.getAttribute("import");
        if (imports != null) {
            for (String entry : imports.split(",")) {
                // Register the trimmed name: "a, b" must yield "b", not " b".
                String packageName = entry.trim();
                if (!packageName.isEmpty()) {
                    this.objectFactory.addPackage(packageName);
                }
            }
        }

        this.context.getProperties().putAll(getProperties(root));
        this.objectFactory.addVariables(this.context.getProperties());

        String dataFactoryClass = this.context.getProperties().get("container.datafactory");
        if (dataFactoryClass != null) {
            log.info("Using {} as default DataFactory for this container...", dataFactoryClass);
            // Constructor.newInstance replaces the deprecated Class.newInstance.
            DataFactory.setDefaultDataFactory(
                    (DataFactory) Class.forName(dataFactoryClass).getDeclaredConstructor().newInstance());
        }

        NodeList children = root.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            Node child = children.item(i);
            if (child.getNodeType() != Node.ELEMENT_NODE) {
                continue;
            }
            Element element = (Element) child;
            // Every handler that accepts the element gets to process it; the
            // handlers live in a HashMap, so their relative order is unspecified.
            for (ElementHandler handler : this.elementHandler.values()) {
                if (handler.handlesElement(element)) {
                    handler.handleElement(this, element);
                }
            }
        }

        connectProcesses();
        injectServices();
    }

    /**
     * Connects every {@link Process} in the process list to its input stream.
     *
     * <p>If no stream is registered under the process' input name, a new
     * {@link BlockingQueue} is created for it. The queue is stored as listener
     * and as stream under that name and is registered with the container
     * context, so items passed to {@link #dataArrived(String, Data)} reach the
     * process.
     *
     * @throws RuntimeException if a process has no input name
     * @throws Exception if registering a created queue with the context fails
     */
    protected void connectProcesses() throws Exception {
        log.debug("Wiring process inputs to data-streams...");
        for (AbstractProcess candidate : this.processes) {
            if (!(candidate instanceof Process)) {
                continue;
            }
            Process process = (Process) candidate;
            String input = process.getInput();
            if (input == null) {
                throw new RuntimeException("Process '" + process + "' is not connected to any input-stream!");
            }

            DataStream stream = this.streams.get(input);
            if (stream == null) {
                log.debug("No stream defined for name '{}' - creating a listener-queue for key '{}'", input, input);
                // Keep the concrete type: the queue is used as listener, as stream and as service.
                BlockingQueue queue = new BlockingQueue();
                this.listeners.put(input, queue);
                setStream(input, queue);
                this.context.register(input, queue);
                stream = queue;
            }
            process.setDataStream(stream);
        }
    }

    /**
     * Hands the collected service references and the container context to
     * {@link ServiceInjection#injectServices}.
     *
     * @throws Exception if the injection fails
     */
    protected void injectServices() throws Exception {
        List<ServiceReference> references = getServiceRefs();
        ContainerContext containerContext = getContext();
        ServiceInjection.injectServices(references, containerContext);
    }

    /**
     * Registers a data stream under the given name, replacing any stream that
     * was stored under that name before.
     *
     * @param str        name of the stream
     * @param dataStream the stream to register
     */
    public void setStream(String str, DataStream dataStream) {
        streams.put(str, dataStream);
    }

    /**
     * Collects the properties declared as direct {@code property} children
     * (element name matched case-insensitively) of the given element.
     *
     * <p>A child contributes an entry only if both its {@code name} and its
     * {@code value} attribute are non-blank. Name and value are stored as
     * written, without trimming. A later property replaces an earlier one with
     * the same name.
     *
     * @param element the element whose children are inspected
     * @return the properties in document order; empty if none are declared
     */
    protected Map<String, String> getProperties(Element element) {
        Map<String, String> properties = new LinkedHashMap<String, String>();
        NodeList children = element.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            Node child = children.item(i);
            if (!(child instanceof Element)) {
                continue;
            }
            Element candidate = (Element) child;
            if (!candidate.getNodeName().equalsIgnoreCase("property")) {
                continue;
            }
            String key = candidate.getAttribute("name");
            String value = candidate.getAttribute("value");
            if (key != null && !"".equals(key.trim()) && value != null && !"".equals(value.trim())) {
                properties.put(key, value);
            }
        }
        log.debug("Found properties: {}", properties);
        return properties;
    }

    /**
     * Runs the container: initialises all streams, starts every process as a
     * daemon and blocks until the process list is empty.
     *
     * <p>While waiting, the list is swept every 500 ms. Processes that are no
     * longer running are removed; {@link Monitor} instances are removed on the
     * first sweep, so the container does not wait for them. The sweep holds the
     * monitor of the process list, the same lock {@link #shutdown()} takes
     * while it iterates that list.
     *
     * <p>An interrupt received during the wait loop does not abort the wait,
     * because {@link #shutdown()} relies on this loop to empty the list. The
     * interrupt status is restored before the method returns.
     *
     * @throws Exception if neither a stream nor a listener is defined, if a
     *                   stream or process fails to initialise, or if the thread
     *                   is interrupted during the initial start-up pause
     */
    public void run() throws Exception {
        if (this.streams.isEmpty() && this.listeners.isEmpty()) {
            throw new Exception("No data-stream defined!");
        }
        log.debug("Need to handle {} sources: {}", this.streams.size(), this.streams.keySet());
        log.debug("Experiment contains {} stream processes", this.processes.size());

        log.debug("Initializing all DataStreams...");
        for (Map.Entry<String, DataStream> entry : this.streams.entrySet()) {
            log.debug("Initializing stream '{}'", entry.getKey());
            entry.getValue().init();
        }

        log.debug("Creating {} active processes...", this.processes.size());
        long start = System.currentTimeMillis();
        for (AbstractProcess process : this.processes) {
            process.setDaemon(true);
            ProcessContextImpl processContext = new ProcessContextImpl(this.context);
            log.debug("Initializing process with process-context...");
            process.init(processContext);
            log.debug("Starting stream-process [{}]", process);
            process.start();
            log.debug("Stream-process started.");
        }

        Thread.sleep(1000L);
        log.debug("waiting for processes to finish...");

        boolean interrupted = false;
        try {
            while (true) {
                synchronized (this.processes) {
                    if (this.processes.isEmpty()) {
                        break;
                    }
                    log.debug("{} processes running", this.processes.size());
                    Iterator<AbstractProcess> it = this.processes.iterator();
                    while (it.hasNext()) {
                        AbstractProcess next = it.next();
                        if (!next.isRunning() || (next instanceof Monitor)) {
                            log.debug("Process '{}' is finished.", next);
                            log.debug("Removing finished process {}", next);
                            it.remove();
                        } else {
                            log.debug("    {} is still running", next);
                        }
                    }
                }
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    // Keep supervising, but remember the interrupt for the caller.
                    interrupted = true;
                    log.debug("Interrupted while waiting for processes to finish, continuing to wait.");
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        log.info("ProcessContainer finished all processes after about {} ms", System.currentTimeMillis() - start);
    }

    /**
     * Returns the names under which listener queues are registered. The result
     * is the key-set view of the internal listener map, not a copy.
     *
     * @return the listener names
     */
    public Set<String> getStreamListenerNames() {
        Set<String> listenerNames = listeners.keySet();
        return listenerNames;
    }

    /**
     * Forwards a data item to the listener queue registered under the given
     * name. If no listener is registered under that name, a warning is logged
     * and the item is dropped.
     *
     * @param str  name of the listener queue
     * @param data the item to deliver
     */
    public void dataArrived(String str, Data data) {
        if (!this.listeners.containsKey(str)) {
            log.warn("No listener defined for {}", str);
            return;
        }
        DataStreamQueue queue = this.listeners.get(str);
        queue.process(data);
    }

    /**
     * Asks every process to finish and then blocks until the process list is
     * empty, polling every 500 ms.
     *
     * <p>A failure of an individual {@code finish()} call is logged and does
     * not stop the remaining processes from being signalled. This method never
     * removes entries from the list itself; within this class only the sweep in
     * {@link #run()} does, so the wait ends only once that sweep has removed
     * all processes.
     *
     * <p>An interrupt received while waiting does not abort the wait. The
     * interrupt status is restored before the method returns.
     */
    public void shutdown() {
        synchronized (this.processes) {
            for (AbstractProcess process : this.processes) {
                log.debug("Sending SHUTDOWN signal to process {}", process);
                try {
                    process.finish();
                } catch (Exception e) {
                    // Trailing throwable lets the logger record the stack trace too.
                    log.error("Failed to properly shutdown process: {}", e.getMessage(), e);
                }
            }
        }

        boolean interrupted = false;
        while (true) {
            boolean finished;
            // Read the list under the same lock that guards the iteration above.
            synchronized (this.processes) {
                finished = this.processes.isEmpty();
            }
            if (finished) {
                break;
            }
            try {
                log.info("Waiting for processes to finish...");
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                // Keep waiting, but remember the interrupt for the caller.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
