package de.julielab.jcore.reader.db;

import de.julielab.jcore.types.casmultiplier.RowBatch;
import de.julielab.xmlData.cli.TableNotFoundException;
import de.julielab.xmlData.dataBase.CoStoSysConnection;
import de.julielab.xmlData.dataBase.DBCIterator;
import de.julielab.xmlData.dataBase.util.TableSchemaMismatchException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.uima.UimaContext;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.ducc.Workitem;
import org.apache.uima.fit.descriptor.ConfigurationParameter;
import org.apache.uima.fit.descriptor.ResourceMetaData;
import org.apache.uima.jcas.JCas;
import org.apache.uima.jcas.cas.FSArray;
import org.apache.uima.jcas.cas.StringArray;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Progress;
import org.apache.uima.util.ProgressImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ResourceMetaData(name = "JCoRe Database Multiplier Reader", description = "A collection reader that receives the IDs of documents from a database table. Additional tables may be specified which will, together with the IDs, be sent to a CAS multiplier extending the DBMultiplierReader. The multiplier will read documents and the joined additional tables according to the list of document IDs sent by this reader. The component leverages the corpus storage system (CoStoSys) for this purpose and is part of the Jena Document Information System, JeDIS.", vendor = "JULIE Lab Jena, Germany", copyright = "JULIE Lab Jena, Germany")
/* loaded from: input_file:de/julielab/jcore/reader/db/DBMultiplierReader.class */
public class DBMultiplierReader extends DBSubsetReader {
    /*
     * Names of UIMA configuration parameters. Only PARAM_SEND_CAS_TO_LAST is bound to a field in
     * this class; the remaining names are presumably bound to fields of the superclasses
     * (NOTE(review): confirm against DBSubsetReader / DBReaderBase).
     */
    public static final String PARAM_RESET_TABLE = "ResetTable";
    public static final String PARAM_TABLE = "Table";
    public static final String PARAM_COSTOSYS_CONFIG_NAME = "CostosysConfigFile";
    public static final String PARAM_DATA_TIMESTAMP = "Timestamp";
    public static final String PARAM_ADDITIONAL_TABLES = "AdditionalTables";
    public static final String PARAM_ADDITIONAL_TABLE_SCHEMAS = "AdditionalTableSchemas";
    public static final String PARAM_FETCH_IDS_PROACTIVELY = "FetchIdsProactively";
    public static final String PARAM_SEND_CAS_TO_LAST = "SendCasToLast";

    /**
     * When {@code true}, {@code getNext} adds a DUCC {@code Workitem} feature structure carrying
     * the block index and last-block flag to the CAS.
     */
    @ConfigurationParameter(name = PARAM_SEND_CAS_TO_LAST, mandatory = false, defaultValue = {"false"}, description = "UIMA DUCC relevant parameter when using a CAS multiplier. When set to true, the worker CAS from the collection reader is forwarded to the last component in the pipeline. This can be used to send information about the progress to the CAS consumer in order to have it perform batch operations. For this purpose, a feature structure of type WorkItem from the DUCC library is added to the worker CAS. This feature structure has information about the current progress.")
    private boolean sendCasToLast;
    private static final Logger log = LoggerFactory.getLogger(DBMultiplierReader.class);
    /**
     * Retriever of ID batches from the subset table. Created lazily by {@code getNextFromSubset};
     * remains {@code null} while reading from a data table.
     */
    private RetrievingThread retriever;
    /**
     * Iterator over the primary key values of the data table. Only assigned in {@code initialize}
     * when {@code readDataTable} is set.
     */
    private DBCIterator<Object[]> dataTableDocumentIds;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:de/julielab/jcore/reader/db/DBMultiplierReader$RetrievingThread.class */
    public class RetrievingThread extends Thread {
        private List<Object[]> ids;

