package ch.uzh.ifi.ddis.ifp.esper.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.espertech.esper.client.hook.VirtualDataWindow;
import com.espertech.esper.client.hook.VirtualDataWindowContext;
import com.espertech.esper.client.hook.VirtualDataWindowFactory;
import com.espertech.esper.client.hook.VirtualDataWindowFactoryContext;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/uzh/ifi/ddis/ifp/esper/cassandra/CassandraVirtualDataWindowFactory.class */
public class CassandraVirtualDataWindowFactory implements VirtualDataWindowFactory {
    private static final Logger _log = LoggerFactory.getLogger(CassandraVirtualDataWindowFactory.class);
    private Cluster _cluster;
    private CassandraConfiguration _configuration;

    public void initialize(VirtualDataWindowFactoryContext virtualDataWindowFactoryContext) {
        Object[] parameters = virtualDataWindowFactoryContext.getParameters();
        if (parameters.length < 1) {
            throw new RuntimeException(String.format("Cassandra window has at least one parameter. Found %s", Integer.valueOf(parameters.length)));
        }
        String trim = parameters[0].toString().trim();
        _log.info("Started parsing Cassandra configuration...");
        try {
            this._configuration = CassandraConfiguration.create(trim);
            _log.info("Finished parsing Cassandra configuration.");
            try {
                connect(this._configuration.getHost());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Exception e2) {
            _log.error("Error parsing Cassandra configuration!");
            throw new RuntimeException(e2);
        }
    }

    public VirtualDataWindow create(VirtualDataWindowContext virtualDataWindowContext) {
        if (this._cluster == null) {
            throw new RuntimeException("Cassandra cluster has not yet been initialized!");
        }
        try {
            _log.info("Started connecting to cluster...");
            Session connect = this._cluster.connect(this._configuration.getKeyspace());
            _log.info("Finished connecting to cluster...");
            return new CassandraVirtualDataWindow(connect, virtualDataWindowContext);
        } catch (Exception e) {
            _log.error("Error connecting to cluster!");
            throw new RuntimeException(e);
        }
    }

    public void destroyAllContextPartitions() {
        close();
    }

    public Set<String> getUniqueKeyPropertyNames() {
        return null;
    }

    protected void connect(String str) throws Exception {
        try {
            _log.info("Started connecting to cluster: {} ...", str);
            this._cluster = Cluster.builder().addContactPoint(str).build();
            _log.info("Finished connecting to cluster: {} .", str);
        } catch (Exception e) {
            _log.error("Error connecting to cluster: {} !", str);
            throw new RuntimeException(String.format("Could not connect to Cassandra cluster!", str), e);
        }
    }

    protected void close() {
        try {
            _log.info("Started shutting down connection to Cassandra cluster...");
            this._cluster.shutdown();
            _log.info("Finished shutting down connection to Cassandra cluster.");
        } catch (Exception e) {
            _log.error("Error shutting down connection to Cassandra cluster!", e);
        }
    }
}
