package de.julielab.jcore.ae.checkpoint;

import com.google.common.collect.Sets;
import de.julielab.costosys.configuration.FieldConfig;
import de.julielab.costosys.dbconnection.CoStoSysConnection;
import de.julielab.costosys.dbconnection.DataBaseConnector;
import de.julielab.jcore.types.ext.DBProcessingMetaData;
import de.julielab.jcore.utility.JCoReTools;
import java.io.FileNotFoundException;
import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.uima.UimaContext;
import org.apache.uima.analysis_component.JCasAnnotator_ImplBase;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.fit.descriptor.ConfigurationParameter;
import org.apache.uima.fit.descriptor.ResourceMetaData;
import org.apache.uima.fit.util.JCasUtil;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceInitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * UIMA analysis engine that records a checkpoint for each processed document in a
 * CoStoSys/JeDIS subset table.
 *
 * <p>Document IDs are collected from the {@link DBProcessingMetaData} annotation of each CAS and
 * written in batches: the configured checkpoint name is stored in the {@code last_component}
 * column and, when {@link #PARAM_INDICATE_FINISHED} is set, the documents handed back by the
 * {@link DocumentReleaseCheckpoint} are additionally flagged as processed.
 */
@ResourceMetaData(name = "JCoRe Database Checkpoint AE", description = "This component can be used when using a JCoRe database reader that reads from a CoStoSys/JeDIS subset. Enters the configured component name in the 'last component' column. Can also mark documents as being completely processed.")
public class DBCheckpointAE extends JCasAnnotator_ImplBase {
    public static final String PARAM_CHECKPOINT_NAME = "CheckpointName";
    public static final String PARAM_INDICATE_FINISHED = "IndicateFinished";
    public static final String PARAM_COSTOSYS_CONFIG = "CostosysConfigFile";
    public static final String PARAM_WRITE_BATCH_SIZE = "WriteBatchSize";
    private static final Logger log = LoggerFactory.getLogger(DBCheckpointAE.class);

    /** Connector to the database holding the subset table; created in {@link #initialize}. */
    private DataBaseConnector dbc;

    @ConfigurationParameter(name = PARAM_CHECKPOINT_NAME, description = "String parameter. A name that identifies this checkpoint in the database.")
    private String componentDbName;

    @ConfigurationParameter(name = PARAM_INDICATE_FINISHED, mandatory = false, description = "Whether or not the checkpoint should mark the end of processing of the pipeline. If set to true, this component will not only set its name as checkpoint in the subset table but also set the 'is processed' flag to true and the 'is in process' flag to false.")
    private boolean indicateFinished;

    @ConfigurationParameter(name = PARAM_COSTOSYS_CONFIG, description = "File path or classpath resource location of a Corpus Storage System (CoStoSys) configuration file. This file specifies the database to write the XMI data into and the data table schema. This schema must at least define the primary key columns that the storage tables should have for each document. The primary key is currently just the document ID. Thus, at the moment, primary keys can only consist of a single element when using this component. This is a shortcoming of this specific component and must be changed here, if necessary.")
    private String dbcConfigPath;

    @ConfigurationParameter(name = PARAM_WRITE_BATCH_SIZE, mandatory = false, defaultValue = {"50"}, description = "The number of processed CASes after which the checkpoint should be written into the database. Defaults to 50.")
    private int writeBatchSize;

    @ConfigurationParameter(name = DocumentReleaseCheckpoint.PARAM_JEDIS_SYNCHRONIZATION_KEY, mandatory = false, description = DocumentReleaseCheckpoint.SYNC_PARAM_DESC)
    private String jedisSyncKey;

    /** Name of the subset table, taken from the first document's {@link DBProcessingMetaData}. */
    private String subsetTable;

    /** IDs of the documents seen since the last write to the database. */
    private Set<DocumentId> docIds;

    /** Only set when {@link #indicateFinished} is {@code true}. */
    private DocumentReleaseCheckpoint docReleaseCheckpoint;

    /**
     * Reads the configuration parameters, opens the database connector and, if this checkpoint
     * marks the end of the pipeline, registers with the {@link DocumentReleaseCheckpoint}.
     *
     * @param uimaContext the UIMA context carrying the configuration parameter values
     * @throws ResourceInitializationException if the CoStoSys configuration file cannot be found
     */
    @Override
    public void initialize(UimaContext uimaContext) throws ResourceInitializationException {
        super.initialize(uimaContext);
        this.componentDbName = (String) uimaContext.getConfigParameterValue(PARAM_CHECKPOINT_NAME);
        this.dbcConfigPath = (String) uimaContext.getConfigParameterValue(PARAM_COSTOSYS_CONFIG);
        this.indicateFinished = Optional.ofNullable((Boolean) uimaContext.getConfigParameterValue(PARAM_INDICATE_FINISHED)).orElse(false);
        this.writeBatchSize = Optional.ofNullable((Integer) uimaContext.getConfigParameterValue(PARAM_WRITE_BATCH_SIZE)).orElse(50);
        try {
            this.dbc = new DataBaseConnector(this.dbcConfigPath);
        } catch (FileNotFoundException e) {
            log.error("Could not initiate database connector", e);
            throw new ResourceInitializationException(e);
        }
        this.docIds = new HashSet<>();
        if (this.indicateFinished) {
            // Without an explicit key, fall back to a key derived from class and checkpoint name.
            this.jedisSyncKey = Optional
                    .ofNullable((String) uimaContext.getConfigParameterValue(DocumentReleaseCheckpoint.PARAM_JEDIS_SYNCHRONIZATION_KEY))
                    .orElse(getClass().getCanonicalName() + this.componentDbName);
            this.docReleaseCheckpoint = DocumentReleaseCheckpoint.get();
            this.docReleaseCheckpoint.register(this.jedisSyncKey);
        }
        log.info("{}: {}", PARAM_CHECKPOINT_NAME, this.componentDbName);
        log.info("{}: {}", PARAM_INDICATE_FINISHED, this.indicateFinished);
        log.info("{}: {}", PARAM_COSTOSYS_CONFIG, this.dbcConfigPath);
        log.info("{}: {}", PARAM_WRITE_BATCH_SIZE, this.writeBatchSize);
    }

    /**
     * Writes the checkpoint for all documents cached so far.
     *
     * @throws AnalysisEngineProcessException if the database update fails
     */
    @Override
    public void batchProcessComplete() throws AnalysisEngineProcessException {
        super.batchProcessComplete();
        log.debug("BatchProcessComplete called, stashing {} documents to be ready for marked as being finished", this.docIds.size());
        customBatchProcessingComplete();
    }

    /**
     * Writes the checkpoint for the remaining cached documents and closes the database connector.
     * The connector is closed even when the final write fails.
     *
     * @throws AnalysisEngineProcessException if the database update fails
     */
    @Override
    public void collectionProcessComplete() throws AnalysisEngineProcessException {
        super.collectionProcessComplete();
        log.debug("CollectionProcessComplete called, stashing {} documents to be ready for marked as being finished", this.docIds.size());
        try {
            customBatchProcessingComplete();
        } finally {
            log.info("Closing database connector.");
            this.dbc.close();
        }
    }

    /**
     * Hands the cached document IDs to the release checkpoint (when this component indicates the
     * end of processing), writes the checkpoint to the subset table and clears the cache.
     * The cache is only cleared when the write succeeded.
     *
     * @throws AnalysisEngineProcessException if the database update fails
     */
    private void customBatchProcessingComplete() throws AnalysisEngineProcessException {
        if (this.indicateFinished) {
            this.docReleaseCheckpoint.release(this.jedisSyncKey, this.docIds.stream());
        }
        try (CoStoSysConnection connection = this.dbc.obtainOrReserveConnection()) {
            setLastComponent(connection, this.subsetTable, this.indicateFinished, this.dbc.getActiveTableFieldConfiguration());
        }
        this.docIds.clear();
    }

    /**
     * Caches the ID of the document in the given CAS and flushes the cache to the database once
     * it has reached the configured batch size. Documents whose meta data asks not to be marked
     * as processed are skipped.
     *
     * @param jCas the CAS of the current document
     * @throws AnalysisEngineProcessException if the CAS lacks the {@link DBProcessingMetaData}
     *         annotation, the subset table name is unknown, or the database update fails
     */
    @Override
    public void process(JCas jCas) throws AnalysisEngineProcessException {
        final DBProcessingMetaData metaData;
        try {
            // Only this lookup is guarded so that an IllegalArgumentException raised further down
            // is not misreported as a missing annotation.
            metaData = JCasUtil.selectSingle(jCas, DBProcessingMetaData.class);
        } catch (IllegalArgumentException e) {
            log.error("The document with document ID {} does not have an annotation of type {}. This annotation ought to contain the name of the subset table. It should be set by the DB reader. Cannot write the checkpoint to the database since the target subset table or its schema is unknown.", JCoReTools.getDocId(jCas), DBProcessingMetaData.class.getCanonicalName());
            throw new AnalysisEngineProcessException(e);
        }
        if (metaData.getDoNotMarkAsProcessed()) {
            return;
        }
        DocumentId documentId = new DocumentId(metaData);
        if (this.subsetTable == null) {
            this.subsetTable = metaData.getSubsetTable();
            if (this.subsetTable == null) {
                log.error("The subset table retrieved from the DBProcessingMetaData is null. Cannot continue without the table name.");
                throw new AnalysisEngineProcessException(new IllegalStateException("The subset table retrieved from the DBProcessingMetaData is null. Cannot continue without the table name."));
            }
        }
        this.docIds.add(documentId);
        log.trace("Adding document ID {} for subset table {} for checkpoint marking", documentId, this.subsetTable);
        if (this.docIds.size() >= this.writeBatchSize) {
            log.debug("Cached documents have reached the configured batch size of {}, sending to database.", this.writeBatchSize);
            customBatchProcessingComplete();
        }
    }

    /**
     * Writes the checkpoint name into the {@code last_component} column of the subset table.
     *
     * <p>Cached documents that are not among the released document IDs only get their last
     * component set. When {@code markAsProcessed} is {@code true}, the IDs returned by
     * {@link DocumentReleaseCheckpoint#getReleasedDocumentIds()} additionally get
     * {@code is_processed} set to TRUE and {@code is_in_process} set to FALSE.
     *
     * @param connection the connection to run the updates on
     * @param tableName the subset table name; nothing is written when blank
     * @param markAsProcessed whether released documents should be flagged as processed
     * @param fieldConfig the table schema supplying the primary key column(s)
     * @throws AnalysisEngineProcessException if a database update fails
     * @throws IllegalArgumentException if the primary key has more than one element
     */
    private void setLastComponent(CoStoSysConnection connection, String tableName, boolean markAsProcessed, FieldConfig fieldConfig) throws AnalysisEngineProcessException {
        Set<DocumentId> releasedIds = Collections.<DocumentId>emptySet();
        if (markAsProcessed) {
            releasedIds = this.docReleaseCheckpoint.getReleasedDocumentIds();
        }
        Sets.SetView<DocumentId> unreleasedIds = Sets.difference(this.docIds, releasedIds);
        if ((unreleasedIds.isEmpty() && releasedIds.isEmpty()) || StringUtils.isBlank(tableName)) {
            log.debug("Not setting the last component to {} because the processed document IDs list is empty (size: {}) or the subset table name could not be retrieved (is: {})", this.componentDbName, unreleasedIds.size(), tableName);
            return;
        }
        if (fieldConfig.getPrimaryKey().length > 1) {
            throw new IllegalArgumentException("Currently, only one-element primary keys are supported.");
        }
        String whereClause = StringUtils.join(fieldConfig.expandPKNames("%s = ?"), " AND ");
        // The component name is bound as the first statement parameter rather than spliced into
        // the SQL text, so names containing quotes cannot break the statement.
        if (!unreleasedIds.isEmpty()) {
            String checkpointSql = String.format("UPDATE %s SET %s=? WHERE %s", tableName, "last_component", whereClause);
            log.debug("Setting the last component to {} for {} documents", this.componentDbName, unreleasedIds.size());
            updateSubsetTable(connection, unreleasedIds, checkpointSql);
        }
        if (markAsProcessed && !releasedIds.isEmpty()) {
            String finishedSql = String.format("UPDATE %s SET %s=?, %s=TRUE, %s=FALSE WHERE %s", tableName, "last_component", "is_processed", "is_in_process", whereClause);
            log.debug("Marking {} documents to having been processed by component \"{}\".", releasedIds.size(), this.componentDbName);
            updateSubsetTable(connection, releasedIds, finishedSql);
        }
    }

    /**
     * Executes the given update as one JDBC batch with one entry per document ID. The batch is
     * retried for as long as the database reports a deadlock.
     *
     * @param connection the connection to prepare the statement on
     * @param documentIds the documents to update
     * @param sql the update statement; its first placeholder receives the checkpoint name, the
     *        following placeholders receive the primary key values of the document
     * @throws AnalysisEngineProcessException if the update fails for any reason other than a
     *         detected deadlock
     */
    private void updateSubsetTable(CoStoSysConnection connection, Collection<DocumentId> documentIds, String sql) throws AnalysisEngineProcessException {
        boolean retry;
        do {
            retry = false;
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                for (DocumentId documentId : documentIds) {
                    statement.setString(1, this.componentDbName);
                    String[] keyValues = documentId.getId();
                    for (int i = 0; i < keyValues.length; i++) {
                        // JDBC parameters are 1-based and slot 1 holds the component name.
                        statement.setString(i + 2, keyValues[i]);
                    }
                    statement.addBatch();
                }
                try {
                    statement.executeBatch();
                } catch (BatchUpdateException e) {
                    String message = e.getMessage();
                    if (message == null || !message.contains("deadlock detected")) {
                        // Anything but a deadlock is a real failure and must not be dropped.
                        throw e;
                    }
                    log.debug("Database transaction deadlock detected while trying to set the last component. Trying again.");
                    retry = true;
                }
            } catch (SQLException e) {
                log.error("Could not update the subset table with statement \"{}\"", sql, e);
                SQLException nextException = e.getNextException();
                if (nextException == null) {
                    throw new AnalysisEngineProcessException(e);
                }
                log.error("Next exception was:", nextException);
                throw new AnalysisEngineProcessException(nextException);
            }
        } while (retry);
    }
}
