package com.dangdang.ddframe.job.internal.job.dataflow;

import com.dangdang.ddframe.job.api.DataFlowElasticJob;
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.api.JobExecutionSingleShardingContext;
import com.dangdang.ddframe.job.internal.job.AbstractElasticJob;
import com.dangdang.ddframe.job.internal.job.AbstractJobExecutionShardingContext;
import com.google.common.collect.Lists;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.collections.CollectionUtils;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dangdang/ddframe/job/internal/job/dataflow/AbstractDataFlowElasticJob.class */
public abstract class AbstractDataFlowElasticJob<T, C extends AbstractJobExecutionShardingContext> extends AbstractElasticJob implements DataFlowElasticJob<T, C> {
    private static final Logger log = LoggerFactory.getLogger(AbstractDataFlowElasticJob.class);
    private final ExecutorService executorService = getExecutorService();
    private final DataFlowType dataFlowType = getDataFlowType();

    private DataFlowType getDataFlowType() {
        ParameterizedType parameterizedType;
        Class<?> cls = getClass();
        while (true) {
            Class<?> cls2 = cls;
            if (cls2.getGenericSuperclass() instanceof ParameterizedType) {
                parameterizedType = (ParameterizedType) cls2.getGenericSuperclass();
                if (2 == parameterizedType.getActualTypeArguments().length) {
                    break;
                }
                cls = cls2.getSuperclass();
            } else {
                cls = cls2.getSuperclass();
            }
        }
        Type type = parameterizedType.getActualTypeArguments()[1];
        if (JobExecutionMultipleShardingContext.class == type) {
            return DataFlowType.THROUGHPUT;
        }
        if (JobExecutionSingleShardingContext.class == type) {
            return DataFlowType.SEQUENCE;
        }
        throw new UnsupportedOperationException(String.format("Cannot support %s", type));
    }

