package org.apache.griffin.core.job;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.config.EnvConfig;
import org.apache.griffin.core.event.GriffinEventManager;
import org.apache.griffin.core.event.JobEvent;
import org.apache.griffin.core.exception.GriffinException;
import org.apache.griffin.core.exception.GriffinExceptionMessage;
import org.apache.griffin.core.job.entity.AbstractJob;
import org.apache.griffin.core.job.entity.BatchJob;
import org.apache.griffin.core.job.entity.JobHealth;
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.JobState;
import org.apache.griffin.core.job.entity.JobType;
import org.apache.griffin.core.job.entity.LivySessionStates;
import org.apache.griffin.core.job.entity.StreamingJob;
import org.apache.griffin.core.job.repo.BatchJobRepo;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
import org.apache.griffin.core.job.repo.JobRepo;
import org.apache.griffin.core.job.repo.StreamingJobRepo;
import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
import org.apache.griffin.core.util.JsonUtil;
import org.apache.griffin.core.util.YarnNetUtil;
import org.json.JSONArray;
import org.json.JSONObject;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.ResourceAccessException;

@Service
/* loaded from: input_file:org/apache/griffin/core/job/JobServiceImpl.class */
public class JobServiceImpl implements JobService {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
    public static final String GRIFFIN_JOB_ID = "griffinJobId";
    private static final int MAX_PAGE_SIZE = 1024;
    private static final int DEFAULT_PAGE_SIZE = 10;
    static final String START = "start";
    static final String STOP = "stop";

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean factory;

    @Autowired
    private JobInstanceRepo instanceRepo;

    @Autowired
    private Environment env;

    @Autowired
    private GriffinMeasureRepo measureRepo;

    @Autowired
    private BatchJobRepo batchJobRepo;

    @Autowired
    private StreamingJobRepo streamingJobRepo;

    @Autowired
    private JobRepo<AbstractJob> jobRepo;

    @Autowired
    private BatchJobOperatorImpl batchJobOp;

    @Autowired
    private StreamingJobOperatorImpl streamingJobOp;

    @Autowired
    private GriffinEventManager eventManager;

    @Autowired
    private LivyTaskSubmitHelper livyTaskSubmitHelper;

    /**
     * Lists the non-deleted jobs of the requested type with their job state populated.
     *
     * @param str job type name; the batch and streaming type names select the matching
     *            repository, any other value (including {@code null}) selects all jobs
     * @return the selected jobs, each with its job state set
     */
    @Override // org.apache.griffin.core.job.JobService
    public List<AbstractJob> getAliveJobs(String str) {
        List<? extends AbstractJob> aliveJobs;
        if (JobType.BATCH.getName().equals(str)) {
            aliveJobs = this.batchJobRepo.findByDeleted(false);
        } else if (JobType.STREAMING.getName().equals(str)) {
            aliveJobs = this.streamingJobRepo.findByDeleted(false);
        } else {
            aliveJobs = this.jobRepo.findByDeleted(false);
        }
        return getJobDataBeans(aliveJobs);
    }

    /**
     * Computes and stores the job state of every given job and collects the jobs
     * into a new list, preserving iteration order.
     *
     * @param list jobs to decorate with their state
     * @return a new list holding the same job objects
     * @throws GriffinException.ServiceException if the state lookup raises a
     *         {@link SchedulerException}
     */
    private List<AbstractJob> getJobDataBeans(List<? extends AbstractJob> list) {
        List<AbstractJob> decorated = new ArrayList<>();
        try {
            for (AbstractJob job : list) {
                job.setJobState(genJobState(job));
                decorated.add(job);
            }
        } catch (SchedulerException e) {
            LOGGER.error("Failed to get RUNNING jobs.", e);
            throw new GriffinException.ServiceException("Failed to get RUNNING jobs.", e);
        }
        return decorated;
    }

    /**
     * Creates a job for a valid measure, notifying event listeners before and after
     * the creation.
     *
     * @param abstractJob job to create; its measure id must reference a non-deleted measure
     * @return the job returned by the operator that handles the measure's process type
     * @throws Exception propagated from the job operator
     */
    @Override // org.apache.griffin.core.job.JobService
    public AbstractJob addJob(AbstractJob abstractJob) throws Exception {
        this.eventManager.notifyListeners(JobEvent.yieldJobEventBeforeCreation(abstractJob));

        GriffinMeasure measure = getMeasureIfValid(abstractJob.getMeasureId());
        JobOperator operator = getJobOperator(measure.getProcessType());
        AbstractJob createdJob = operator.add(abstractJob, measure);

        this.eventManager.notifyListeners(JobEvent.yieldJobEventAfterCreation(createdJob));
        return createdJob;
    }