        public RetrievingThread() {
            if (DBMultiplierReader.this.fetchIdsProactively.booleanValue()) {
                DBMultiplierReader.log.debug("Fetching ID batches in a background thread.");
                setName(DBMultiplierReader.class.getSimpleName() + " RetrievingThread (" + getName() + ")");
                start();
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int min = Math.min(DBMultiplierReader.this.batchSize, DBMultiplierReader.this.totalDocumentCount - DBMultiplierReader.this.numberFetchedDocIDs);
            try {
                CoStoSysConnection obtainOrReserveConnection = DBMultiplierReader.this.dbc.obtainOrReserveConnection();
                try {
                    DBMultiplierReader.log.trace("Using connection {} to retrieveAndMark", obtainOrReserveConnection.getConnection());
                    this.ids = DBMultiplierReader.this.dbc.retrieveAndMark(DBMultiplierReader.this.tableName, getClass().getSimpleName(), DBMultiplierReader.this.hostName, DBMultiplierReader.this.pid, min, DBMultiplierReader.this.selectionOrder);
                    if (obtainOrReserveConnection != null) {
                        obtainOrReserveConnection.close();
                    }
                    if (DBMultiplierReader.log.isTraceEnabled()) {
                        ArrayList arrayList = new ArrayList();
                        for (Object[] objArr : this.ids) {
                            ArrayList arrayList2 = new ArrayList();
                            for (Object obj : objArr) {
                                arrayList2.add(String.valueOf(obj));
                            }
                            arrayList.add(StringUtils.join(arrayList2, "-"));
                        }
                        DBMultiplierReader.log.trace("Reserved the following document IDs for processing: " + arrayList);
                    }
                    DBMultiplierReader.this.numberFetchedDocIDs += this.ids.size();
                    DBMultiplierReader.log.debug("Retrieved {} document IDs to fetch from the database.", Integer.valueOf(this.ids.size()));
                } finally {
                }
            } catch (TableSchemaMismatchException e) {
                DBMultiplierReader.log.error("Table schema mismatch: The active table schema {} specified in the CoStoSys configuration file {} does not match the columns in the subset table {}: {}", new Object[]{DBMultiplierReader.this.dbc.getActiveTableSchema(), DBMultiplierReader.this.costosysConfig, DBMultiplierReader.this.tableName, e.getMessage()});
                throw new IllegalArgumentException((Throwable) e);
            } catch (TableNotFoundException e2) {
                DBMultiplierReader.log.error("A table to read from could not be found", e2);
                throw new IllegalArgumentException((Throwable) e2);
            }
        }

        public List<Object[]> getDocumentIds() {
            if (!DBMultiplierReader.this.fetchIdsProactively.booleanValue()) {
                DBMultiplierReader.log.debug("Fetching new documents (without employing a background thread).");
                run();
            }
            try {
                DBMultiplierReader.log.debug("Waiting for the background thread to finish fetching documents to return them.");
                join();
                return this.ids;
            } catch (InterruptedException e) {
                DBMultiplierReader.log.error("Background ID fetching thread was interrupted", e);
                return null;
            }
        }
    }

    /**
     * Initializes the reader and determines whether there is anything to read.
     *
     * <p>When reading from a data table, a query over the primary key columns of the active
     * table schema is opened. When reading from a subset table, the database is asked whether
     * the table has unfetched rows.
     *
     * @param uimaContext the UIMA context passed on to the superclass initialization
     * @throws ResourceInitializationException if the superclass initialization fails
     */
    @Override
    public void initialize(UimaContext uimaContext) throws ResourceInitializationException {
        super.initialize(uimaContext);
        if (this.readDataTable.booleanValue()) {
            log.debug("Reading from data table {}", this.tableName);
            this.dataTableDocumentIds = this.dbc.query(
                    this.tableName,
                    Arrays.asList(this.dbc.getFieldConfiguration(this.dbc.getActiveTableSchema()).getPrimaryKey()));
            this.hasNext = this.dataTableDocumentIds.hasNext();
            return;
        }
        log.debug("Reading from subset table {}", this.tableName);
        this.hasNext = this.dbc.withConnectionQueryBoolean(
                connector -> Boolean.valueOf(connector.hasUnfetchedRows(this.tableName)));
    }

