package ch.uzh.ifi.ddis.ifp.esper.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.espertech.esper.client.hook.VirtualDataWindowContext;
import com.espertech.esper.client.hook.VirtualDataWindowFactory;
import com.espertech.esper.client.hook.VirtualDataWindowFactoryContext;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/uzh/ifi/ddis/ifp/esper/cassandra/CassandraVirtualDataWindowFactory.class */
public class CassandraVirtualDataWindowFactory implements VirtualDataWindowFactory {
    private static final Logger _log = LoggerFactory.getLogger(CassandraVirtualDataWindowFactory.class);
    public static final String CASSANDRA_USERNAME = "cassandra-username";
    public static final String CASSANDRA_ROOT_USERNAME = "cassandra-root-username";
    public static final String CASSANDRA_PASSWORD = "cassandra-password";
    public static final String CASSANDRA_ROOT_PASSWORD = "cassandra-root-password";
    private Cluster _cluster;
    private Cluster _rootCluster;
    private CassandraConfiguration _configuration;
    private String _username;

    private Cluster connectToCluster(String str, String str2, String str3) throws Exception {
        Cluster.Builder withoutMetrics = Cluster.builder().withoutMetrics();
        try {
            _log.info("Started connecting to cluster: {} ...", str);
            _log.debug("Adding contact point...");
            Cluster.Builder addContactPoint = withoutMetrics.addContactPoint(str);
            if (str2 != null && str3 != null) {
                _log.debug("Adding credentials...");
                addContactPoint = addContactPoint.withCredentials(str2, str3);
            }
            _log.debug("Building cluster user...");
            Cluster build = addContactPoint.build();
            _log.info("Finished connecting to cluster user: {} .", str);
            return build;
        } catch (Exception e) {
            _log.error("Error connecting to cluster user: {} !", str);
            throw new Exception(String.format("Could not connect to Cassandra cluster!", str), e);
        }
    }

    private void createKeyspace(String str, String str2) throws Exception {
        Session session = null;
        Exception exc = null;
        try {
            _log.info("Started checking whether keyspace {} exists...", str2);
            session = this._rootCluster.connect(str2);
            _log.info("Finished checking whether keyspace {} exists...", str2);
            _log.info("Keyspace {} exists, nothing to do.", str2);
        } catch (InvalidQueryException e) {
        } catch (AlreadyExistsException e2) {
        } catch (Exception e3) {
            _log.error("Unexcpected Exception when checking for keyspace {}!", str2, e3);
            exc = e3;
        }
        if (session == null && exc == null) {
            _log.info("Finished checking whether keyspace {} exists...", str2);
            _log.info("Keyspace does not yet {} exist, trying to create it", str2);
            try {
                try {
                    _log.info("Started creating keyspace {} ...", str2);
                    session = this._rootCluster.connect();
                    session.execute(String.format("CREATE keyspace %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}", str2));
                    _log.info("Finished creating keyspace {} .", str2);
                    if (session != null) {
                        _log.info("Started shutting down session for creating keyspace ...");
                        session.shutdown();
                        _log.info("Finished shutting down session for creating keyspace.");
                    }
                } catch (Exception e4) {
                    _log.error("Error creating keyspace {}", str2, e4);
                    exc = e4;
                    if (session != null) {
                        _log.info("Started shutting down session for creating keyspace ...");
                        session.shutdown();
                        _log.info("Finished shutting down session for creating keyspace.");
                    }
                }
            } catch (Throwable th) {
                if (session != null) {
                    _log.info("Started shutting down session for creating keyspace ...");
                    session.shutdown();
                    _log.info("Finished shutting down session for creating keyspace.");
                }
                throw th;
            }
        }
        if (exc != null) {
            throw exc;
        }
    }

