package org.apache.griffin.core.job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.exception.GriffinException;
import org.apache.griffin.core.exception.GriffinExceptionMessage;
import org.apache.griffin.core.job.entity.AbstractJob;
import org.apache.griffin.core.job.entity.JobDataSegment;
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.LivySessionStates;
import org.apache.griffin.core.job.entity.SegmentPredicate;
import org.apache.griffin.core.job.entity.SegmentRange;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
import org.apache.griffin.core.job.repo.JobRepo;
import org.apache.griffin.core.measure.entity.DataConnector;
import org.apache.griffin.core.measure.entity.DataSource;
import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
import org.apache.griffin.core.util.JsonUtil;
import org.apache.griffin.core.util.TimeUtil;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.transaction.annotation.Transactional;

/**
 * Quartz job that prepares a measure for execution and schedules the follow-up
 * predicate job ({@link SparkSubmitJob}).
 *
 * <p>On each firing it loads the Griffin job and its measure, expands the time
 * patterns found in the data connector and predicate configurations into concrete
 * values for every sample timestamp, stores a {@link JobInstanceBean} in state
 * {@code FINDING}, and registers a repeating trigger for the predicate job.
 *
 * <p>The instance keeps per-execution state in fields ({@code measure}, {@code job},
 * {@code mPredicates}, {@code jobStartTime}); they are (re)initialised at the start of
 * every {@link #execute(JobExecutionContext)} call.
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class JobInstance implements Job {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobInstance.class);
    public static final String MEASURE_KEY = "measure";
    public static final String PREDICATES_KEY = "predicts";
    public static final String PREDICATE_JOB_NAME = "predicateJobName";
    private static final String TRIGGER_KEY = "trigger";
    static final String JOB_NAME = "jobName";
    static final String PATH_CONNECTOR_CHARACTER = ",";
    public static final String INTERVAL = "interval";
    public static final String REPEAT = "repeat";
    public static final String CHECK_DONEFILE_SCHEDULE = "checkdonefile.schedule";

    /** Quartz group used for both the predicate trigger and the predicate job. */
    private static final String PREDICATE_GROUP = "PG";
    /** Interval used when neither the job config nor the environment provides one. */
    private static final String DEFAULT_INTERVAL = "5m";
    /** Repeat count used when neither the job config nor the environment provides one. */
    private static final String DEFAULT_REPEAT = "12";
    /** Instance lifetime in milliseconds used when the environment provides none. */
    private static final String DEFAULT_EXPIRED_MILLIS = "604800000";

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean factory;

    @Autowired
    private GriffinMeasureRepo measureRepo;

    @Autowired
    private JobRepo<AbstractJob> jobRepo;

    @Autowired
    private JobInstanceRepo instanceRepo;

    @Autowired
    private Environment env;
    private GriffinMeasure measure;
    private AbstractJob job;
    private List<SegmentPredicate> mPredicates;
    private Long jobStartTime;

    /**
     * Entry point invoked by Quartz. Any failure is logged and swallowed so that the
     * exception does not propagate to the scheduler.
     *
     * @param jobExecutionContext context of the current firing
     */
    @Override
    @Transactional
    public void execute(JobExecutionContext jobExecutionContext) {
        try {
            initParam(jobExecutionContext);
            setSourcesPartitionsAndPredicates(this.measure.getDataSources());
            createJobInstance(this.job.getConfigMap());
        } catch (Exception e) {
            LOGGER.error("Create predicate job failure.", e);
        }
    }

    /**
     * Loads the job and measure referenced by the firing job detail, records the job
     * start time and stores the firing trigger key in the job's config map.
     *
     * @param jobExecutionContext context of the current firing
     * @throws SchedulerException if the scheduler cannot be queried
     */
    private void initParam(JobExecutionContext jobExecutionContext) throws SchedulerException {
        this.mPredicates = new ArrayList<>();
        JobDetail jobDetail = jobExecutionContext.getJobDetail();
        long jobId = jobDetail.getJobDataMap().getLong(JobServiceImpl.GRIFFIN_JOB_ID);
        this.job = this.jobRepo.findOne(jobId);
        this.measure = this.measureRepo.findOne(this.job.getMeasureId());
        setJobStartTime(jobDetail);
        if (this.job.getConfigMap() == null) {
            Map<String, Object> emptyConfig = new HashMap<>();
            this.job.setConfigMap(emptyConfig);
        }
        String firingTrigger = jobExecutionContext.getTrigger().getKey().toString();
        this.job.getConfigMap().put(TRIGGER_KEY, firingTrigger);
    }

    /**
     * Sets {@code jobStartTime} to the previous fire time of the first trigger
     * registered for the given job.
     *
     * @param jobDetail detail of the firing job
     * @throws SchedulerException if the scheduler cannot be queried, the job has no
     *                            trigger, or the trigger has not fired yet
     */
    private void setJobStartTime(JobDetail jobDetail) throws SchedulerException {
        JobKey jobKey = jobDetail.getKey();
        List<? extends Trigger> triggers = this.factory.getScheduler().getTriggersOfJob(jobKey);
        // Fail with a descriptive scheduler error instead of a bare
        // IndexOutOfBoundsException / NullPointerException.
        if (triggers == null || triggers.isEmpty()) {
            throw new SchedulerException("No trigger found for job " + jobKey);
        }
        Trigger first = triggers.get(0);
        if (first.getPreviousFireTime() == null) {
            throw new SchedulerException("Trigger " + first.getKey() + " has no previous fire time");
        }
        this.jobStartTime = first.getPreviousFireTime().getTime();
    }

    /**
     * For every job segment, resolves the partitions and predicates of the matching
     * data connectors. The first segment flagged as timestamp baseline also defines
     * the measure timestamp (job start time plus the segment's begin offset).
     *
     * @param list data sources of the measure
     */
    private void setSourcesPartitionsAndPredicates(List<DataSource> list) {
        boolean baselinePending = true;
        for (JobDataSegment segment : this.job.getSegments()) {
            if (baselinePending && segment.isAsTsBaseline()) {
                long beginOffset = TimeUtil.str2Long(segment.getSegmentRange().getBegin());
                Long baseline = this.jobStartTime + beginOffset;
                this.measure.setTimestamp(baseline);
                baselinePending = false;
            }
            for (DataSource source : list) {
                setDataConnectorPartitions(segment, source.getConnector());
            }
        }
    }

    /**
     * Resolves config and predicates of {@code dataConnector} when its name matches
     * the connector name of the segment; otherwise does nothing.
     */
    private void setDataConnectorPartitions(JobDataSegment jobDataSegment, DataConnector dataConnector) {
        if (!jobDataSegment.getDataConnectorName().equals(dataConnector.getName())) {
            return;
        }
        Long[] sampleTs = genSampleTs(jobDataSegment.getSegmentRange(), dataConnector);
        setConnectorConf(dataConnector, sampleTs);
        setConnectorPredicates(dataConnector, sampleTs);
    }

    /**
     * Computes the sample timestamps covered by a segment range.
     *
     * <p>The start is {@code jobStartTime + begin}. A negative length moves the start
     * back by that amount and is then treated as its absolute value. If the data unit
     * is zero or at least as large as the length, only the start is returned; otherwise
     * {@code length / unit} timestamps spaced one unit apart are returned.
     *
     * <p>The data unit is normalised to its magnitude: the size guard already compared
     * its absolute value, while dividing by a negative unit produced a negative array
     * size.
     *
     * @param segmentRange  begin offset and length of the segment
     * @param dataConnector connector providing the data unit (or its default)
     * @return one or more sample timestamps, never empty
     */
    private Long[] genSampleTs(SegmentRange segmentRange, DataConnector dataConnector) {
        long offset = TimeUtil.str2Long(segmentRange.getBegin());
        long range = TimeUtil.str2Long(segmentRange.getLength());
        String configuredUnit = dataConnector.getDataUnit();
        long signedUnit = TimeUtil.str2Long(
                StringUtils.isEmpty(configuredUnit) ? dataConnector.getDefaultDataUnit() : configuredUnit);
        long unit = Math.abs(signedUnit);

        long start = this.jobStartTime + offset;
        if (range < 0) {
            start += range;
            range = Math.abs(range);
        }
        if (unit == 0 || unit >= range) {
            return new Long[]{start};
        }
        int count = (int) (range / unit);
        Long[] timestamps = new Long[count];
        for (int i = 0; i < count; i++) {
            timestamps[i] = start + i * unit;
        }
        return timestamps;
    }

    /**
     * Expands the config of every predicate of the connector for the given timestamps
     * and collects the predicates for the predicate job.
     */
    private void setConnectorPredicates(DataConnector dataConnector, Long[] lArr) {
        for (SegmentPredicate predicate : dataConnector.getPredicates()) {
            genConfMap(predicate.getConfigMap(), lArr, dataConnector.getDataTimeZone());
            // Re-set the (mutated) map; presumably the setter refreshes derived state
            // such as a serialized form -- kept from the original, confirm in the entity.
            predicate.setConfigMap(predicate.getConfigMap());
            this.mPredicates.add(predicate);
        }
    }

    /** Expands the connector's own config for the given timestamps. */
    private void setConnectorConf(DataConnector dataConnector, Long[] lArr) {
        genConfMap(dataConnector.getConfigMap(), lArr, dataConnector.getDataTimeZone());
        // See setConnectorPredicates: the map is re-set after in-place mutation.
        dataConnector.setConfigMap(dataConnector.getConfigMap());
    }

    /**
     * Replaces, in place, every non-empty string value of {@code map} with the
     * comma-joined set of that value formatted for each timestamp. Duplicates
     * collapse; non-string and empty values are left untouched.
     *
     * @param map  config map to rewrite; a {@code null} map is logged and ignored
     * @param lArr sample timestamps
     * @param str  time zone id handed to {@link TimeUtil#getTimeZone}
     */
    private void genConfMap(Map<String, Object> map, Long[] lArr, String str) {
        if (map == null) {
            LOGGER.warn("Predicate config is null.");
            return;
        }
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            if (!(entry.getValue() instanceof String)) {
                continue;
            }
            String pattern = (String) entry.getValue();
            if (StringUtils.isEmpty(pattern)) {
                continue;
            }
            HashSet<String> formatted = new HashSet<>();
            for (Long timestamp : lArr) {
                formatted.add(TimeUtil.format(pattern, timestamp.longValue(), TimeUtil.getTimeZone(str)));
            }
            // setValue updates the entry without touching the map structure mid-iteration.
            entry.setValue(StringUtils.join(formatted, PATH_CONNECTOR_CHARACTER));
        }
    }

    /**
     * Persists a new job instance and schedules the predicate job using the
     * interval/repeat settings from the (defaulted) config map.
     *
     * @param map job config map, may be {@code null}
     * @throws Exception if the trigger already exists or scheduling fails
     */
    private void createJobInstance(Map<String, Object> map) throws Exception {
        Map<String, Object> source = map;
        if (source == null) {
            source = new HashMap<>();
        }
        Map<String, Object> confMap = checkConfMap(source);
        @SuppressWarnings("unchecked")
        Map<String, Object> schedule = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE);
        Long interval = TimeUtil.str2Long(schedule.get(INTERVAL).toString());
        Integer repeat = Integer.valueOf(schedule.get(REPEAT).toString());

        String predicateName = this.job.getJobName() + "_predicate_" + System.currentTimeMillis();
        TriggerKey triggerKey = TriggerKey.triggerKey(predicateName, PREDICATE_GROUP);
        if (this.factory.getScheduler().checkExists(triggerKey)) {
            throw new GriffinException.ConflictException(GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST);
        }
        saveJobInstance(predicateName, PREDICATE_GROUP, (String) confMap.get(TRIGGER_KEY));
        createJobInstance(triggerKey, interval, repeat, predicateName);
    }

    /**
     * Ensures {@code map} holds a {@value #CHECK_DONEFILE_SCHEDULE} entry with both an
     * interval and a repeat count, stored as strings.
     *
     * <p>Defaults come from the environment properties {@code predicate.job.interval}
     * and {@code predicate.job.repeat.count}, falling back to {@code "5m"} and
     * {@code "12"}. Missing individual keys in an existing schedule are filled with
     * those defaults (previously a missing key caused a {@link NullPointerException}).
     *
     * @param map config map to complete; must not be {@code null}
     * @return the same map instance
     */
    Map<String, Object> checkConfMap(Map<String, Object> map) {
        String envInterval = this.env.getProperty("predicate.job.interval");
        String defaultInterval = envInterval != null ? envInterval : DEFAULT_INTERVAL;
        String envRepeat = this.env.getProperty("predicate.job.repeat.count");
        String defaultRepeat = envRepeat != null ? envRepeat : DEFAULT_REPEAT;

        @SuppressWarnings("unchecked")
        Map<String, Object> schedule = (Map<String, Object>) map.get(CHECK_DONEFILE_SCHEDULE);
        if (schedule == null) {
            Map<String, Object> defaults = new HashMap<>();
            defaults.put(INTERVAL, defaultInterval);
            defaults.put(REPEAT, defaultRepeat);
            map.put(CHECK_DONEFILE_SCHEDULE, defaults);
            return map;
        }
        // Null-check BEFORE toString() so the defaults are actually reachable.
        Object interval = schedule.get(INTERVAL);
        Object repeat = schedule.get(REPEAT);
        schedule.put(INTERVAL, interval != null ? interval.toString() : defaultInterval);
        schedule.put(REPEAT, repeat != null ? repeat.toString() : defaultRepeat);
        return map;
    }

    /**
     * Stores a job instance in state {@code FINDING} for the current job.
     *
     * <p>The expiry timestamp is the current time plus the environment property
     * {@code jobInstance.expired.milliseconds} (default {@code 604800000}).
     *
     * @param str  predicate job name
     * @param str2 predicate job group
     * @param str3 key of the trigger that fired this job
     */
    private void saveJobInstance(String str, String str2, String str3) {
        GriffinMeasure.ProcessType processType;
        if (this.measure.getProcessType() == GriffinMeasure.ProcessType.BATCH) {
            processType = GriffinMeasure.ProcessType.BATCH;
        } else {
            processType = GriffinMeasure.ProcessType.STREAMING;
        }
        String configuredLifetime = this.env.getProperty("jobInstance.expired.milliseconds");
        long lifetime = Long.parseLong(configuredLifetime != null ? configuredLifetime : DEFAULT_EXPIRED_MILLIS);
        Long createdAt = System.currentTimeMillis();
        Long expiresAt = createdAt + lifetime;

        JobInstanceBean instance = new JobInstanceBean(
                LivySessionStates.State.FINDING, str, str2, createdAt, expiresAt, processType);
        instance.setJob(this.job);
        instance.setTriggerKey(str3);
        this.instanceRepo.save(instance);
    }

    /**
     * Registers the predicate job detail and schedules it with a repeating trigger.
     *
     * @param triggerKey identity of the new trigger (also used as job identity)
     * @param l          repeat interval in milliseconds
     * @param num        repeat count
     * @param str        predicate job name stored in the job data
     */
    private void createJobInstance(TriggerKey triggerKey, Long l, Integer num, String str) throws Exception {
        JobDetail jobDetail = addJobDetail(triggerKey, str);
        Trigger trigger = genTriggerInstance(triggerKey, jobDetail, l, num);
        this.factory.getScheduler().scheduleJob(trigger);
    }

    /** Builds a trigger that starts immediately and repeats {@code num} times every {@code l} ms. */
    private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jobDetail, Long l, Integer num) {
        SimpleScheduleBuilder schedule = SimpleScheduleBuilder.simpleSchedule()
                .withIntervalInMilliseconds(l.longValue())
                .withRepeatCount(num.intValue());
        return TriggerBuilder.newTrigger()
                .withIdentity(triggerKey)
                .forJob(jobDetail)
                .startNow()
                .withSchedule(schedule)
                .build();
    }

    /**
     * Fetches or creates the durable {@link SparkSubmitJob} detail keyed like the
     * trigger, fills its data map and adds it to the scheduler (replacing an existing
     * detail with the same key).
     */
    private JobDetail addJobDetail(TriggerKey triggerKey, String str) throws SchedulerException, IOException {
        Scheduler scheduler = this.factory.getScheduler();
        JobKey jobKey = JobKey.jobKey(triggerKey.getName(), triggerKey.getGroup());
        boolean exists = scheduler.checkExists(jobKey);
        JobDetail jobDetail;
        if (exists) {
            jobDetail = scheduler.getJobDetail(jobKey);
        } else {
            jobDetail = JobBuilder.newJob(SparkSubmitJob.class).storeDurably().withIdentity(jobKey).build();
        }
        setJobDataMap(jobDetail, str);
        scheduler.addJob(jobDetail, exists);
        return jobDetail;
    }

    /** Writes the serialized measure, predicates and job names into the job data map. */
    private void setJobDataMap(JobDetail jobDetail, String str) throws IOException {
        JobDataMap dataMap = jobDetail.getJobDataMap();
        preProcessMeasure();
        dataMap.put(MEASURE_KEY, JsonUtil.toJson(this.measure));
        dataMap.put(PREDICATES_KEY, JsonUtil.toJson(this.mPredicates));
        dataMap.put(JOB_NAME, this.job.getJobName());
        dataMap.put(PREDICATE_JOB_NAME, str);
    }

    /**
     * Substitutes the {@code ${JOB_NAME}}, {@code ${SOURCE_NAME}} and
     * {@code ${TARGET_NAME}} placeholders in each data source's checkpoint map.
     *
     * <p>Processing stops at the first data source without a checkpoint map
     * (NOTE(review): behaviour kept from the original; presumably such measures carry
     * no checkpoint configuration at all -- confirm).
     *
     * <p>Substitution is literal, so names containing {@code $} or {@code \} are
     * inserted verbatim instead of being interpreted as regex replacement syntax.
     */
    private void preProcessMeasure() throws IOException {
        for (DataSource source : this.measure.getDataSources()) {
            Map<String, Object> checkpointMap = source.getCheckpointMap();
            if (checkpointMap == null) {
                return;
            }
            String json = JsonUtil.toJson(checkpointMap)
                    .replace("${JOB_NAME}", this.job.getJobName())
                    .replace("${SOURCE_NAME}", source.getName())
                    .replace("${TARGET_NAME}", source.getName());
            @SuppressWarnings("unchecked")
            Map<String, Object> resolved = (Map<String, Object>) JsonUtil.toEntity(json, Map.class);
            source.setCheckpointMap(resolved);
        }
    }
}