    /**
     * Fills the given CAS with a {@link RowBatch} describing the next batch of documents.
     *
     * <p>The row batch carries the primary keys of the batch (one {@link StringArray} per
     * document), the additional tables and their schemas, the name of the table being read and
     * the CoStoSys configuration. If {@code sendCasToLast} is set, a DUCC {@link Workitem} with
     * the block index and the last-block flag is added as well.
     *
     * @param jCas the CAS to populate
     * @throws CollectionException if no ID batch could be obtained or if creating the work item
     *         fails
     */
    public void getNext(JCas jCas) throws CollectionException {
        log.trace("Requesting next batch of document IDs from the database.");
        List<Object[]> idBatch = getNextDocumentIdBatch();
        if (idBatch == null) {
            // The ID retrieval may yield null (e.g. after an interruption). Fail with the
            // declared exception type instead of a NullPointerException further down.
            throw new CollectionException(new IllegalStateException(
                    "No document ID batch could be retrieved from table " + this.tableName));
        }
        log.trace("Received a list of {} ID from the database.", Integer.valueOf(idBatch.size()));

        // One StringArray per document; each holds the string form of the primary key parts.
        FSArray identifiers = new FSArray(jCas, idBatch.size());
        for (int row = 0; row < idBatch.size(); row++) {
            Object[] key = idBatch.get(row);
            StringArray keyParts = new StringArray(jCas, key.length);
            for (int part = 0; part < key.length; part++) {
                keyParts.set(part, String.valueOf(key[part]));
            }
            identifiers.set(row, keyParts);
        }

        // Tables and schemas are parallel arrays: schemas[i] belongs to tables[i].
        StringArray tableArray = new StringArray(jCas, this.tables.length);
        StringArray schemaArray = new StringArray(jCas, this.schemas.length);
        for (int i = 0; i < this.tables.length; i++) {
            tableArray.set(i, this.tables[i]);
            schemaArray.set(i, this.schemas[i]);
        }

        RowBatch rowBatch = new RowBatch(jCas);
        rowBatch.setIdentifiers(identifiers);
        rowBatch.setTables(tableArray);
        rowBatch.setTableName(this.tableName);
        rowBatch.setTableSchemas(schemaArray);
        rowBatch.setCostosysConfiguration(this.costosysConfig);
        rowBatch.addToIndexes();

        if (this.sendCasToLast) {
            try {
                Workitem workitem = new Workitem(jCas);
                workitem.setSendToLast(true);
                workitem.setBlockindex(this.processedDocuments / this.batchSize);
                if (!hasNext()) {
                    workitem.setLastBlock(true);
                }
                workitem.addToIndexes();
            } catch (IOException e) {
                log.error("Error occurred while creating Workitem feature structure", e);
                throw new CollectionException(e);
            }
        }
    }

    /**
     * Tells whether another batch of document IDs is available and closes the reader when the
     * input is exhausted.
     *
     * <p>Before the first subset batch has been requested, and always when reading from a data
     * table, the answer is the cached {@code hasNext} flag. Afterwards:
     * <ul>
     * <li>in proactive mode, the already running background retrieval is awaited and its result
     * inspected; this does not consume the batch;</li>
     * <li>in non-proactive mode, the database is asked whether unfetched rows remain.
     * Calling {@code RetrievingThread.getDocumentIds()} here would retrieve and mark a complete
     * batch that is never delivered, silently dropping those documents.</li>
     * </ul>
     *
     * @return {@code true} if a subsequent {@code getNext} call can deliver documents
     * @throws IOException declared by the reader interface
     * @throws CollectionException if the pre-fetched batch is unavailable
     */
    public boolean hasNext() throws IOException, CollectionException {
        boolean more = this.hasNext;
        if (this.retriever != null) {
            if (this.fetchIdsProactively.booleanValue()) {
                List<Object[]> prefetched = this.retriever.getDocumentIds();
                if (prefetched == null) {
                    throw new CollectionException(new IllegalStateException(
                            "The pre-fetched document ID batch for table " + this.tableName + " is not available"));
                }
                more = !prefetched.isEmpty();
            } else {
                // Peek instead of fetch. The count check mirrors the limit computed in
                // RetrievingThread.run(); dbc is null once the reader has been closed.
                // NOTE(review): assumes hasUnfetchedRows() matches the rows retrieveAndMark()
                // would return — confirm against CoStoSys.
                more = this.dbc != null
                        && this.numberFetchedDocIDs < this.totalDocumentCount
                        && this.dbc.withConnectionQueryBoolean(
                                connector -> Boolean.valueOf(connector.hasUnfetchedRows(this.tableName)));
            }
        }
        if (!more) {
            close();
        }
        return more;
    }

