package co.cask.cdap.scheduler;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessageFetcher;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.TxCallable;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.messaging.MultiThreadMessagingContext;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleRecord;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleStatus;
import co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.Schedulers;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ScheduleId;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/scheduler/NotificationSubscriberService.class */
public class NotificationSubscriberService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationSubscriberService.class);
    private static final Logger SAMPLING_LOG = Loggers.sampling(LOG, LogSamplers.limitRate(10000));
    private static final Gson GSON = new Gson();
    private final Transactional transactional;
    private final MultiThreadMessagingContext messagingContext;
    private final DatasetFramework datasetFramework;
    private final MultiThreadDatasetCache multiThreadDatasetCache;
    private final CConfiguration cConf;
    private ListeningExecutorService taskExecutorService;
    private volatile boolean stopping = false;

    /* loaded from: input_file:co/cask/cdap/scheduler/NotificationSubscriberService$DataEventNotificationSubscriberThread.class */
    private class DataEventNotificationSubscriberThread extends NotificationSubscriberThread {
        DataEventNotificationSubscriberThread() {
            super(NotificationSubscriberService.this.cConf.get("data.event.topic"));
        }

        @Override // co.cask.cdap.scheduler.NotificationSubscriberService.NotificationSubscriberThread
        protected void updateJobQueue(DatasetContext datasetContext, Notification notification) throws IOException, DatasetManagementException {
            String str = (String) notification.getProperties().get("datasetId");
            if (str == null) {
                return;
            }
            for (ProgramScheduleRecord programScheduleRecord : NotificationSubscriberService.this.getSchedules(datasetContext, Schedulers.triggerKeyForPartition(DatasetId.fromString(str)))) {
                if (ProgramScheduleStatus.SCHEDULED.equals(programScheduleRecord.getMeta().getStatus())) {
                    this.jobQueue.addNotification(programScheduleRecord, notification);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/scheduler/NotificationSubscriberService$NotificationSubscriberThread.class */
    public abstract class NotificationSubscriberThread implements Runnable {
        private final String topic;
        private final RetryStrategy scheduleStrategy = RetryStrategies.exponentialDelay(100, Schedulers.SUBSCRIBER_TX_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        private int failureCount;
        private String messageId;
        JobQueueDataset jobQueue;

        NotificationSubscriberThread(String str) {
            this.topic = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.jobQueue = Schedulers.getJobQueue(NotificationSubscriberService.this.multiThreadDatasetCache, NotificationSubscriberService.this.datasetFramework);
            this.messageId = loadMessageId();
            while (!NotificationSubscriberService.this.stopping) {
                try {
                    long processNotifications = processNotifications();
                    if (processNotifications > 0) {
                        TimeUnit.MILLISECONDS.sleep(processNotifications);
                    }
                } catch (InterruptedException e) {
                }
            }
        }

        private String loadMessageId() {
            try {
                return (String) Transactions.execute(NotificationSubscriberService.this.transactional, new TxCallable<String>() { // from class: co.cask.cdap.scheduler.NotificationSubscriberService.NotificationSubscriberThread.1
                    /* renamed from: call, reason: merged with bridge method [inline-methods] */
                    public String m335call(DatasetContext datasetContext) throws Exception {
                        return NotificationSubscriberThread.this.jobQueue.retrieveSubscriberState(NotificationSubscriberThread.this.topic);
                    }
                });
            } catch (TransactionFailureException e) {
                throw Throwables.propagate(e);
            }
        }

        private long processNotifications() {
            boolean z = false;
            try {
                final MessageFetcher messageFetcher = NotificationSubscriberService.this.messagingContext.getMessageFetcher();
                z = ((Boolean) Transactions.execute(NotificationSubscriberService.this.transactional, new TxCallable<Boolean>() { // from class: co.cask.cdap.scheduler.NotificationSubscriberService.NotificationSubscriberThread.2
                    /* renamed from: call, reason: merged with bridge method [inline-methods] */
                    public Boolean m336call(DatasetContext datasetContext) throws Exception {
                        return Boolean.valueOf(NotificationSubscriberThread.this.fetchAndProcessNotifications(datasetContext, messageFetcher));
                    }
                })).booleanValue();
                this.failureCount = 0;
            } catch (Exception e) {
                NotificationSubscriberService.LOG.warn("Failed to get and process notifications. Will retry in next run", e);
                this.failureCount++;
            }
            return this.failureCount > 0 ? this.scheduleStrategy.nextRetry(this.failureCount, 0L) : z ? 2000L : 0L;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean fetchAndProcessNotifications(DatasetContext datasetContext, MessageFetcher messageFetcher) throws Exception {
            CloseableIterator fetch;
            Throwable th;
            boolean z = true;
            try {
                fetch = messageFetcher.fetch(NamespaceId.SYSTEM.getNamespace(), this.topic, 100, this.messageId);
                th = null;
            } catch (ServiceUnavailableException | TopicNotFoundException e) {
                NotificationSubscriberService.SAMPLING_LOG.warn("Failed to fetch from TMS. Will retry later.", e);
                this.failureCount++;
            }
            try {
                try {
                    NotificationSubscriberService.LOG.trace("Fetch with messageId = {}", this.messageId);
                    while (fetch.hasNext() && !NotificationSubscriberService.this.stopping) {
                        z = false;
                        Message message = (Message) fetch.next();
                        try {
                            updateJobQueue(datasetContext, (Notification) NotificationSubscriberService.GSON.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), Notification.class));
                            this.messageId = message.getId();
                        } catch (JsonSyntaxException e2) {
                            NotificationSubscriberService.LOG.warn("Failed to decode message with id {}. Skipped. ", message.getId(), e2);
                            this.messageId = message.getId();
                        }
                    }
                    if (!z) {
                        this.jobQueue.persistSubscriberState(this.topic, this.messageId);
                    }
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    return z;
                } finally {
                }
            } finally {
            }
        }

        abstract void updateJobQueue(DatasetContext datasetContext, Notification notification) throws Exception;
    }

    /* loaded from: input_file:co/cask/cdap/scheduler/NotificationSubscriberService$SchedulerEventNotificationSubscriberThread.class */
    private class SchedulerEventNotificationSubscriberThread extends NotificationSubscriberThread {
        SchedulerEventNotificationSubscriberThread(String str) {
            super(str);
        }

        @Override // co.cask.cdap.scheduler.NotificationSubscriberService.NotificationSubscriberThread
        protected void updateJobQueue(DatasetContext datasetContext, Notification notification) throws IOException, DatasetManagementException, NotFoundException {
            Map properties = notification.getProperties();
            String str = (String) properties.get(ProgramOptionConstants.SCHEDULE_ID);
            if (str == null) {
                NotificationSubscriberService.LOG.warn("Cannot find schedule id in the notification with properties {}. Skipping current notification.", properties);
                return;
            }
            ScheduleId fromString = ScheduleId.fromString(str);
            try {
                this.jobQueue.addNotification(Schedulers.getScheduleStore(datasetContext, NotificationSubscriberService.this.datasetFramework).getScheduleRecord(fromString), notification);
            } catch (NotFoundException e) {
                NotificationSubscriberService.LOG.warn("Cannot find schedule {}. Skipping current notification with properties {}.", new Object[]{fromString, properties, e});
            }
        }
    }

    @Inject
    NotificationSubscriberService(MessagingService messagingService, CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient) {
        this.cConf = cConfiguration;
        this.messagingContext = new MultiThreadMessagingContext(messagingService);
        this.multiThreadDatasetCache = new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), transactionSystemClient, NamespaceId.SYSTEM, ImmutableMap.of(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[]{this.messagingContext});
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(this.multiThreadDatasetCache, 30), org.apache.tephra.RetryStrategies.retryOnConflict(20, 100L));
        this.datasetFramework = datasetFramework;
    }

    protected void startUp() {
        LOG.info("Start running NotificationSubscriberService");
        this.taskExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("scheduler-subscriber-task").build()));
        this.taskExecutorService.submit(new SchedulerEventNotificationSubscriberThread(this.cConf.get("time.event.topic")));
        this.taskExecutorService.submit(new SchedulerEventNotificationSubscriberThread(this.cConf.get("stream.size.event.topic")));
        this.taskExecutorService.submit(new DataEventNotificationSubscriberThread());
    }

    protected void shutDown() {
        this.stopping = true;
        LOG.info("Stopping NotificationSubscriberService.");
        try {
            try {
                this.taskExecutorService.awaitTermination(5L, TimeUnit.SECONDS);
                if (!this.taskExecutorService.isTerminated()) {
                    this.taskExecutorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (!this.taskExecutorService.isTerminated()) {
                    this.taskExecutorService.shutdownNow();
                }
            }
            LOG.info("Stopped NotificationSubscriberService.");
        } catch (Throwable th) {
            if (!this.taskExecutorService.isTerminated()) {
                this.taskExecutorService.shutdownNow();
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Collection<ProgramScheduleRecord> getSchedules(DatasetContext datasetContext, String str) throws IOException, DatasetManagementException {
        return Schedulers.getScheduleStore(datasetContext, this.datasetFramework).findSchedules(str);
    }
}