    /**
     * Loads the non-deleted job with the given id.
     *
     * @param l job id
     * @return the matching job
     * @throws GriffinException.NotFoundException if no non-deleted job has this id
     */
    @Override // org.apache.griffin.core.job.JobService
    public AbstractJob getJobConfig(Long l) {
        AbstractJob job = this.jobRepo.findByIdAndDeleted(l, false);
        if (job == null) {
            LOGGER.warn("Job id {} does not exist.", l);
            throw new GriffinException.NotFoundException(GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST);
        }
        return job;
    }

    /**
     * Applies an action to an existing job and refreshes its job state.
     *
     * @param l   id of the job to act on
     * @param str action name, dispatched by {@code doAction}
     * @return the job with its job state recomputed for the action
     * @throws Exception propagated from the job operator
     */
    @Override // org.apache.griffin.core.job.JobService
    public AbstractJob onAction(Long l, String str) throws Exception {
        AbstractJob job = this.jobRepo.findByIdAndDeleted(l, false);
        validateJobExist(job);

        JobOperator operator = getJobOperator(job);
        doAction(str, job, operator);

        JobState stateAfterAction = genJobState(job, str);
        job.setJobState(stateAfterAction);
        return job;
    }

    /**
     * Dispatches a job action to the given operator.
     *
     * @param str         action name; {@link #START} starts the job, {@link #STOP} stops it
     * @param abstractJob job the action applies to
     * @param jobOperator operator that performs the action
     * @throws GriffinException.NotFoundException if the action is neither start nor stop
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws Exception propagated from the operator
     */
    private void doAction(String str, AbstractJob abstractJob, JobOperator jobOperator) throws Exception {
        // Plain string switch: the decompiled hashCode/boolean two-step form is not valid Java.
        switch (str) {
            case START:
                jobOperator.start(abstractJob);
                break;
            case STOP:
                jobOperator.stop(abstractJob);
                break;
            default:
                throw new GriffinException.NotFoundException(GriffinExceptionMessage.NO_SUCH_JOB_ACTION);
        }
    }

    /**
     * Deletes the non-deleted job with the given id, notifying event listeners
     * before and after the removal.
     *
     * @param l job id
     * @throws GriffinException.NotFoundException if the job does not exist
     * @throws SchedulerException propagated from the job operator
     */
    @Override // org.apache.griffin.core.job.JobService
    public void deleteJob(Long l) throws SchedulerException {
        AbstractJob job = this.jobRepo.findByIdAndDeleted(l, false);
        validateJobExist(job);

        this.eventManager.notifyListeners(JobEvent.yieldJobEventBeforeRemoval(job));
        JobOperator operator = getJobOperator(job);
        operator.delete(job);
        this.eventManager.notifyListeners(JobEvent.yieldJobEventAfterRemoval(job));
    }

    /**
     * Deletes every non-deleted job with the given name, notifying event listeners
     * before and after each removal.
     *
     * @param str job name
     * @throws GriffinException.NotFoundException if no non-deleted job has this name
     * @throws SchedulerException propagated from the job operator
     */
    @Override // org.apache.griffin.core.job.JobService
    public void deleteJob(String str) throws SchedulerException {
        List<AbstractJob> namedJobs = this.jobRepo.findByJobNameAndDeleted(str, false);
        boolean nothingToDelete = CollectionUtils.isEmpty(namedJobs);
        if (nothingToDelete) {
            LOGGER.warn("There is no job with '{}' name.", str);
            throw new GriffinException.NotFoundException(GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST);
        }

        for (AbstractJob job : namedJobs) {
            this.eventManager.notifyListeners(JobEvent.yieldJobEventBeforeRemoval(job));
            JobOperator operator = getJobOperator(job);
            operator.delete(job);
            this.eventManager.notifyListeners(JobEvent.yieldJobEventAfterRemoval(job));
        }
    }

