package org.apache.griffin.core.job;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.griffin.core.exception.GriffinException;
import org.apache.griffin.core.exception.GriffinExceptionMessage;
import org.apache.griffin.core.job.entity.AbstractJob;
import org.apache.griffin.core.job.entity.BatchJob;
import org.apache.griffin.core.job.entity.JobDataSegment;
import org.apache.griffin.core.job.entity.JobHealth;
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.JobState;
import org.apache.griffin.core.job.entity.LivySessionStates;
import org.apache.griffin.core.job.repo.BatchJobRepo;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
import org.apache.griffin.core.measure.entity.DataSource;
import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.quartz.CronExpression;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * {@link JobOperator} implementation for batch jobs.
 *
 * <p>Scheduling operations go through the Quartz {@link Scheduler} obtained from the injected
 * {@link SchedulerFactoryBean}; jobs and job instances are read and saved through
 * {@link BatchJobRepo} and {@link JobInstanceRepo}.
 */
@Service
public class BatchJobOperatorImpl implements JobOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchJobOperatorImpl.class);

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean factory;

    @Autowired
    private JobInstanceRepo instanceRepo;

    @Autowired
    private BatchJobRepo batchJobRepo;

    @Autowired
    private JobServiceImpl jobService;

    /**
     * Validates the job definition, stores it as a {@link BatchJob} and registers it with the
     * job service as a {@code BATCH} job.
     *
     * @param abstractJob the job to add; it is cast to {@link BatchJob}
     * @param griffinMeasure the measure whose data-source connector names the job segments must match
     * @return the same {@code abstractJob} instance that was passed in
     * @throws GriffinException.BadRequestException if the job name, cron expression, baseline
     *     configuration or connector names are invalid
     * @throws Exception if the job service fails to resolve the trigger key or to add the job
     */
    @Override // org.apache.griffin.core.job.JobOperator
    @Transactional(rollbackFor = {Exception.class})
    public AbstractJob add(AbstractJob abstractJob, GriffinMeasure griffinMeasure) throws Exception {
        validateParams(abstractJob, griffinMeasure);
        String quartzName = this.jobService.getQuartzName(abstractJob);
        String quartzGroup = this.jobService.getQuartzGroup();
        // Argument order matters: the trigger key is resolved before the job is saved.
        this.jobService.addJob(
                this.jobService.getTriggerKeyIfValid(quartzName, quartzGroup),
                (BatchJob) this.batchJobRepo.save(genBatchJobBean(abstractJob, quartzName, quartzGroup)),
                GriffinMeasure.ProcessType.BATCH);
        return abstractJob;
    }

    /**
     * Fills in the scheduling fields of the job: the metric name is set to the job name, and the
     * Quartz name and group are set to the given values.
     *
     * @param abstractJob the job to populate; must be a {@link BatchJob}
     * @param quartzName the Quartz job name to store on the job
     * @param quartzGroup the Quartz job group to store on the job
     * @return the same object, cast to {@link BatchJob}
     */
    private BatchJob genBatchJobBean(AbstractJob abstractJob, String quartzName, String quartzGroup) {
        BatchJob batchJob = (BatchJob) abstractJob;
        batchJob.setMetricName(abstractJob.getJobName());
        batchJob.setGroup(quartzGroup);
        batchJob.setName(quartzName);
        return batchJob;
    }

    /**
     * Resumes a paused job.
     *
     * @param abstractJob the job to resume
     * @throws GriffinException.BadRequestException if the job has no trigger, or if the state of
     *     its first trigger is not {@code PAUSED}
     * @throws GriffinException.ServiceException if the scheduler fails
     */
    @Override // org.apache.griffin.core.job.JobOperator
    public void start(AbstractJob abstractJob) {
        String name = abstractJob.getName();
        String group = abstractJob.getGroup();
        Trigger.TriggerState triggerState = getTriggerState(name, group);
        if (triggerState == null) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.JOB_IS_NOT_SCHEDULED);
        }
        if (triggerState != Trigger.TriggerState.PAUSED) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.JOB_IS_NOT_IN_PAUSED_STATUS);
        }
        try {
            this.factory.getScheduler().resumeJob(JobKey.jobKey(name, group));
        } catch (SchedulerException e) {
            throw new GriffinException.ServiceException("Failed to start job.", e);
        }
    }

    /**
     * Pauses the job and its predicate jobs, and saves the job with its deleted flag set to
     * {@code false}.
     *
     * @param abstractJob the job to stop; it is cast to {@link BatchJob}
     * @throws GriffinException.ServiceException if any step fails
     */
    @Override // org.apache.griffin.core.job.JobOperator
    public void stop(AbstractJob abstractJob) {
        pauseJob((BatchJob) abstractJob, false);
    }

    /**
     * Pauses the job and its predicate jobs, and saves the job with its deleted flag set to
     * {@code true}.
     *
     * @param abstractJob the job to delete; it is cast to {@link BatchJob}
     * @throws GriffinException.ServiceException if any step fails
     */
    @Override // org.apache.griffin.core.job.JobOperator
    @Transactional
    public void delete(AbstractJob abstractJob) {
        pauseJob((BatchJob) abstractJob, true);
    }

    /**
     * Adds this job to the given health counters. A job that has no trigger is not counted.
     *
     * @param jobHealth the counters to update
     * @param abstractJob the job to count
     * @return the same {@code jobHealth} instance
     * @throws SchedulerException if the triggers cannot be read
     */
    @Override // org.apache.griffin.core.job.JobOperator
    public JobHealth getHealth(JobHealth jobHealth, AbstractJob abstractJob) throws SchedulerException {
        List<? extends Trigger> triggers =
                this.jobService.getTriggers(abstractJob.getName(), abstractJob.getGroup());
        if (CollectionUtils.isEmpty(triggers)) {
            return jobHealth;
        }
        jobHealth.setJobCount(jobHealth.getJobCount() + 1);
        if (this.jobService.isJobHealthy(abstractJob.getId()).booleanValue()) {
            jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1);
        }
        return jobHealth;
    }

    /**
     * Builds the state of a job from the trigger keyed by the job's name and group.
     *
     * @param abstractJob the job to inspect
     * @param str not used by this implementation
     * @return the job state, or {@code null} if the job has no name or no group
     * @throws SchedulerException if the scheduler cannot be queried
     */
    @Override // org.apache.griffin.core.job.JobOperator
    public JobState getState(AbstractJob abstractJob, String str) throws SchedulerException {
        Scheduler scheduler = this.factory.getScheduler();
        if (abstractJob.getGroup() == null || abstractJob.getName() == null) {
            return null;
        }
        TriggerKey triggerKey = TriggerKey.triggerKey(abstractJob.getName(), abstractJob.getGroup());
        Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey);
        JobState jobState = new JobState();
        jobState.setState(triggerState.toString());
        jobState.setToStart(getStartStatus(triggerState));
        jobState.setToStop(getStopStatus(triggerState));
        setTriggerTime(abstractJob, jobState);
        return jobState;
    }

    /**
     * Copies the next and previous fire times of the job's first trigger into {@code jobState},
     * as epoch milliseconds; a missing fire time is stored as {@code -1}. Does nothing if the job
     * has no trigger.
     */
    private void setTriggerTime(AbstractJob abstractJob, JobState jobState) throws SchedulerException {
        List<? extends Trigger> triggers =
                this.jobService.getTriggers(abstractJob.getName(), abstractJob.getGroup());
        if (CollectionUtils.isEmpty(triggers)) {
            return;
        }
        Trigger firstTrigger = triggers.get(0);
        Date nextFireTime = firstTrigger.getNextFireTime();
        Date previousFireTime = firstTrigger.getPreviousFireTime();
        jobState.setNextFireTime(Long.valueOf(nextFireTime != null ? nextFireTime.getTime() : -1L));
        jobState.setPreviousFireTime(Long.valueOf(previousFireTime != null ? previousFireTime.getTime() : -1L));
    }

    /** Returns whether a job in the given trigger state can be started, i.e. it is {@code PAUSED}. */
    private boolean getStartStatus(Trigger.TriggerState triggerState) {
        return triggerState == Trigger.TriggerState.PAUSED;
    }

    /** Returns whether a job in the given trigger state can be stopped ({@code NORMAL} or {@code BLOCKED}). */
    private boolean getStopStatus(Trigger.TriggerState triggerState) {
        return triggerState == Trigger.TriggerState.NORMAL || triggerState == Trigger.TriggerState.BLOCKED;
    }

    /**
     * Looks up the state of the first trigger of the given job.
     *
     * @param name the job name
     * @param group the job group
     * @return the trigger state, or {@code null} if the job has no trigger
     * @throws GriffinException.ServiceException if the scheduler cannot be queried
     */
    private Trigger.TriggerState getTriggerState(String name, String group) {
        try {
            List<? extends Trigger> triggers = this.jobService.getTriggers(name, group);
            if (CollectionUtils.isEmpty(triggers)) {
                return null;
            }
            return this.factory.getScheduler().getTriggerState(triggers.get(0).getKey());
        } catch (SchedulerException e) {
            // This method only reads trigger state; the message used to say "Failed to delete job".
            LOGGER.error("Failed to get trigger state of job.", e);
            throw new GriffinException.ServiceException("Failed to get trigger state of job.", e);
        }
    }

    /**
     * Pauses the Quartz job, removes its predicate jobs, then saves the job with the given
     * deleted flag. Any exception raised on the way is logged and wrapped.
     *
     * @param batchJob the job to pause
     * @param deleted the value to store in the job's deleted flag
     * @throws GriffinException.ServiceException if any step throws
     */
    private void pauseJob(BatchJob batchJob, boolean deleted) {
        try {
            pauseJob(batchJob.getGroup(), batchJob.getName());
            pausePredicateJob(batchJob);
            batchJob.setDeleted(deleted);
            this.batchJobRepo.save(batchJob);
        } catch (Exception e) {
            LOGGER.error("Job schedule happens exception.", e);
            throw new GriffinException.ServiceException("Job schedule happens exception.", e);
        }
    }

    /**
     * Deletes the predicate job of every instance of {@code batchJob} whose predicate is not yet
     * marked deleted, marks it deleted, and moves instances in state {@code FINDING} to
     * {@code NOT_FOUND}.
     *
     * <p>NOTE(review): the modified instances are not saved explicitly here; presumably they are
     * persisted by a surrounding transaction — confirm, since {@code stop} is not transactional.
     */
    private void pausePredicateJob(BatchJob batchJob) throws SchedulerException {
        for (JobInstanceBean instance : this.instanceRepo.findByJobId(batchJob.getId())) {
            if (instance.isPredicateDeleted()) {
                continue;
            }
            deleteJob(instance.getPredicateGroup(), instance.getPredicateName());
            instance.setPredicateDeleted(true);
            if (instance.getState().equals(LivySessionStates.State.FINDING)) {
                instance.setState(LivySessionStates.State.NOT_FOUND);
            }
        }
    }

    /**
     * Deletes the Quartz job with the given key if it exists; otherwise only logs that it does
     * not exist.
     *
     * @param group the job group
     * @param name the job name
     * @throws SchedulerException if the scheduler fails
     */
    public void deleteJob(String group, String name) throws SchedulerException {
        Scheduler scheduler = this.factory.getScheduler();
        JobKey jobKey = new JobKey(name, group);
        if (!scheduler.checkExists(jobKey)) {
            LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
            return;
        }
        scheduler.deleteJob(jobKey);
    }

    /**
     * Pauses the Quartz job with the given key. Does nothing if the group or the name is empty.
     *
     * @param group the job group
     * @param name the job name
     * @throws GriffinException.NotFoundException if no job with that key exists
     * @throws SchedulerException if the scheduler fails
     */
    private void pauseJob(String group, String name) throws SchedulerException {
        if (StringUtils.isEmpty(group) || StringUtils.isEmpty(name)) {
            return;
        }
        Scheduler scheduler = this.factory.getScheduler();
        JobKey jobKey = new JobKey(name, group);
        if (!scheduler.checkExists(jobKey)) {
            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
            throw new GriffinException.NotFoundException(GriffinExceptionMessage.JOB_KEY_DOES_NOT_EXIST);
        }
        scheduler.pauseJob(jobKey);
    }

    /**
     * Deletes the predicate jobs of the given instances and saves the instances whose predicate
     * was newly marked deleted.
     *
     * @param instances the instances to process; may be {@code null} or empty
     * @return {@code true} if every instance was processed without a scheduler error (or there
     *     was nothing to do), {@code false} if at least one failed
     */
    public boolean pauseJobInstances(List<JobInstanceBean> instances) {
        if (CollectionUtils.isEmpty(instances)) {
            return true;
        }
        List<JobInstanceBean> deletedInstances = new ArrayList<>();
        boolean allPaused = true;
        for (JobInstanceBean instance : instances) {
            // Call first, then AND. Folding the call into "allPaused && ..." would skip every
            // instance after the first failure.
            boolean paused = pauseJobInstance(instance, deletedInstances);
            allPaused = allPaused && paused;
        }
        this.instanceRepo.saveAll(deletedInstances);
        return allPaused;
    }

    /**
     * Deletes the predicate job of one instance unless it is already marked deleted.
     *
     * @param jobInstanceBean the instance to process
     * @param deletedInstances receives the instance when its predicate is newly marked deleted
     * @return {@code false} if the scheduler threw, {@code true} otherwise
     */
    private boolean pauseJobInstance(JobInstanceBean jobInstanceBean, List<JobInstanceBean> deletedInstances) {
        String predicateGroup = jobInstanceBean.getPredicateGroup();
        String predicateName = jobInstanceBean.getPredicateName();
        try {
            if (!jobInstanceBean.isPredicateDeleted()) {
                deleteJob(predicateGroup, predicateName);
                jobInstanceBean.setPredicateDeleted(true);
                deletedInstances.add(jobInstanceBean);
            }
            return true;
        } catch (SchedulerException e) {
            // The trailing throwable is logged with its stack trace rather than being dropped.
            LOGGER.error("Failed to pause predicate job({},{}).", predicateGroup, predicateName, e);
            return false;
        }
    }

    /**
     * Checks the job name, cron expression, baseline configuration and connector names, in that
     * order, and rejects the request at the first failing check.
     *
     * @throws GriffinException.BadRequestException describing the first check that failed
     */
    private void validateParams(AbstractJob abstractJob, GriffinMeasure griffinMeasure) {
        if (!this.jobService.isValidJobName(abstractJob.getJobName())) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.INVALID_JOB_NAME);
        }
        if (!isValidCronExpression(abstractJob.getCronExpression())) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.INVALID_CRON_EXPRESSION);
        }
        if (!isValidBaseLine(abstractJob.getSegments())) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.MISSING_BASELINE_CONFIG);
        }
        if (!isValidConnectorNames(abstractJob.getSegments(), getConnectorNames(griffinMeasure))) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.INVALID_CONNECTOR_NAME);
        }
    }

    /** Returns whether the expression is non-empty and accepted by {@link CronExpression}; logs a warning otherwise. */
    private boolean isValidCronExpression(String cronExpression) {
        if (StringUtils.isEmpty(cronExpression)) {
            LOGGER.warn("Cron Expression is empty.");
            return false;
        }
        if (!CronExpression.isValidExpression(cronExpression)) {
            LOGGER.warn("Cron Expression is invalid: {}", cronExpression);
            return false;
        }
        return true;
    }

    /** Returns whether at least one segment is flagged as the timestamp baseline; logs a warning otherwise. */
    private boolean isValidBaseLine(List<JobDataSegment> segments) {
        assert segments != null;
        for (JobDataSegment segment : segments) {
            if (segment.isAsTsBaseline()) {
                return true;
            }
        }
        LOGGER.warn("Please set segment timestamp baseline in as.baseline field.");
        return false;
    }

    /**
     * Returns whether every segment names a connector contained in {@code connectorNames} and no
     * connector name is used by more than one segment; logs a warning otherwise.
     */
    private boolean isValidConnectorNames(List<JobDataSegment> segments, List<String> connectorNames) {
        assert segments != null;
        HashSet<String> usedNames = new HashSet<>();
        for (JobDataSegment segment : segments) {
            String dataConnectorName = segment.getDataConnectorName();
            usedNames.add(dataConnectorName);
            if (connectorNames.stream().noneMatch(known -> known.equals(dataConnectorName))) {
                LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", dataConnectorName, connectorNames);
                return false;
            }
        }
        // Fewer distinct names than segments means at least one name was repeated.
        if (usedNames.size() < segments.size()) {
            LOGGER.warn("Connector names in job data segment cannot duplicate.");
            return false;
        }
        return true;
    }

    /**
     * Collects the connector names of the measure's data sources.
     *
     * @return the distinct connector names in unspecified order, or an empty list (with a warning
     *     logged) if two data sources share a connector name
     */
    private List<String> getConnectorNames(GriffinMeasure griffinMeasure) {
        List<DataSource> dataSources = griffinMeasure.getDataSources();
        HashSet<String> names = new HashSet<>();
        for (DataSource dataSource : dataSources) {
            names.add(dataSource.getConnector().getName());
        }
        if (names.size() < dataSources.size()) {
            LOGGER.warn("Connector names cannot be repeated.");
            return Collections.emptyList();
        }
        return new ArrayList<>(names);
    }
}
