package org.apache.griffin.core.job;

import java.util.Iterator;
import javax.annotation.PostConstruct;
import org.apache.griffin.core.exception.GriffinException;
import org.apache.griffin.core.exception.GriffinExceptionMessage;
import org.apache.griffin.core.job.entity.AbstractJob;
import org.apache.griffin.core.job.entity.JobHealth;
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.JobState;
import org.apache.griffin.core.job.entity.LivySessionStates;
import org.apache.griffin.core.job.entity.StreamingJob;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
import org.apache.griffin.core.job.repo.StreamingJobRepo;
import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.util.YarnNetUtil;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestClientException;

@Service
/* loaded from: input_file:org/apache/griffin/core/job/StreamingJobOperatorImpl.class */
public class StreamingJobOperatorImpl implements JobOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJobOperatorImpl.class);

    @Autowired
    private StreamingJobRepo streamingJobRepo;

    @Autowired
    private Environment env;

    @Autowired
    private JobServiceImpl jobService;

    @Autowired
    private JobInstanceRepo instanceRepo;

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean factory;

    @Autowired
    private LivyTaskSubmitHelper livyTaskSubmitHelper;
    private String livyUri;

    @PostConstruct
    public void init() {
        this.livyUri = this.env.getProperty("livy.uri");
    }

    @Override // org.apache.griffin.core.job.JobOperator
    @Transactional(rollbackFor = {Exception.class})
    public AbstractJob add(AbstractJob abstractJob, GriffinMeasure griffinMeasure) throws Exception {
        validateParams(abstractJob);
        String quartzName = this.jobService.getQuartzName(abstractJob);
        String quartzGroup = this.jobService.getQuartzGroup();
        TriggerKey triggerKeyIfValid = this.jobService.getTriggerKeyIfValid(quartzName, quartzGroup);
        StreamingJob streamingJob = (StreamingJob) this.streamingJobRepo.save(genStreamingJobBean(abstractJob, quartzName, quartzGroup));
        this.jobService.addJob(triggerKeyIfValid, streamingJob, GriffinMeasure.ProcessType.STREAMING);
        return streamingJob;
    }

    private StreamingJob genStreamingJobBean(AbstractJob abstractJob, String str, String str2) {
        StreamingJob streamingJob = (StreamingJob) abstractJob;
        streamingJob.setMetricName(abstractJob.getJobName());
        streamingJob.setGroup(str2);
        streamingJob.setName(str);
        return streamingJob;
    }

    @Override // org.apache.griffin.core.job.JobOperator
    @Transactional(rollbackFor = {Exception.class})
    public void start(AbstractJob abstractJob) throws Exception {
        StreamingJob streamingJob = (StreamingJob) abstractJob;
        verifyJobState(streamingJob);
        StreamingJob streamingJob2 = (StreamingJob) this.streamingJobRepo.save(streamingJob);
        this.jobService.addJob(TriggerKey.triggerKey(this.jobService.getQuartzName(abstractJob), this.jobService.getQuartzGroup()), streamingJob2, GriffinMeasure.ProcessType.STREAMING);
    }

    private void verifyJobState(AbstractJob abstractJob) throws SchedulerException {
        if (!CollectionUtils.isEmpty(this.jobService.getTriggers(abstractJob.getName(), abstractJob.getGroup()))) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.STREAMING_JOB_IS_RUNNING);
        }
        this.instanceRepo.findByJobId(abstractJob.getId()).stream().filter(jobInstanceBean -> {
            return !jobInstanceBean.isDeleted();
        }).forEach(jobInstanceBean2 -> {
            if (!getStartStatus(LivySessionStates.convert2QuartzState(jobInstanceBean2.getState()))) {
                throw new GriffinException.BadRequestException(GriffinExceptionMessage.STREAMING_JOB_IS_RUNNING);
            }
            jobInstanceBean2.setDeleted(true);
        });
    }

    @Override // org.apache.griffin.core.job.JobOperator
    @Transactional(rollbackFor = {Exception.class})
    public void stop(AbstractJob abstractJob) throws SchedulerException {
        stop((StreamingJob) abstractJob, false);
    }

    @Override // org.apache.griffin.core.job.JobOperator
    public void delete(AbstractJob abstractJob) throws SchedulerException {
        stop((StreamingJob) abstractJob, true);
    }

    @Override // org.apache.griffin.core.job.JobOperator
    public JobHealth getHealth(JobHealth jobHealth, AbstractJob abstractJob) {
        jobHealth.setJobCount(jobHealth.getJobCount() + 1);
        if (this.jobService.isJobHealthy(abstractJob.getId()).booleanValue()) {
            jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1);
        }
        return jobHealth;
    }

    @Override // org.apache.griffin.core.job.JobOperator
    public JobState getState(AbstractJob abstractJob, String str) {
        JobState jobState = new JobState();
        Iterator<JobInstanceBean> it = this.instanceRepo.findByJobId(abstractJob.getId()).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            JobInstanceBean next = it.next();
            LivySessionStates.State state = next.getState();
            if (!next.isDeleted() && state != null) {
                String convert2QuartzState = LivySessionStates.convert2QuartzState(state);
                jobState.setState(convert2QuartzState);
                jobState.setToStart(getStartStatus(convert2QuartzState));
                jobState.setToStop(getStopStatus(convert2QuartzState));
                break;
            }
        }
        setStateIfNull(str, jobState);
        return jobState;
    }

    private void setStateIfNull(String str, JobState jobState) {
        if (jobState.getState() == null && "start".equals(str)) {
            jobState.setState("NORMAL");
            jobState.setToStop(true);
        } else if (jobState.getState() == null || "stop".equals(str)) {
            jobState.setState("NONE");
            jobState.setToStart(true);
        }
    }

    private boolean getStartStatus(String str) {
        return ("NORMAL".equals(str) || "BLOCKED".equals(str)) ? false : true;
    }

    private boolean getStopStatus(String str) {
        return ("COMPLETE".equals(str) || "ERROR".equals(str)) ? false : true;
    }

    private void deleteByLivy(JobInstanceBean jobInstanceBean) {
        if (jobInstanceBean.getSessionId() == null) {
            LOGGER.warn("Session id of instance({},{}) is null.", jobInstanceBean.getPredicateGroup(), jobInstanceBean.getPredicateName());
            return;
        }
        String str = this.livyUri + "/" + jobInstanceBean.getSessionId();
        try {
            this.livyTaskSubmitHelper.deleteByLivy(str);
            LOGGER.info("Job instance({}) has been deleted. {}", jobInstanceBean.getSessionId(), str);
        } catch (RestClientException e) {
            LOGGER.warn("sessionId({}) appId({}) {}.", new Object[]{jobInstanceBean.getSessionId(), jobInstanceBean.getAppId(), e.getMessage()});
            YarnNetUtil.delete(this.env.getProperty("yarn.uri"), jobInstanceBean.getAppId());
        } catch (ResourceAccessException e2) {
            LOGGER.error("Your url may be wrong. Please check {}.\n {}", this.livyUri, e2.getMessage());
        }
    }

    private void stop(StreamingJob streamingJob, boolean z) throws SchedulerException {
        pauseJob(streamingJob);
        this.instanceRepo.findByJobId(streamingJob.getId()).stream().filter(jobInstanceBean -> {
            return !jobInstanceBean.isDeleted();
        }).forEach(jobInstanceBean2 -> {
            if (getStopStatus(LivySessionStates.convert2QuartzState(jobInstanceBean2.getState()))) {
                deleteByLivy(jobInstanceBean2);
            }
            jobInstanceBean2.setState(LivySessionStates.State.STOPPED);
            jobInstanceBean2.setDeleted(true);
        });
        streamingJob.setDeleted(z);
        this.streamingJobRepo.save(streamingJob);
    }

    private void pauseJob(StreamingJob streamingJob) throws SchedulerException {
        String name = streamingJob.getName();
        String group = streamingJob.getGroup();
        if (CollectionUtils.isEmpty(this.jobService.getTriggers(name, group))) {
            return;
        }
        this.factory.getScheduler().pauseJob(JobKey.jobKey(name, group));
    }

    private void validateParams(AbstractJob abstractJob) {
        if (!this.jobService.isValidJobName(abstractJob.getJobName())) {
            throw new GriffinException.BadRequestException(GriffinExceptionMessage.INVALID_JOB_NAME);
        }
    }
}