    /**
     * Obtains the next batch of primary keys, either from the data table or from the subset
     * table depending on {@code readDataTable}, and adds its size to the processed-document
     * counter.
     *
     * @return the next batch of IDs, or {@code null} if the underlying source returned
     *         {@code null}
     */
    public List<Object[]> getNextDocumentIdBatch() {
        List<Object[]> batch;
        if (this.readDataTable.booleanValue()) {
            batch = getNextFromDataTable();
        } else {
            batch = getNextFromSubset();
        }
        if (batch == null) {
            return null;
        }
        this.processedDocuments += batch.size();
        return batch;
    }

    /**
     * Reads up to {@code batchSize} primary keys from the data table iterator and updates the
     * {@code hasNext} flag.
     *
     * <p>The flag is reset to {@code false} first and is only re-evaluated against the iterator
     * while the number of documents processed before this batch is below
     * {@code totalDocumentCount - 1}.
     * NOTE(review): the size of the batch just read is not part of that comparison — confirm
     * this is intended.
     *
     * @return the IDs read; empty if the iterator is exhausted
     */
    private List<Object[]> getNextFromDataTable() {
        List<Object[]> batch = new ArrayList<>(this.batchSize);
        this.hasNext = false;
        log.trace("Filling document ID list with the next batch of documents.");
        while (this.dataTableDocumentIds.hasNext()) {
            if (batch.size() >= this.batchSize) {
                break;
            }
            batch.add(this.dataTableDocumentIds.next());
        }
        boolean belowTotal = this.processedDocuments < this.totalDocumentCount - 1;
        if (belowTotal) {
            log.trace("Checking if there are more documents to read from the data table.");
            this.hasNext = this.dataTableDocumentIds.hasNext();
        }
        return batch;
    }

    /**
     * Returns the next batch of IDs from the subset table.
     *
     * <p>The retriever is created on first use. In proactive mode a fresh retriever is created
     * right after the current batch has been taken, so the following batch is fetched in the
     * background while the current one is processed.
     *
     * @return the IDs delivered by the current retriever
     */
    private List<Object[]> getNextFromSubset() {
        RetrievingThread current = this.retriever;
        if (current == null) {
            current = new RetrievingThread();
            this.retriever = current;
        }
        List<Object[]> batch = current.getDocumentIds();
        boolean prefetchNext = this.fetchIdsProactively.booleanValue();
        if (prefetchNext) {
            this.retriever = new RetrievingThread();
        }
        return batch;
    }

    /**
     * Reports the reading progress as processed documents out of the total document count,
     * measured in the unit {@code "entities"}.
     *
     * @return a single-element array with the current progress
     */
    public Progress[] getProgress() {
        Progress documentProgress =
                new ProgressImpl(this.processedDocuments, this.totalDocumentCount, "entities", true);
        return new Progress[]{documentProgress};
    }

    /**
     * Closes the database connector, if there still is one, and drops the reference to it.
     * Calling this method again afterwards has no effect.
     */
    public void close() {
        if (this.dbc == null) {
            return;
        }
        this.dbc.close();
        this.dbc = null;
    }
}