    private void createTable(VirtualDataWindowContext virtualDataWindowContext) throws Exception {
        Session session = null;
        Exception exc = null;
        String namedWindowName = virtualDataWindowContext.getNamedWindowName();
        String namespaceName = virtualDataWindowContext.getViewFactoryContext().getNamespaceName();
        boolean z = true;
        try {
            _log.info("Started checking whether table {} exists...", namedWindowName);
            session = this._rootCluster.connect(namespaceName);
            session.execute(String.format("SELECT * FROM %s LIMIT 10", namedWindowName));
            _log.info("Finished checking whether table {} exists...", namedWindowName);
            _log.info("Table {} exists, nothing to do.", namedWindowName);
        } catch (InvalidQueryException e) {
            z = false;
        } catch (Exception e2) {
            z = false;
            _log.error("Unexcpected Exception when checking for table {}!", namedWindowName, e2);
            exc = e2;
        }
        if (exc == null) {
            _log.info("Finished checking whether table {} exists...", namedWindowName);
            try {
                try {
                    Session connect = this._rootCluster.connect(namespaceName);
                    if (!z) {
                        _log.info("Table does not yet {} exist, trying to create it", namedWindowName);
                        try {
                            _log.info("Started creating table {} ...", namedWindowName);
                            String createCreateTableQuery = QueryStringBuilder.createCreateTableQuery(virtualDataWindowContext);
                            _log.debug("Executing create query: {}", createCreateTableQuery);
                            connect.execute(createCreateTableQuery);
                            _log.info("Finished creating table {} .", namedWindowName);
                        } catch (Exception e3) {
                            _log.error("Error creating table {} .", namedWindowName);
                            exc = e3;
                        }
                    }
                    if (this._username != null && exc == null) {
                        try {
                            _log.info("Started granting rights to table {} ...", namedWindowName);
                            String format = String.format("GRANT ALL on %s TO %s", namedWindowName, this._username);
                            _log.debug("Executing grant query: {}", format);
                            connect.execute(format);
                            _log.info("Finished granting rights to table {}.", namedWindowName);
                        } catch (Exception e4) {
                            _log.error("Error granting rights to table {}.", namedWindowName, e4);
                            exc = e4;
                        }
                    }
                    if (connect != null) {
                        _log.info("Started shutting down session for creating table ...");
                        connect.shutdown();
                        _log.info("Finished shutting down session for creating table.");
                    }
                } catch (Exception e5) {
                    _log.error("Error creating table {}", namedWindowName, e5);
                    exc = e5;
                    if (session != null) {
                        _log.info("Started shutting down session for creating table ...");
                        session.shutdown();
                        _log.info("Finished shutting down session for creating table.");
                    }
                }
            } catch (Throwable th) {
                if (session != null) {
                    _log.info("Started shutting down session for creating table ...");
                    session.shutdown();
                    _log.info("Finished shutting down session for creating table.");
                }
                throw th;
            }
        }
        if (exc != null) {
            throw exc;
        }
    }

    public void initialize(VirtualDataWindowFactoryContext virtualDataWindowFactoryContext) {
        String namespaceName = virtualDataWindowFactoryContext.getViewFactoryContext().getNamespaceName();
        Object[] parameters = virtualDataWindowFactoryContext.getParameters();
        if (parameters.length < 1) {
            throw new RuntimeException(String.format("Cassandra window has at least one parameter. Found %s", Integer.valueOf(parameters.length)));
        }
        String trim = parameters[0].toString().trim();
        _log.info("Started parsing Cassandra configuration...");
        try {
            this._configuration = CassandraConfiguration.create(trim);
            _log.info("Finished parsing Cassandra configuration.");
            String host = this._configuration.getHost();
            this._username = System.getProperty(CASSANDRA_USERNAME, null);
            String property = System.getProperty(CASSANDRA_PASSWORD, null);
            try {
                this._rootCluster = connectToCluster(host, System.getProperty(this._configuration.getRootUsernameProperty(), null), System.getProperty(this._configuration.getRootPasswordProperty(), null));
                createKeyspace(host, namespaceName);
                this._cluster = connectToCluster(host, this._username, property);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Exception e2) {
            _log.error("Error parsing Cassandra configuration!");
            throw new RuntimeException(e2);
        }
    }

    /* renamed from: create, reason: merged with bridge method [inline-methods] */
    public CassandraVirtualDataWindow m5create(VirtualDataWindowContext virtualDataWindowContext) {
        CassandraVirtualDataWindow cassandraVirtualDataWindow;
        if (this._cluster == null) {
            throw new RuntimeException("Cassandra cluster has not yet been initialized!");
        }
        synchronized (this._cluster) {
            try {
                _log.info("Started connecting to cluster...");
                Session connect = this._cluster.connect(this._configuration.getKeyspace());
                _log.info("Finished connecting to cluster...");
                createTable(virtualDataWindowContext);
                cassandraVirtualDataWindow = new CassandraVirtualDataWindow(connect, virtualDataWindowContext.getNamedWindowName(), virtualDataWindowContext);
            } catch (Exception e) {
                _log.error("Error connecting to cluster!");
                throw new RuntimeException(e);
            }
        }
        return cassandraVirtualDataWindow;
    }

    public void destroyAllContextPartitions() {
        close();
    }

    public Set<String> getUniqueKeyPropertyNames() {
        return null;
    }

    protected void close() {
        try {
            _log.info("Started shutting down connection to Cassandra cluster...");
            if (this._cluster != null) {
                this._cluster.shutdown();
            }
            _log.info("Finished shutting down connection to Cassandra cluster.");
        } catch (Exception e) {
            _log.error("Error shutting down connection to Cassandra cluster!", e);
        }
        try {
            _log.info("Started shutting down connection to Cassandra root cluster...");
            if (this._cluster != null) {
                this._rootCluster.shutdown();
            }
            _log.info("Finished shutting down connection to Cassandra root cluster.");
        } catch (Exception e2) {
            _log.error("Error shutting down connection to Cassandra root cluster!", e2);
        }
    }
}
