package stream.io;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.ProcessContext;
import stream.ProcessorException;
import stream.annotations.Description;
import stream.annotations.Parameter;
import stream.data.Data;
import stream.data.DataFactory;
import stream.io.sql.HsqlDialect;
import stream.io.sql.MysqlDialect;

@Description(group = "Data Stream.Output")
/* loaded from: input_file:stream/io/SQLWriter.class */
public class SQLWriter extends AbstractSQLProcessor {
    static Logger log = LoggerFactory.getLogger(SQLWriter.class);
    String table;
    String[] keys;
    boolean dropTable = false;
    final LinkedHashSet<String> keysToStore = new LinkedHashSet<>();
    Map<String, Class<?>> tableSchema = null;
    transient boolean tableExists = false;
    transient long count = 0;
    transient Connection connection = null;
    transient List<String> columns = new ArrayList();

    public String getTable() {
        return this.table;
    }

    @Parameter(required = true, description = "The database table to insert items into.")
    public void setTable(String str) {
        this.table = str;
    }

    public String[] getKeys() {
        return this.keys;
    }

    @Parameter(required = false, description = "A list of attributes to insert (columns), empty string for all attributes.")
    public void setKeys(String[] strArr) {
        this.keys = strArr;
    }

    public boolean isDropTable() {
        return this.dropTable;
    }

    @Parameter(required = false, defaultValue = "false")
    public void setDropTable(boolean z) {
        this.dropTable = z;
    }

    public void init(ProcessContext processContext) throws Exception {
        super.init(processContext);
        if (this.table == null || this.table.trim().equals("")) {
            throw new Exception("No 'table' attribute provided!");
        }
        init();
        log.debug("init(ProcessContext) done.");
    }

    private void init() throws Exception {
        this.connection = openConnection();
        log.debug("Opened connection to {} = {}", getUrl(), this.connection);
        log.debug("Dialect = {} ", this.dialect);
        if (this.url.toLowerCase().startsWith("jdbc:mysql")) {
            this.dialect = new MysqlDialect();
        }
        if (this.url.toLowerCase().startsWith("jdbc:hsqldb")) {
            this.dialect = new HsqlDialect();
        }
        log.debug("Using dialect {}", this.dialect);
        if (this.dropTable) {
            log.debug("Dropping existing table '{}'", getTable());
            try {
                log.debug("Return of DROP TABLE: {}", Integer.valueOf(this.connection.createStatement().executeUpdate("DROP TABLE " + getTable())));
                return;
            } catch (Exception e) {
                log.error("Failed to drop table: {}", e.getMessage());
                return;
            }
        }
        Map<String, Class<?>> tableSchema = this.dialect.getTableSchema(this.connection, getTable());
        if (tableSchema != null) {
            log.debug("Using existing table schema: {}", tableSchema);
            log.info("Existing schema is: {}", tableSchema);
            if (this.tableSchema == null) {
                this.tableSchema = new LinkedHashMap(tableSchema);
            } else {
                this.tableSchema.putAll(tableSchema);
            }
            if (this.keys != null) {
                for (String str : this.keys) {
                    if (!this.tableSchema.containsKey(str)) {
                        log.info("Removing non-selected key '{}'", str);
                        this.tableSchema.remove(str);
                    }
                }
            }
            log.debug("Types:\n{}", this.tableSchema);
        }
    }

    @Override // stream.io.AbstractSQLProcessor
    public boolean hasTable(String str) {
        if (this.tableExists) {
            return true;
        }
        this.tableExists = super.hasTable(str);
        return this.tableExists;
    }

    public Data process(Data data) {
        if (this.connection == null) {
            try {
                init();
            } catch (Exception e) {
                throw new ProcessorException(this, "Failed to initialize database connection: " + e.getMessage());
            }
        }
        if (this.tableSchema == null) {
            log.debug("No table-schema found, does table exist? {}", Boolean.valueOf(hasTable(getTable())));
            this.tableSchema = this.dialect.getTableSchema(this.connection, getTable());
            log.debug("Tried to read schema from database: {}", this.tableSchema);
            if (this.tableSchema == null) {
                log.debug("Creating new table {} from first item {}", getTable(), data);
                Data create = DataFactory.create();
                if (this.keys != null) {
                    for (String str : this.keys) {
                        create.put(str, data.get(str));
                    }
                } else {
                    create.putAll(data);
                }
                Map<String, Class<?>> columnTypes = this.dialect.getColumnTypes(create);
                if (!createTable(getTable(), columnTypes)) {
                    throw new ProcessorException(this, "Failed to create table " + getTable() + " for item: " + data);
                }
                this.tableSchema = columnTypes;
            }
        }
        if (!hasTable(getTable())) {
            if (this.keys != null) {
                for (String str2 : this.keys) {
                    Serializable serializable = (Serializable) data.get(str2);
                    if (serializable == null) {
                        throw new ProcessorException(this, "Cannot determine type of key '" + str2 + "' for table creation! First item does not provide a value for '" + str2 + "'!");
                    }
                    this.tableSchema.put(str2, serializable.getClass());
                }
            } else {
                for (String str3 : data.keySet()) {
                    this.tableSchema.put(str3, ((Serializable) data.get(str3)).getClass());
                }
            }
            if (!createTable(getTable(), this.tableSchema)) {
                throw new ProcessorException(this, "Failed to create table '" + getTable() + "'!");
            }
            this.tableExists = true;
        }
        try {
            StringBuffer stringBuffer = new StringBuffer("INSERT INTO ");
            stringBuffer.append(getTable());
            stringBuffer.append(" ( ");
            StringBuffer stringBuffer2 = new StringBuffer(" VALUES ( ");
            ArrayList arrayList = new ArrayList();
            Iterator<String> it = this.tableSchema.keySet().iterator();
            while (it.hasNext()) {
                String next = it.next();
                Serializable serializable2 = (Serializable) data.get(next);
                if (serializable2 != null) {
                    arrayList.add(serializable2);
                    stringBuffer.append(this.dialect.mapColumnName(next));
                    stringBuffer2.append("?");
                    if (it.hasNext()) {
                        stringBuffer.append(", ");
                        stringBuffer2.append(", ");
                    }
                }
            }
            stringBuffer.append(" ) ");
            stringBuffer2.append(" ) ");
            stringBuffer.append(stringBuffer2.toString());
            log.debug("INSERT statement is: {}", stringBuffer);
            PreparedStatement prepareStatement = this.connection.prepareStatement(stringBuffer.toString());
            for (int i = 0; i < arrayList.size(); i++) {
                prepareStatement.setObject(i + 1, arrayList.get(i));
            }
            int executeUpdate = prepareStatement.executeUpdate();
            if (executeUpdate == 1) {
                this.count++;
            }
            log.debug("INSERT retured {}", Integer.valueOf(executeUpdate));
            prepareStatement.close();
        } catch (Exception e2) {
            log.error("Failed to insert data item: {}", e2.getMessage());
        }
        return data;
    }

    public void finish() throws Exception {
        super.finish();
        log.debug("Closing SQL writer, {} items written.", Long.valueOf(this.count));
        log.debug("Closing SQL connection...");
        this.connection.close();
        this.tableSchema = null;
    }
}
