package ch.uzh.ifi.ddis.ifp.streams;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventType;
import com.espertech.esper.client.soda.EPStatementObjectModel;
import com.espertech.esper.client.time.CurrentTimeEvent;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.AbstractProcessor;
import stream.Configurable;
import stream.Data;
import stream.ProcessContext;
import stream.annotations.Parameter;
import stream.io.Queue;
import stream.io.Sink;

/* loaded from: input_file:ch/uzh/ifi/ddis/ifp/streams/EsperProcessor.class */
public class EsperProcessor extends AbstractProcessor implements Configurable {
    private static final Logger _log = LoggerFactory.getLogger(EsperProcessor.class);
    public static final String ESPER_CONFIG_LOCAL_NAME = "esper-configuration";
    public static final String ESPER_NS = "http://www.espertech.com/schema/esper";
    public static final String ESPER_STATEMENT_LOCAL_NAME = "statement";
    public static final String EVENT_TYPE_KEY = "EPEventType";
    protected transient EPServiceProvider _epService;
    protected transient EPRuntime _epRuntime;
    private long _initialTime;
    private String _epProviderURI;
    private Map<String, Sink> _sinksMap;
    private final Configuration _configuration = new Configuration();
    private final Map<String, String> _startTimestampMap = new HashMap();
    private final Map<String, String> _endTimestampMap = new HashMap();
    private long _currentTime = Long.MIN_VALUE;
    private final Set<StatementBean> _esperStatements = new LinkedHashSet();

    public void init(ProcessContext processContext) throws Exception {
        super.init(processContext);
        initEPService();
    }

    private void initEPService() {
        _log.info("Started initializing {} ...", getClass());
        String providerUri = getProviderUri();
        if (providerUri == null || providerUri.isEmpty()) {
            _log.debug("Creating new Esper service from default provider.");
            this._epService = EPServiceProviderManager.getDefaultProvider(this._configuration);
        } else {
            _log.debug("Creating new Esper service from named provider: {}", providerUri);
            this._epService = EPServiceProviderManager.getProvider(providerUri, this._configuration);
        }
        _log.debug("Adding statements to Esper service");
        EPAdministrator ePAdministrator = this._epService.getEPAdministrator();
        for (StatementBean statementBean : this._esperStatements) {
            _log.info("Compiling statement {}", statementBean);
            _log.debug("Compiling Esper statement {}", statementBean.getEsperStatement());
            EPStatementObjectModel compileEPL = ePAdministrator.compileEPL(statementBean.getEsperStatement());
            String name = statementBean.getName();
            EPStatement create = name == null ? ePAdministrator.create(compileEPL) : ePAdministrator.create(compileEPL, name);
            if (statementBean.getOutput() != null) {
                String output = statementBean.getOutput();
                Sink sink = this._sinksMap.get(output);
                String[] propertyNames = create.getEventType().getPropertyNames();
                if (sink == null) {
                    _log.warn("Statement {} declares the sink {} but the corresponding Esper processor does not.", statementBean, output);
                } else {
                    EsperStatementSubscriber esperStatementSubscriber = new EsperStatementSubscriber(sink, propertyNames);
                    _log.info("Adding subscriber {} to statement {}", sink.getId(), statementBean);
                    create.setSubscriber(esperStatementSubscriber);
                }
            }
        }
        _log.debug("Finished adding statements to Esper service");
        _log.debug("Mapping event types to timestamp properties, if any");
        for (EventType eventType : ePAdministrator.getConfiguration().getEventTypes()) {
            String startTimestampPropertyName = eventType.getStartTimestampPropertyName();
            String endTimestampPropertyName = eventType.getEndTimestampPropertyName();
            String str = endTimestampPropertyName == null ? startTimestampPropertyName : endTimestampPropertyName;
            String name2 = eventType.getName();
            _log.debug("Timestamp property for event type {}: {}", name2, str);
            this._startTimestampMap.put(name2, startTimestampPropertyName);
            this._endTimestampMap.put(name2, endTimestampPropertyName);
        }
        _log.debug("Finished mapping event types to timestamp properties, if any");
        this._epRuntime = this._epService.getEPRuntime();
        _log.info("Finished initalizing {}.", getClass());
    }

    public Data process(Data data) {
        String str = (String) data.get("@stream");
        Data createCopy = data.createCopy();
        createCopy.remove("@stream");
        createCopy.remove("@stream:id");
        String str2 = this._startTimestampMap.get(str);
        if (str2 != null) {
            long parseLong = Long.parseLong(((Serializable) data.get(str2)).toString());
            createCopy.put(str2, Long.valueOf(parseLong));
            if (parseLong > this._currentTime) {
                this._currentTime = parseLong;
                CurrentTimeEvent currentTimeEvent = new CurrentTimeEvent(this._currentTime);
                _log.debug("Sending time event new time: {}", Long.valueOf(this._currentTime));
                this._epRuntime.sendEvent(currentTimeEvent);
            }
        }
        String str3 = this._endTimestampMap.get(str);
        if (str3 != null) {
            createCopy.put(str3, Long.valueOf(Long.parseLong(((Serializable) data.get(str3)).toString())));
        }
        this._epRuntime.sendEvent(createCopy, str);
        return null;
    }

    public void resetState() throws Exception {
        super.resetState();
        this._epService.destroy();
        initEPService();
    }

    public void finish() throws Exception {
        _log.debug("Finishing {} ...", getClass());
        if (this._epService != null) {
            this._epService.destroy();
        }
        _log.debug("Finished finishing {}", getClass());
    }

    public void configure(Element element) {
        NodeList elementsByTagName = element.getElementsByTagName(ESPER_CONFIG_LOCAL_NAME);
        for (int i = 0; i < elementsByTagName.getLength(); i++) {
            _log.debug("Configuring Esper with xml node.");
            Node item = elementsByTagName.item(i);
            _log.debug("Esper xml configuration: {}", item.cloneNode(true));
            try {
                Document newDocument = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
                newDocument.appendChild(newDocument.importNode(item, true));
                this._configuration.configure(newDocument);
                _log.debug("Finished configuring Esper with xml node.");
            } catch (ParserConfigurationException e) {
                _log.debug("Error configuring Esper with xml node.");
                throw new RuntimeException(e);
            }
        }
        NodeList elementsByTagName2 = element.getElementsByTagName(ESPER_STATEMENT_LOCAL_NAME);
        for (int i2 = 0; i2 < elementsByTagName2.getLength(); i2++) {
            try {
                this._esperStatements.add(new StatementBeanFactory().createStatementBean(elementsByTagName2.item(i2)));
            } catch (Exception e2) {
                throw new RuntimeException("Error reading Esper statement from configuration!", e2);
            }
        }
    }

    public void setOutput(Queue[] queueArr) {
        this._sinksMap = new HashMap();
        for (int i = 0; i < queueArr.length; i++) {
            this._sinksMap.put(queueArr[i].getId(), queueArr[i]);
        }
    }

    public Configuration getConfiguration() {
        return this._configuration;
    }

    @Parameter(defaultValue = "", description = "The URI of the Esper Runtime.", required = false)
    public String getProviderUri() {
        return this._epProviderURI;
    }

    public void setProviderUri(String str) {
        this._epProviderURI = str;
    }

    public long getInitialTime() {
        return this._initialTime;
    }

    public void setInitialTime(long j) {
        this._initialTime = j;
        this._currentTime = j;
    }
}