    /**
     * Returns one page of a job's instances, ordered by {@code tms} descending, after
     * refreshing the state of instances that are unknown or still active.
     *
     * @param l  job id
     * @param i  page index
     * @param i2 requested page size; capped at {@code MAX_PAGE_SIZE}, and replaced by
     *           {@code DEFAULT_PAGE_SIZE} when zero or negative
     * @return the instances on the requested page
     * @throws GriffinException.NotFoundException if no non-deleted job has this id
     */
    @Override // org.apache.griffin.core.job.JobService
    public List<JobInstanceBean> findInstancesOfJob(Long l, int i, int i2) {
        AbstractJob job = this.jobRepo.findByIdAndDeleted(l, false);
        if (job == null) {
            LOGGER.warn("Job id {} does not exist.", l);
            throw new GriffinException.NotFoundException(GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST);
        }

        int pageSize = Math.min(i2, MAX_PAGE_SIZE);
        if (pageSize <= 0) {
            pageSize = DEFAULT_PAGE_SIZE;
        }
        PageRequest page = new PageRequest(i, pageSize, Sort.Direction.DESC, "tms");
        List<JobInstanceBean> instances = this.instanceRepo.findByJobId(l, page);
        return updateState(instances);
    }

    /**
     * Loads a job instance by its instance id.
     *
     * @param l instance id
     * @return the matching instance
     * @throws GriffinException.NotFoundException if no instance has this id
     */
    @Override // org.apache.griffin.core.job.JobService
    public JobInstanceBean findInstance(Long l) {
        JobInstanceBean instance = this.instanceRepo.findByInstanceId(l);
        if (instance == null) {
            LOGGER.warn("Instance id {} does not exist.", l);
            throw new GriffinException.NotFoundException(GriffinExceptionMessage.INSTANCE_ID_DOES_NOT_EXIST);
        }
        return instance;
    }

    /**
     * Re-synchronises every instance whose state is UNKNOWN or still active.
     *
     * @param list instances to inspect
     * @return the same list that was passed in
     */
    private List<JobInstanceBean> updateState(List<JobInstanceBean> list) {
        for (JobInstanceBean instance : list) {
            LivySessionStates.State current = instance.getState();
            // Skip instances that are neither unknown nor active.
            if (!current.equals(LivySessionStates.State.UNKNOWN) && !LivySessionStates.isActive(current)) {
                continue;
            }
            syncInstancesOfJob(instance);
        }
        return list;
    }

    /**
     * Looks up job instances by trigger key.
     *
     * @param str trigger key to search for
     * @return whatever the instance repository returns for this key
     */
    @Override // org.apache.griffin.core.job.JobService
    public List<JobInstanceBean> findInstancesByTriggerKey(String str) {
        List<JobInstanceBean> triggered = this.instanceRepo.findByTriggerKey(str);
        return triggered;
    }

    /**
     * Aggregates health information over all non-deleted jobs by folding each job
     * through its operator's {@code getHealth}.
     *
     * @return the accumulated job health
     * @throws GriffinException.ServiceException if an operator raises a
     *         {@link SchedulerException}
     */
    @Override // org.apache.griffin.core.job.JobService
    public JobHealth getHealthInfo() {
        JobHealth health = new JobHealth();
        List<AbstractJob> aliveJobs = this.jobRepo.findByDeleted(false);
        try {
            for (AbstractJob job : aliveJobs) {
                JobOperator operator = getJobOperator(job);
                health = operator.getHealth(health, job);
            }
        } catch (SchedulerException e) {
            LOGGER.error("Job schedule exception. {}", e);
            throw new GriffinException.ServiceException("Fail to Get HealthInfo", e);
        }
        return health;
    }

    /**
     * Scheduled clean-up of expired job instances. Instances whose expiry timestamp
     * is at or before the current time are first passed to the batch operator's
     * {@code pauseJobInstances}; only when that succeeds are they deleted.
     */
    @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
    public void deleteExpiredJobInstance() {
        // One timestamp for both the lookup and the delete so they cover the same rows.
        Long now = System.currentTimeMillis();
        boolean paused = this.batchJobOp.pauseJobInstances(this.instanceRepo.findByExpireTmsLessThanEqual(now));
        if (paused) {
            int deletedCount = this.instanceRepo.deleteByExpireTimestamp(now);
            LOGGER.info("Delete {} expired job instances.", deletedCount);
            return;
        }
        LOGGER.error("Pause job failure.");
    }

