package co.cask.cdap.scheduler;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.AlreadyExistsException;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.ConflictException;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.common.service.RetryOnStartFailureService;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.TxCallable;
import co.cask.cdap.internal.app.runtime.schedule.ProgramSchedule;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleRecord;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleStatus;
import co.cask.cdap.internal.app.runtime.schedule.SchedulerException;
import co.cask.cdap.internal.app.runtime.schedule.SchedulerService;
import co.cask.cdap.internal.app.runtime.schedule.queue.Job;
import co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.ProgramScheduleStoreDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.Schedulers;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ScheduleId;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService.class */
public class CoreSchedulerService extends AbstractIdleService implements Scheduler {
    private static final Logger LOG = LoggerFactory.getLogger(CoreSchedulerService.class);
    private final Transactional transactional;
    private final Service internalService;
    private final DatasetFramework datasetFramework;
    private final SchedulerService scheduler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService$StoreAndQueueTxRunnable.class */
    public interface StoreAndQueueTxRunnable<V, T extends Throwable> {
        V run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset) throws Throwable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService$StoreTxRunnable.class */
    public interface StoreTxRunnable<V, T extends Throwable> {
        V run(ProgramScheduleStoreDataset programScheduleStoreDataset) throws Throwable;
    }

    @Inject
    CoreSchedulerService(TransactionSystemClient transactionSystemClient, final DatasetFramework datasetFramework, final SchedulerService schedulerService, final NotificationSubscriberService notificationSubscriberService, final ConstraintCheckerService constraintCheckerService, final NamespaceQueryAdmin namespaceQueryAdmin, final Store store) {
        this.datasetFramework = datasetFramework;
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), transactionSystemClient, Schedulers.STORE_DATASET_ID.getParent(), Collections.emptyMap(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[0])), RetryStrategies.retryOnConflict(10, 100L));
        this.scheduler = schedulerService;
        this.internalService = new RetryOnStartFailureService(new Supplier<Service>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.1
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Service m333get() {
                return new AbstractIdleService() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.1.1
                    protected void startUp() throws Exception {
                        if (!datasetFramework.hasInstance(Schedulers.STORE_DATASET_ID)) {
                            datasetFramework.addInstance(Schedulers.STORE_TYPE_NAME, Schedulers.STORE_DATASET_ID, DatasetProperties.EMPTY);
                        }
                        schedulerService.startAndWait();
                        CoreSchedulerService.this.migrateSchedules(namespaceQueryAdmin, store);
                        CoreSchedulerService.this.cleanupJobs();
                        constraintCheckerService.startAndWait();
                        notificationSubscriberService.startAndWait();
                    }

                    protected void shutDown() throws Exception {
                        notificationSubscriberService.stopAndWait();
                        constraintCheckerService.stopAndWait();
                        schedulerService.stopAndWait();
                    }
                };
            }
        }, co.cask.cdap.common.service.RetryStrategies.exponentialDelay(200L, 5000L, TimeUnit.MILLISECONDS));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupJobs() {
        try {
            this.transactional.execute(new TxRunnable() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.2
                public void run(DatasetContext datasetContext) throws Exception {
                    JobQueueDataset jobQueue = Schedulers.getJobQueue(datasetContext, CoreSchedulerService.this.datasetFramework);
                    CloseableIterator<Job> fullScan = jobQueue.fullScan();
                    Throwable th = null;
                    try {
                        try {
                            CoreSchedulerService.LOG.info("Cleaning up jobs in state {}.", Job.State.PENDING_LAUNCH);
                            while (fullScan.hasNext()) {
                                Job job = (Job) fullScan.next();
                                if (job.getState() == Job.State.PENDING_LAUNCH) {
                                    CoreSchedulerService.LOG.warn("Removing job because it was left in state {} from a previous run of the scheduler: {} .", Job.State.PENDING_LAUNCH, job);
                                    jobQueue.deleteJob(job);
                                }
                            }
                            if (fullScan != null) {
                                if (0 == 0) {
                                    fullScan.close();
                                    return;
                                }
                                try {
                                    fullScan.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        if (fullScan != null) {
                            if (th != null) {
                                try {
                                    fullScan.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                fullScan.close();
                            }
                        }
                        throw th4;
                    }
                }
            });
        } catch (TransactionFailureException e) {
            LOG.warn("Failed to cleanup jobs upon startup.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void migrateSchedules(NamespaceQueryAdmin namespaceQueryAdmin, final Store store) throws Exception {
        List list = namespaceQueryAdmin.list();
        if (((Boolean) execute(new StoreTxRunnable<Boolean, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public Boolean run(ProgramScheduleStoreDataset programScheduleStoreDataset) {
                return Boolean.valueOf(programScheduleStoreDataset.isMigrationComplete());
            }
        }, RuntimeException.class)).booleanValue()) {
            LOG.debug("Schedule migration has already been completed for all namespaces. Skip migration.");
            return;
        }
        Collections.sort(list, new Comparator<NamespaceMeta>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.4
            @Override // java.util.Comparator
            public int compare(NamespaceMeta namespaceMeta, NamespaceMeta namespaceMeta2) {
                return namespaceMeta.getNamespaceId().toString().compareTo(namespaceMeta2.getNamespaceId().toString());
            }
        });
        String str = null;
        Iterator it = list.iterator();
        while (it.hasNext()) {
            final NamespaceId namespaceId = ((NamespaceMeta) it.next()).getNamespaceId();
            if (str == null || str.compareTo(namespaceId.toString()) <= 0) {
                LOG.info("Starting schedule migration for namespace '{}'", namespaceId);
                str = (String) execute(new StoreTxRunnable<String, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.5
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
                    public String run(ProgramScheduleStoreDataset programScheduleStoreDataset) {
                        return programScheduleStoreDataset.migrateFromAppMetadataStore(namespaceId, store, CoreSchedulerService.this.scheduler);
                    }
                }, RuntimeException.class);
            } else {
                LOG.debug("Skip migrating schedules in namespace '{}', since namespace with lexicographical order is smaller than the last migration completed namespace '{}' should already be migrated.", namespaceId, str);
            }
        }
        execute(new StoreTxRunnable<Void, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.6
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset) {
                programScheduleStoreDataset.setMigrationComplete();
                return null;
            }
        }, RuntimeException.class);
        LOG.info("Schedule migration is completed for all namespaces.");
    }

    protected void startUp() throws Exception {
        this.internalService.startAndWait();
    }

    protected void shutDown() throws Exception {
        this.internalService.stopAndWait();
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void addSchedule(ProgramSchedule programSchedule) throws AlreadyExistsException, BadRequestException {
        addSchedules(Collections.singleton(programSchedule));
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void addSchedules(final Iterable<? extends ProgramSchedule> iterable) throws AlreadyExistsException, BadRequestException {
        for (ProgramSchedule programSchedule : iterable) {
            if (!programSchedule.getProgramId().getType().equals(ProgramType.WORKFLOW)) {
                throw new BadRequestException(String.format("Cannot schedule program %s of type %s: Only workflows can be scheduled", programSchedule.getProgramId().getProgram(), programSchedule.getProgramId().getType()));
            }
        }
        execute(new StoreTxRunnable<Void, AlreadyExistsException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.7
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset) throws AlreadyExistsException {
                programScheduleStoreDataset.addSchedules(iterable);
                for (ProgramSchedule programSchedule2 : iterable) {
                    try {
                        CoreSchedulerService.this.scheduler.addProgramSchedule(programSchedule2);
                    } catch (SchedulerException e) {
                        CoreSchedulerService.LOG.error("Exception occurs when adding schedule {}", programSchedule2, e);
                        throw new RuntimeException(e);
                    }
                }
                return null;
            }
        }, AlreadyExistsException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void updateSchedule(ProgramSchedule programSchedule) throws NotFoundException, BadRequestException {
        ProgramScheduleStatus scheduleStatus = getScheduleStatus(programSchedule.getScheduleId());
        deleteSchedule(programSchedule.getScheduleId());
        try {
            addSchedule(programSchedule);
            if (ProgramScheduleStatus.SCHEDULED == scheduleStatus) {
                try {
                    enableSchedule(programSchedule.getScheduleId());
                } catch (ConflictException e) {
                    throw new IllegalStateException("Schedule '" + programSchedule.getScheduleId() + "' already enabled despite just being added.");
                }
            }
        } catch (AlreadyExistsException e2) {
            throw new IllegalStateException("Schedule '" + programSchedule.getScheduleId() + "' already exists despite just being deleted.");
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void enableSchedule(final ScheduleId scheduleId) throws NotFoundException, ConflictException {
        try {
            execute(new StoreTxRunnable<Void, Exception>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.8
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
                public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset) throws NotFoundException, ConflictException, SchedulerException {
                    ProgramScheduleRecord scheduleRecord = programScheduleStoreDataset.getScheduleRecord(scheduleId);
                    if (ProgramScheduleStatus.SUSPENDED != scheduleRecord.getMeta().getStatus()) {
                        throw new ConflictException("Schedule '" + scheduleId + "' is already enabled");
                    }
                    CoreSchedulerService.this.scheduler.resumeProgramSchedule(scheduleRecord.getSchedule());
                    programScheduleStoreDataset.updateScheduleStatus(scheduleId, ProgramScheduleStatus.SCHEDULED);
                    return null;
                }
            }, Exception.class);
        } catch (SchedulerException e) {
            throw new RuntimeException("Exception occurs when enabling schedule " + scheduleId, e);
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        } catch (NotFoundException | ConflictException e3) {
            throw e3;
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void disableSchedule(final ScheduleId scheduleId) throws NotFoundException, ConflictException {
        try {
            execute(new StoreAndQueueTxRunnable<Void, Exception>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.9
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreAndQueueTxRunnable
                public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset) throws NotFoundException, ConflictException, SchedulerException {
                    ProgramScheduleRecord scheduleRecord = programScheduleStoreDataset.getScheduleRecord(scheduleId);
                    if (ProgramScheduleStatus.SCHEDULED != scheduleRecord.getMeta().getStatus()) {
                        throw new ConflictException("Schedule '" + scheduleId + "' is already disabled");
                    }
                    CoreSchedulerService.this.scheduler.suspendProgramSchedule(scheduleRecord.getSchedule());
                    programScheduleStoreDataset.updateScheduleStatus(scheduleId, ProgramScheduleStatus.SUSPENDED);
                    jobQueueDataset.markJobsForDeletion(scheduleId, System.currentTimeMillis());
                    return null;
                }
            }, Exception.class);
        } catch (SchedulerException e) {
            throw new RuntimeException("Exception occurs when enabling schedule " + scheduleId, e);
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        } catch (NotFoundException | ConflictException e3) {
            throw e3;
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedule(ScheduleId scheduleId) throws NotFoundException {
        deleteSchedules(Collections.singleton(scheduleId));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteSchedulesInScheduler(List<ProgramSchedule> list) {
        for (ProgramSchedule programSchedule : list) {
            try {
                this.scheduler.deleteProgramSchedule(programSchedule);
            } catch (Exception e) {
                LOG.error("Exception occurs when deleting schedule {}", programSchedule, e);
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteScheduleInScheduler(ProgramSchedule programSchedule) throws NotFoundException {
        try {
            this.scheduler.deleteProgramSchedule(programSchedule);
        } catch (SchedulerException e) {
            LOG.error("Exception occurs when deleting schedule {}", programSchedule, e);
            throw new RuntimeException(e);
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedules(final Iterable<? extends ScheduleId> iterable) throws NotFoundException {
        execute(new StoreAndQueueTxRunnable<Void, NotFoundException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.10
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreAndQueueTxRunnable
            public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset) throws NotFoundException {
                long currentTimeMillis = System.currentTimeMillis();
                for (ScheduleId scheduleId : iterable) {
                    CoreSchedulerService.this.deleteScheduleInScheduler(programScheduleStoreDataset.getSchedule(scheduleId));
                    jobQueueDataset.markJobsForDeletion(scheduleId, currentTimeMillis);
                }
                programScheduleStoreDataset.deleteSchedules(iterable);
                return null;
            }
        }, NotFoundException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedules(final ApplicationId applicationId) {
        execute(new StoreAndQueueTxRunnable<Void, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.11
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreAndQueueTxRunnable
            public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset) {
                long currentTimeMillis = System.currentTimeMillis();
                CoreSchedulerService.this.deleteSchedulesInScheduler(programScheduleStoreDataset.listSchedules(applicationId));
                Iterator<ScheduleId> it = programScheduleStoreDataset.deleteSchedules(applicationId).iterator();
                while (it.hasNext()) {
                    jobQueueDataset.markJobsForDeletion(it.next(), currentTimeMillis);
                }
                return null;
            }
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedules(final ProgramId programId) {
        execute(new StoreAndQueueTxRunnable<Void, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.12
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreAndQueueTxRunnable
            public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset) {
                long currentTimeMillis = System.currentTimeMillis();
                CoreSchedulerService.this.deleteSchedulesInScheduler(programScheduleStoreDataset.listSchedules(programId));
                Iterator<ScheduleId> it = programScheduleStoreDataset.deleteSchedules(programId).iterator();
                while (it.hasNext()) {
                    jobQueueDataset.markJobsForDeletion(it.next(), currentTimeMillis);
                }
                return null;
            }
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public ProgramSchedule getSchedule(final ScheduleId scheduleId) throws NotFoundException {
        return (ProgramSchedule) execute(new StoreTxRunnable<ProgramSchedule, NotFoundException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.13
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public ProgramSchedule run(ProgramScheduleStoreDataset programScheduleStoreDataset) throws NotFoundException {
                return programScheduleStoreDataset.getSchedule(scheduleId);
            }
        }, NotFoundException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public ProgramScheduleStatus getScheduleStatus(final ScheduleId scheduleId) throws NotFoundException {
        return (ProgramScheduleStatus) execute(new StoreTxRunnable<ProgramScheduleStatus, NotFoundException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.14
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public ProgramScheduleStatus run(ProgramScheduleStoreDataset programScheduleStoreDataset) throws NotFoundException {
                return programScheduleStoreDataset.getScheduleRecord(scheduleId).getMeta().getStatus();
            }
        }, NotFoundException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public List<ProgramSchedule> listSchedules(final ApplicationId applicationId) {
        return (List) execute(new StoreTxRunnable<List<ProgramSchedule>, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.15
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public List<ProgramSchedule> run(ProgramScheduleStoreDataset programScheduleStoreDataset) {
                return programScheduleStoreDataset.listSchedules(applicationId);
            }
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public List<ProgramSchedule> listSchedules(final ProgramId programId) {
        return (List) execute(new StoreTxRunnable<List<ProgramSchedule>, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.16
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public List<ProgramSchedule> run(ProgramScheduleStoreDataset programScheduleStoreDataset) {
                return programScheduleStoreDataset.listSchedules(programId);
            }
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public Collection<ProgramScheduleRecord> findSchedules(final String str) {
        return (Collection) execute(new StoreTxRunnable<Collection<ProgramScheduleRecord>, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.17
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreTxRunnable
            public Collection<ProgramScheduleRecord> run(ProgramScheduleStoreDataset programScheduleStoreDataset) {
                return programScheduleStoreDataset.findSchedules(str);
            }
        }, RuntimeException.class);
    }

    private <V, T extends Exception> V execute(final StoreTxRunnable<V, T> storeTxRunnable, Class<? extends T> cls) throws Exception {
        try {
            return (V) Transactions.execute(this.transactional, new TxCallable<V>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.18
                public V call(DatasetContext datasetContext) throws Exception {
                    return (V) storeTxRunnable.run(Schedulers.getScheduleStore(datasetContext, CoreSchedulerService.this.datasetFramework));
                }
            });
        } catch (TransactionFailureException e) {
            throw ((Exception) Transactions.propagate(e, cls));
        }
    }

    private <V, T extends Exception> V execute(final StoreAndQueueTxRunnable<V, T> storeAndQueueTxRunnable, Class<? extends T> cls) throws Exception {
        try {
            return (V) Transactions.execute(this.transactional, new TxCallable<V>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.19
                public V call(DatasetContext datasetContext) throws Exception {
                    return (V) storeAndQueueTxRunnable.run(Schedulers.getScheduleStore(datasetContext, CoreSchedulerService.this.datasetFramework), Schedulers.getJobQueue(datasetContext, CoreSchedulerService.this.datasetFramework));
                }
            });
        } catch (TransactionFailureException e) {
            throw ((Exception) Transactions.propagate(e, cls));
        }
    }
}
