package org.shoulder.batch.service.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Nonnull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.shoulder.batch.enums.BatchErrorCodeEnum;
import org.shoulder.batch.enums.BatchResultEnum;
import org.shoulder.batch.model.BatchDataSlice;
import org.shoulder.batch.model.BatchRecordDetail;
import org.shoulder.batch.model.DataItem;
import org.shoulder.batch.service.ext.BatchTaskSliceHandler;
import org.shoulder.core.exception.CommonErrorCodeEnum;
import org.shoulder.core.i18.Translator;
import org.shoulder.core.log.Logger;
import org.shoulder.core.log.LoggerFactory;
import org.shoulder.core.util.ContextUtils;

/* loaded from: input_file:org/shoulder/batch/service/impl/BatchProcessor.class */
/**
 * Worker that drains {@link BatchDataSlice}s from a task queue, hands each slice to the first
 * {@link BatchTaskSliceHandler} that supports its data type / operation type, and publishes the
 * resulting {@link BatchRecordDetail}s to a result queue.
 *
 * <p>The worker stops as soon as the task queue is empty (a non-blocking {@code poll()} returns
 * {@code null}) or the running thread is interrupted.
 */
public class BatchProcessor implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(BatchProcessor.class);

    /** Source of slices still to be processed. */
    private final BlockingQueue<BatchDataSlice> taskQueue;

    /** Sink that receives one detail per processed row. */
    private final BlockingQueue<BatchRecordDetail> resultQueue;

    /** All handler beans known to the context; the first one whose {@code support} matches wins. */
    private final Collection<BatchTaskSliceHandler> batchTaskSliceHandlers = ContextUtils.getBeansOfType(BatchTaskSliceHandler.class).values();

    protected Translator translator = (Translator) ContextUtils.getBean(Translator.class);

    /** Name of this worker, used in log output. */
    protected String name;

    /**
     * @param str            name of this worker
     * @param blockingQueue  queue the slices are taken from
     * @param blockingQueue2 queue the per-row results are put into
     */
    public BatchProcessor(String str, BlockingQueue<BatchDataSlice> blockingQueue, BlockingQueue<BatchRecordDetail> blockingQueue2) {
        this.name = str;
        this.taskQueue = blockingQueue;
        this.resultQueue = blockingQueue2;
    }

    /**
     * Processes slices until the task queue is empty or the thread is interrupted.
     */
    @Override
    public void run() {
        int processed = 0;
        // Stop once interrupted: publishing results would fail for every following slice anyway.
        while (!Thread.currentThread().isInterrupted()) {
            BatchDataSlice slice = this.taskQueue.poll();
            if (slice == null) {
                log.info("{} stop, processed {}", getName(), processed);
                return;
            }
            putResult(doWork(slice));
            processed++;
        }
        log.info("{} interrupted, stop after {} slices", getName(), processed);
    }

    /**
     * Puts every detail into the result queue, blocking while the queue is full.
     * If the thread is interrupted while waiting, the remaining details are dropped, the failure
     * is logged and the interrupt flag is restored so that {@link #run()} can terminate.
     *
     * @param list details to publish
     */
    private void putResult(List<BatchRecordDetail> list) {
        int delivered = 0;
        try {
            for (BatchRecordDetail detail : list) {
                this.resultQueue.put(detail);
                delivered++;
            }
        } catch (InterruptedException e) {
            // Restore the flag; catching InterruptedException clears it.
            Thread.currentThread().interrupt();
            log.error("put result into queue FAIL, size=" + list.size() + " put=" + delivered, e);
        }
    }

    /**
     * @return the name of this worker
     */
    public String getName() {
        return this.name;
    }

    /**
     * Processes a single slice with the first handler that supports it.
     *
     * <p>A handler failure (including "no handler supports this slice") is logged and not
     * propagated; every row of the slice is then reported as failed by the result re-check.
     *
     * @param batchDataSlice slice to process
     * @return one detail per row of the slice; empty if the slice contains no rows
     */
    public List<BatchRecordDetail> doWork(@Nonnull BatchDataSlice batchDataSlice) {
        log.info("task start. {}", batchDataSlice.toString());
        if (CollectionUtils.isEmpty(batchDataSlice.getBatchList())) {
            return Collections.emptyList();
        }
        String dataType = batchDataSlice.getDataType();
        String operationType = batchDataSlice.getOperationType();
        List<BatchRecordDetail> handled = null;
        try {
            handled = this.batchTaskSliceHandlers.stream()
                    .filter(handler -> handler.support(dataType, operationType))
                    .findFirst()
                    .orElseThrow(() -> BatchErrorCodeEnum.DATA_TYPE_OR_OPERATION_NOT_SUPPORT.toException(new Object[]{dataType, operationType}))
                    .handle(batchDataSlice);
        } catch (Exception e) {
            log.error("worker " + getName() + " processed failed", e);
        }
        log.info("task {}-{} finished", batchDataSlice.getTaskId(), batchDataSlice.getSequence());
        return recheckResultList(batchDataSlice, ListUtils.emptyIfNull(handled));
    }

    /**
     * Ensures that every row of the slice has a result. If the handler returned a different
     * number of details than there are rows, a warning is logged and each row without a detail
     * gets a synthetic failure detail.
     *
     * @param batchDataSlice the processed slice
     * @param list           details returned by the handler; never modified by this method
     * @return {@code list} itself when the sizes match, otherwise a new list holding the
     *         handler's details followed by the back-filled failures
     */
    private List<BatchRecordDetail> recheckResultList(@Nonnull BatchDataSlice batchDataSlice, @Nonnull List<BatchRecordDetail> list) {
        int expected = batchDataSlice.getBatchList().size();
        int actual = list.size();
        if (expected == actual) {
            return list;
        }
        log.warnWithErrorCode(BatchErrorCodeEnum.TASK_SLICE_RESULT_INVALID.getCode(), BatchErrorCodeEnum.TASK_SLICE_RESULT_INVALID.getMessage(), expected, actual);

        // The incoming list may be immutable (e.g. the shared empty list used after a handler
        // failure), so back-fill into a private mutable copy.
        List<BatchRecordDetail> completed = new ArrayList<>(Math.max(expected, actual));
        completed.addAll(list);

        // Row numbers that already have a detail; a set avoids rescanning the list for every row.
        Set<Integer> reportedRows = new HashSet<>();
        for (BatchRecordDetail detail : list) {
            reportedRows.add(detail.getRowNum());
        }
        for (DataItem item : batchDataSlice.getBatchList()) {
            int rowNum = item.getRowNum();
            // add() returns true only for rows that had no detail yet.
            if (reportedRows.add(rowNum)) {
                completed.add(new BatchRecordDetail(rowNum, BatchResultEnum.IMPORT_FAILED.getCode(), CommonErrorCodeEnum.UNKNOWN.getCode()));
            }
        }
        return completed;
    }
}