    /**
     * Ensures a job lookup produced a result.
     *
     * @param abstractJob job returned by a repository lookup, possibly {@code null}
     * @throws GriffinException.NotFoundException if {@code abstractJob} is {@code null}
     */
    private void validateJobExist(AbstractJob abstractJob) {
        if (abstractJob != null) {
            return;
        }
        LOGGER.warn("Griffin job does not exist.");
        throw new GriffinException.NotFoundException(GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST);
    }

    /**
     * Selects the operator matching the concrete job class.
     *
     * @param abstractJob job to find an operator for
     * @return the batch operator for a {@link BatchJob}, the streaming operator for a
     *         {@link StreamingJob}
     * @throws GriffinException.BadRequestException for any other job (including {@code null})
     */
    private JobOperator getJobOperator(AbstractJob abstractJob) {
        final JobOperator operator;
        if (abstractJob instanceof BatchJob) {
            operator = this.batchJobOp;
        } else if (abstractJob instanceof StreamingJob) {
            operator = this.streamingJobOp;
        } else {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.JOB_TYPE_DOES_NOT_SUPPORT);
        }
        return operator;
    }

    /**
     * Selects the operator matching a measure's process type.
     *
     * @param processType process type of the measure
     * @return the batch operator for BATCH, the streaming operator for STREAMING
     * @throws GriffinException.BadRequestException for any other value (including {@code null})
     */
    private JobOperator getJobOperator(GriffinMeasure.ProcessType processType) {
        boolean batch = processType == GriffinMeasure.ProcessType.BATCH;
        boolean streaming = processType == GriffinMeasure.ProcessType.STREAMING;
        if (!batch && !streaming) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT);
        }
        return batch ? this.batchJobOp : this.streamingJobOp;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a trigger key and verifies that the scheduler does not already know it.
     *
     * @param str  trigger name
     * @param str2 trigger group
     * @return the new trigger key
     * @throws GriffinException.ConflictException if the scheduler already has this key
     * @throws SchedulerException propagated from the scheduler
     */
    public TriggerKey getTriggerKeyIfValid(String str, String str2) throws SchedulerException {
        TriggerKey candidate = TriggerKey.triggerKey(str, str2);
        boolean alreadyScheduled = this.factory.getScheduler().checkExists(candidate);
        if (!alreadyScheduled) {
            return candidate;
        }
        throw new GriffinException.ConflictException(GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Fetches the triggers attached to a scheduler job.
     *
     * @param str  job name
     * @param str2 job group
     * @return the job's triggers, or {@code null} when either argument is {@code null}
     * @throws SchedulerException propagated from the scheduler
     */
    public List<? extends Trigger> getTriggers(String str, String str2) throws SchedulerException {
        boolean keyComplete = str != null && str2 != null;
        if (!keyComplete) {
            return null;
        }
        Scheduler scheduler = this.factory.getScheduler();
        JobKey jobKey = new JobKey(str, str2);
        return scheduler.getTriggersOfJob(jobKey);
    }

    /**
     * Asks the job's operator for its state in the context of an action and stores
     * that state on the job.
     *
     * @param abstractJob job whose state is computed and updated
     * @param str         action name forwarded to the operator; may be {@code null}
     * @return the state that was stored on the job
     * @throws SchedulerException propagated from the operator
     */
    private JobState genJobState(AbstractJob abstractJob, String str) throws SchedulerException {
        JobOperator operator = getJobOperator(abstractJob);
        JobState computed = operator.getState(abstractJob, str);
        abstractJob.setJobState(computed);
        return computed;
    }

    /**
     * Computes and stores the job's state without an action, by delegating to the
     * two-argument overload with a {@code null} action.
     *
     * @param abstractJob job whose state is computed and updated
     * @return the state that was stored on the job
     * @throws SchedulerException propagated from the operator
     */
    private JobState genJobState(AbstractJob abstractJob) throws SchedulerException {
        final String noAction = null;
        return genJobState(abstractJob, noAction);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers the job detail for the trigger key with the scheduler and schedules
     * a trigger built for the given process type.
     *
     * @param triggerKey  key used for both the trigger and the job detail
     * @param abstractJob job being scheduled
     * @param processType selects the kind of trigger that is built
     * @throws Exception propagated from the scheduler
     */
    public void addJob(TriggerKey triggerKey, AbstractJob abstractJob, GriffinMeasure.ProcessType processType) throws Exception {
        Scheduler scheduler = this.factory.getScheduler();
        JobDetail detail = addJobDetail(triggerKey, abstractJob);
        Trigger trigger = genTriggerInstance(triggerKey, detail, abstractJob, processType);
        scheduler.scheduleJob(trigger);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Derives a scheduler name from the job name and the current time.
     *
     * @param abstractJob job to name
     * @return the job name, an underscore, and the current epoch milliseconds
     */
    public String getQuartzName(AbstractJob abstractJob) {
        String jobName = abstractJob.getJobName();
        long createdAt = System.currentTimeMillis();
        return jobName + "_" + createdAt;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the fixed scheduler group name used by this service.
     *
     * @return the literal {@code "BA"}
     */
    public String getQuartzGroup() {
        final String group = "BA";
        return group;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Checks that a job name is non-empty and not used by any non-deleted job.
     *
     * @param str candidate job name
     * @return {@code true} when the name is usable; {@code false} (with a warning
     *         logged) when it is empty or already taken
     */
    public boolean isValidJobName(String str) {
        if (StringUtils.isEmpty(str)) {
            LOGGER.warn("Job name cannot be empty.");
            return false;
        }
        if (this.jobRepo.countByJobNameAndDeleted(str, false) > 0) {
            // Message typo fixed: it previously read "already exits".
            LOGGER.warn("Job name already exists.");
            return false;
        }
        return true;
    }

    /**
     * Loads the non-deleted Griffin measure with the given id.
     *
     * @param l measure id
     * @return the matching measure
     * @throws GriffinException.BadRequestException if the lookup finds nothing
     */
    private GriffinMeasure getMeasureIfValid(Long l) {
        GriffinMeasure measure = this.measureRepo.findByIdAndDeleted(l, false);
        if (measure == null) {
            LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist or is external measure type.", l);
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.INVALID_MEASURE_ID);
        }
        return measure;
    }

    /**
     * Builds the trigger for a job detail according to the process type: a cron
     * trigger in the job's time zone for BATCH, or a trigger that starts now with a
     * repeat count of zero for STREAMING.
     *
     * @param triggerKey  identity of the trigger
     * @param jobDetail   job detail the trigger fires
     * @param abstractJob supplies the cron expression and time zone for batch jobs
     * @param processType BATCH or STREAMING
     * @return the built trigger
     * @throws GriffinException.BadRequestException for any other process type
     */
    private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jobDetail, AbstractJob abstractJob, GriffinMeasure.ProcessType processType) {
        TriggerBuilder<Trigger> builder = TriggerBuilder.newTrigger().withIdentity(triggerKey).forJob(jobDetail);

        if (processType == GriffinMeasure.ProcessType.BATCH) {
            CronScheduleBuilder cron = CronScheduleBuilder.cronSchedule(abstractJob.getCronExpression());
            TimeZone zone = TimeZone.getTimeZone(abstractJob.getTimeZone());
            return builder.withSchedule(cron.inTimeZone(zone)).build();
        }

        if (processType == GriffinMeasure.ProcessType.STREAMING) {
            SimpleScheduleBuilder fireOnce = SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0);
            return builder.startNow().withSchedule(fireOnce).build();
        }

        throw new GriffinException.BadRequestException(GriffinExceptionMessage.JOB_TYPE_DOES_NOT_SUPPORT);
    }

    /**
     * Creates or reuses the scheduler job detail keyed by the trigger's name and
     * group, stores the Griffin job id in its data map, and adds it to the scheduler
     * (replacing the stored detail when one already existed).
     *
     * @param triggerKey  supplies the name and group of the job key
     * @param abstractJob job whose id is recorded in the job data map
     * @return the job detail that was added
     * @throws SchedulerException propagated from the scheduler
     */
    private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob abstractJob) throws SchedulerException {
        Scheduler scheduler = this.factory.getScheduler();
        JobKey jobKey = JobKey.jobKey(triggerKey.getName(), triggerKey.getGroup());
        boolean alreadyRegistered = scheduler.checkExists(jobKey);

        JobDetail detail;
        if (alreadyRegistered) {
            detail = scheduler.getJobDetail(jobKey);
        } else {
            detail = JobBuilder.newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build();
        }

        setJobDataMap(detail, abstractJob);
        // The second argument is the "replace" flag: only replace when a detail existed.
        scheduler.addJob(detail, alreadyRegistered);
        return detail;
    }

    /**
     * Records the Griffin job id, as a string, in the job detail's data map under
     * {@link #GRIFFIN_JOB_ID}.
     *
     * @param jobDetail   job detail whose data map is updated
     * @param abstractJob job providing the id; the id must not be {@code null}
     */
    private void setJobDataMap(JobDetail jobDetail, AbstractJob abstractJob) {
        String griffinJobId = abstractJob.getId().toString();
        jobDetail.getJobDataMap().put(GRIFFIN_JOB_ID, griffinJobId);
    }

    /**
     * Deletes every non-deleted job attached to a measure. Logs and returns when the
     * measure has no such jobs.
     *
     * @param l measure id
     * @throws SchedulerException propagated from the job operator
     */
    public void deleteJobsRelateToMeasure(Long l) throws SchedulerException {
        List<AbstractJob> relatedJobs = this.jobRepo.findByMeasureIdAndDeleted(l, false);
        if (!CollectionUtils.isEmpty(relatedJobs)) {
            for (AbstractJob job : relatedJobs) {
                JobOperator operator = getJobOperator(job);
                operator.delete(job);
            }
            return;
        }
        LOGGER.info("Measure id {} has no related jobs.", l);
    }

    /**
     * Scheduled task that re-synchronises every job instance currently in one of the
     * active states queried below.
     */
    @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}")
    public void syncInstancesOfAllJobs() {
        LivySessionStates.State[] activeStates = {
            LivySessionStates.State.STARTING,
            LivySessionStates.State.NOT_STARTED,
            LivySessionStates.State.RECOVERING,
            LivySessionStates.State.IDLE,
            LivySessionStates.State.RUNNING,
            LivySessionStates.State.BUSY
        };
        for (JobInstanceBean instance : this.instanceRepo.findByActiveState(activeStates)) {
            syncInstancesOfJob(instance);
        }
    }

    /**
     * Refreshes one job instance from Livy. Instances without a session id are
     * skipped. The session is fetched from {@code <livy.uri>/<sessionId>} and the
     * parsed response is applied via {@code setJobInstanceIdAndUri}.
     *
     * <p>Failures are logged and never propagated: a resource-access failure is
     * reported as a probable wrong URL, an HTTP client error falls back to the YARN
     * based state update and decreases the current task count, and anything else is
     * logged by message.
     *
     * @param jobInstanceBean instance to synchronise
     */
    private void syncInstancesOfJob(JobInstanceBean jobInstanceBean) {
        if (jobInstanceBean.getSessionId() == null) {
            return;
        }
        String str = this.env.getProperty("livy.uri") + "/" + jobInstanceBean.getSessionId();
        TypeReference<HashMap<String, Object>> typeReference = new TypeReference<HashMap<String, Object>>() {
        };
        try {
            String fromLivy = this.livyTaskSubmitHelper.getFromLivy(str);
            LOGGER.info(fromLivy);
            setJobInstanceIdAndUri(jobInstanceBean, (HashMap) JsonUtil.toEntity(fromLivy, typeReference));
        } catch (ResourceAccessException e) {
            // Must precede catch (Exception): a subclass clause placed after its
            // superclass is unreachable and rejected by the compiler.
            LOGGER.error("Your url may be wrong. Please check {}.\n {}", str, e.getMessage());
        } catch (HttpClientErrorException e) {
            LOGGER.warn("sessionId({}) appId({}) {}.", jobInstanceBean.getSessionId(), jobInstanceBean.getAppId(), e.getMessage());
            setStateByYarn(jobInstanceBean, e);
            this.livyTaskSubmitHelper.decreaseCurTaskNum(jobInstanceBean.getId());
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
    }

    /**
     * Handles a Livy HTTP client error for an instance. If {@code checkStatus}
     * already resolved the instance nothing more happens; otherwise, for a 400 or 404
     * response on an instance that has an application id, the state is refreshed
     * from YARN.
     *
     * @param jobInstanceBean          instance whose Livy lookup failed
     * @param httpClientErrorException the error returned by Livy
     */
    private void setStateByYarn(JobInstanceBean jobInstanceBean, HttpClientErrorException httpClientErrorException) {
        boolean alreadyResolved = checkStatus(jobInstanceBean, httpClientErrorException);
        if (alreadyResolved) {
            return;
        }
        int status = httpClientErrorException.getStatusCode().value();
        boolean sessionUnavailable = status == 400 || status == 404;
        if (sessionUnavailable && jobInstanceBean.getAppId() != null) {
            setStateByYarn(jobInstanceBean);
        }
    }

    /**
     * Marks an instance as DEAD and deleted when Livy answered 404, the instance has
     * no application id, and the response body contains the session id (or
     * {@code -1} when the session id is {@code null}).
     *
     * @param jobInstanceBean          instance whose Livy lookup failed
     * @param httpClientErrorException the error returned by Livy
     * @return {@code true} if the instance was marked dead and saved, {@code false} otherwise
     */
    private boolean checkStatus(JobInstanceBean jobInstanceBean, HttpClientErrorException httpClientErrorException) {
        int status = httpClientErrorException.getStatusCode().value();
        String appId = jobInstanceBean.getAppId();
        String body = httpClientErrorException.getResponseBodyAsString();
        Long sessionId = jobInstanceBean.getSessionId();
        long idToMatch = sessionId == null ? -1L : sessionId.longValue();

        boolean sessionGoneWithoutApp = status == 404
                && appId == null
                && body != null
                && body.contains(String.valueOf(idToMatch));
        if (!sessionGoneWithoutApp) {
            return false;
        }

        jobInstanceBean.setState(LivySessionStates.State.DEAD);
        jobInstanceBean.setDeleted(true);
        this.instanceRepo.save(jobInstanceBean);
        return true;
    }

    /**
     * Refreshes an instance's state via {@code YarnNetUtil.update} using the
     * {@code yarn.uri} property. When that update reports failure, the instance is
     * set to UNKNOWN, unless it is already UNKNOWN, in which case nothing is saved.
     * In every other case the instance is saved.
     *
     * @param jobInstanceBean instance to refresh
     */
    private void setStateByYarn(JobInstanceBean jobInstanceBean) {
        LOGGER.warn("Spark session {} may be overdue! Now we use yarn to update state.", jobInstanceBean.getSessionId());
        boolean updatedFromYarn = YarnNetUtil.update(this.env.getProperty("yarn.uri"), jobInstanceBean);
        if (!updatedFromYarn) {
            boolean alreadyUnknown = jobInstanceBean.getState().equals(LivySessionStates.State.UNKNOWN);
            if (alreadyUnknown) {
                // Nothing changed, so skip the save.
                return;
            }
            jobInstanceBean.setState(LivySessionStates.State.UNKNOWN);
        }
        this.instanceRepo.save(jobInstanceBean);
    }

    /**
     * Applies a parsed Livy session response to an instance and saves it. The
     * {@code "state"} and {@code "appId"} entries are read; when {@code appId} is
     * present the application URI is built as {@code <yarn.uri>/cluster/app/<appId>},
     * otherwise app id and URI are cleared. A missing state is stored as {@code null}.
     * When the resulting state is SUCCESS or DEAD the current task count is decreased.
     *
     * @param jobInstanceBean instance to update
     * @param hashMap         parsed Livy response; nothing happens when {@code null}
     * @throws IllegalArgumentException if the state text matches no
     *         {@code LivySessionStates.State} constant
     */
    private void setJobInstanceIdAndUri(JobInstanceBean jobInstanceBean, HashMap<String, Object> hashMap) {
        if (hashMap == null) {
            return;
        }
        Object rawState = hashMap.get("state");
        Object rawAppId = hashMap.get("appId");

        // Locale.ROOT keeps the upper-casing stable regardless of the JVM default
        // locale (e.g. Turkish dotted/dotless i), so valueOf sees the constant name.
        jobInstanceBean.setState(rawState == null
                ? null
                : LivySessionStates.State.valueOf(rawState.toString().toUpperCase(java.util.Locale.ROOT)));
        jobInstanceBean.setAppId(rawAppId == null ? null : rawAppId.toString());
        jobInstanceBean.setAppUri(rawAppId == null ? null : this.env.getProperty("yarn.uri") + "/cluster/app/" + rawAppId);
        this.instanceRepo.save(jobInstanceBean);

        // Identity comparison is null-safe: a response without "state" leaves the
        // state null, and calling equals on it would throw a NullPointerException.
        LivySessionStates.State current = jobInstanceBean.getState();
        if (current == LivySessionStates.State.SUCCESS || current == LivySessionStates.State.DEAD) {
            // NOTE(review): syncInstancesOfJob passes getId() to decreaseCurTaskNum while
            // this call passes getSessionId() - confirm which identifier the helper expects.
            this.livyTaskSubmitHelper.decreaseCurTaskNum(jobInstanceBean.getSessionId());
        }
    }

    /**
     * Reports whether a job's most recent instance (by {@code tms} descending) is in
     * a healthy state.
     *
     * @param l job id
     * @return {@code false} when the job has no instances, otherwise the result of
     *         {@code LivySessionStates.isHealthy} for the latest instance's state
     */
    public Boolean isJobHealthy(Long l) {
        PageRequest latestOnly = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
        List<JobInstanceBean> latest = this.instanceRepo.findByJobId(l, latestOnly);
        if (CollectionUtils.isEmpty(latest)) {
            return Boolean.FALSE;
        }
        boolean healthy = LivySessionStates.isHealthy(latest.get(0).getState());
        return healthy;
    }

    /**
     * Builds the HDFS sink path for a job run as {@code <sinksPath>/<jobName>/<timestamp>}.
     * The sinks path comes from the batch environment configuration when the first
     * matching job's type is "batch" (case-insensitively), otherwise from the
     * streaming one.
     *
     * @param str job name
     * @param j   timestamp appended as the last path segment
     * @return the path, or {@code null} when no non-deleted job has this name
     */
    @Override // org.apache.griffin.core.job.JobService
    public String getJobHdfsSinksPath(String str, long j) {
        List<AbstractJob> namedJobs = this.jobRepo.findByJobNameAndDeleted(str, false);
        if (namedJobs.isEmpty()) {
            return null;
        }
        boolean batch = namedJobs.get(0).getType().toLowerCase().equals("batch");
        String envConfig = batch ? EnvConfig.ENV_BATCH : EnvConfig.ENV_STREAMING;
        return getSinksPath(envConfig) + "/" + str + "/" + j;
    }

    /**
     * Extracts the {@code config.path} of the first sink of type "hdfs"
     * (case-insensitive) from an environment configuration given as JSON text.
     *
     * @param str JSON document expected to contain a {@code "sinks"} array
     * @return the HDFS sink path, or {@code null} when no such sink exists or the
     *         document cannot be processed (the failure is logged)
     */
    private String getSinksPath(String str) {
        try {
            JSONArray sinks = new JSONObject(str).getJSONArray("sinks");
            int sinkCount = sinks.length();
            for (int index = 0; index < sinkCount; index++) {
                JSONObject sink = sinks.getJSONObject(index);
                Object sinkType = sink.get("type");
                if (!(sinkType instanceof String)) {
                    continue;
                }
                if ("hdfs".equalsIgnoreCase((String) sinkType)) {
                    return sink.getJSONObject("config").getString("path");
                }
            }
            return null;
        } catch (Exception e) {
            LOGGER.error("Fail to get Persist path from {}", str, e);
            return null;
        }
    }

    /**
     * Fires a job immediately by scheduling a start-now trigger for its scheduler job.
     *
     * @param l job id
     * @return the key of the new trigger, as a string
     * @throws GriffinException.NotFoundException if the job does not exist or the
     *         scheduler has no job under its name and group
     * @throws SchedulerException propagated from the scheduler
     */
    @Override // org.apache.griffin.core.job.JobService
    public String triggerJobById(Long l) throws SchedulerException {
        AbstractJob job = this.jobRepo.findByIdAndDeleted(l, false);
        validateJobExist(job);

        Scheduler scheduler = this.factory.getScheduler();
        JobKey jobKey = JobKey.jobKey(job.getName(), job.getGroup());
        if (scheduler.checkExists(jobKey)) {
            Trigger immediate = TriggerBuilder.newTrigger().forJob(jobKey).startNow().build();
            scheduler.scheduleJob(immediate);
            return immediate.getKey().toString();
        }
        throw new GriffinException.NotFoundException(GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST);
    }
}