    @Override // com.dangdang.ddframe.job.internal.job.AbstractElasticJob
    protected final void executeJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        if (DataFlowType.THROUGHPUT == this.dataFlowType) {
            if (isStreamingProcess()) {
                executeThroughputStreamingJob(jobExecutionMultipleShardingContext);
                return;
            } else {
                executeThroughputOneOffJob(jobExecutionMultipleShardingContext);
                return;
            }
        }
        if (DataFlowType.SEQUENCE == this.dataFlowType) {
            if (isStreamingProcess()) {
                executeSequenceStreamingJob(jobExecutionMultipleShardingContext);
            } else {
                executeSequenceOneOffJob(jobExecutionMultipleShardingContext);
            }
        }
    }

    private void executeThroughputStreamingJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<T> fetchDataForThroughput = fetchDataForThroughput(jobExecutionMultipleShardingContext);
        while (true) {
            List<T> list = fetchDataForThroughput;
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            processDataForThroughput(jobExecutionMultipleShardingContext, list);
            if (!getJobFacade().isEligibleForJobRunning()) {
                return;
            } else {
                fetchDataForThroughput = fetchDataForThroughput(jobExecutionMultipleShardingContext);
            }
        }
    }

    private void executeThroughputOneOffJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<T> fetchDataForThroughput = fetchDataForThroughput(jobExecutionMultipleShardingContext);
        if (CollectionUtils.isEmpty(fetchDataForThroughput)) {
            return;
        }
        processDataForThroughput(jobExecutionMultipleShardingContext, fetchDataForThroughput);
    }

    private void executeSequenceStreamingJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        Map<Integer, List<T>> fetchDataForSequence = fetchDataForSequence(jobExecutionMultipleShardingContext);
        while (true) {
            Map<Integer, List<T>> map = fetchDataForSequence;
            if (map.isEmpty()) {
                return;
            }
            processDataForSequence(jobExecutionMultipleShardingContext, map);
            if (!getJobFacade().isEligibleForJobRunning()) {
                return;
            } else {
                fetchDataForSequence = fetchDataForSequence(jobExecutionMultipleShardingContext);
            }
        }
    }

    private void executeSequenceOneOffJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        Map<Integer, List<T>> fetchDataForSequence = fetchDataForSequence(jobExecutionMultipleShardingContext);
        if (fetchDataForSequence.isEmpty()) {
            return;
        }
        processDataForSequence(jobExecutionMultipleShardingContext, fetchDataForSequence);
    }

    private List<T> fetchDataForThroughput(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<T> fetchData = fetchData(jobExecutionMultipleShardingContext);
        log.trace("Elastic job: fetch data size: {}.", Integer.valueOf(fetchData != null ? fetchData.size() : 0));
        return fetchData;
    }

    private void processDataForThroughput(final JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext, List<T> list) {
        int concurrentDataProcessThreadCount = getJobFacade().getConcurrentDataProcessThreadCount();
        if (concurrentDataProcessThreadCount <= 1 || list.size() <= concurrentDataProcessThreadCount) {
            processDataWithStatistics(jobExecutionMultipleShardingContext, list);
            return;
        }
        List<List> partition = Lists.partition(list, list.size() / concurrentDataProcessThreadCount);
        final CountDownLatch countDownLatch = new CountDownLatch(partition.size());
        for (final List list2 : partition) {
            this.executorService.submit(new Runnable() { // from class: com.dangdang.ddframe.job.internal.job.dataflow.AbstractDataFlowElasticJob.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AbstractDataFlowElasticJob.this.processDataWithStatistics(jobExecutionMultipleShardingContext, list2);
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                }
            });
        }
        latchAwait(countDownLatch);
    }

    private Map<Integer, List<T>> fetchDataForSequence(final JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<Integer> shardingItems = jobExecutionMultipleShardingContext.getShardingItems();
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(shardingItems.size());
        final CountDownLatch countDownLatch = new CountDownLatch(shardingItems.size());
        Iterator<Integer> it = shardingItems.iterator();
        while (it.hasNext()) {
            final int intValue = it.next().intValue();
            this.executorService.submit(new Runnable() { // from class: com.dangdang.ddframe.job.internal.job.dataflow.AbstractDataFlowElasticJob.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        List<T> fetchData = AbstractDataFlowElasticJob.this.fetchData(jobExecutionMultipleShardingContext.createJobExecutionSingleShardingContext(intValue));
                        if (null != fetchData && !fetchData.isEmpty()) {
                            concurrentHashMap.put(Integer.valueOf(intValue), fetchData);
                        }
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        latchAwait(countDownLatch);
        log.trace("Elastic job: fetch data size: {}.", Integer.valueOf(concurrentHashMap.size()));
        return concurrentHashMap;
    }

    private void processDataForSequence(final JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext, Map<Integer, List<T>> map) {
        final CountDownLatch countDownLatch = new CountDownLatch(map.size());
        for (final Map.Entry<Integer, List<T>> entry : map.entrySet()) {
            this.executorService.submit(new Runnable() { // from class: com.dangdang.ddframe.job.internal.job.dataflow.AbstractDataFlowElasticJob.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AbstractDataFlowElasticJob.this.processDataWithStatistics(jobExecutionMultipleShardingContext.createJobExecutionSingleShardingContext(((Integer) entry.getKey()).intValue()), (List) entry.getValue());
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                }
            });
        }
        latchAwait(countDownLatch);
    }

    protected abstract void processDataWithStatistics(C c, List<T> list);

    @Override // com.dangdang.ddframe.job.api.DataFlowElasticJob
    public final void updateOffset(int i, String str) {
        getJobFacade().updateOffset(i, str);
    }

    @Override // com.dangdang.ddframe.job.api.DataFlowElasticJob
    public ExecutorService getExecutorService() {
        return Executors.newCachedThreadPool();
    }

    @Override // com.dangdang.ddframe.job.internal.job.AbstractElasticJob, com.dangdang.ddframe.job.api.ElasticJob
    public void handleJobExecutionException(JobExecutionException jobExecutionException) throws JobExecutionException {
        log.error("Elastic job: exception occur in job processing...", jobExecutionException.getCause());
    }

    private void latchAwait(CountDownLatch countDownLatch) {
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
