package cn.boboweike.carrot.server;

import cn.boboweike.carrot.SevereCarrotException;
import cn.boboweike.carrot.server.concurrent.ConcurrentTaskModificationResolver;
import cn.boboweike.carrot.server.concurrent.UnresolvableConcurrentTaskModificationException;
import cn.boboweike.carrot.server.dashboard.DashboardNotificationManager;
import cn.boboweike.carrot.server.strategy.WorkDistributionStrategy;
import cn.boboweike.carrot.storage.BackgroundTaskServerStatus;
import cn.boboweike.carrot.storage.ConcurrentTaskModificationException;
import cn.boboweike.carrot.storage.PageRequest;
import cn.boboweike.carrot.storage.PartitionedStorageProvider;
import cn.boboweike.carrot.tasks.RecurringTask;
import cn.boboweike.carrot.tasks.Task;
import cn.boboweike.carrot.tasks.filters.TaskFilterUtils;
import cn.boboweike.carrot.tasks.states.StateName;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/boboweike/carrot/server/TaskZooKeeper.class */
public class TaskZooKeeper implements Runnable {
    static final Logger LOGGER = LoggerFactory.getLogger(TaskZooKeeper.class);
    private final BackgroundTaskServer backgroundTaskServer;
    private final PartitionedStorageProvider storageProvider;
    private final DashboardNotificationManager dashboardNotificationManager;
    private final TaskFilterUtils taskFilterUtils;
    private final WorkDistributionStrategy workDistributionStrategy;
    private Instant runStartTime;
    private final List<RecurringTask> recurringTasks = new ArrayList();
    private final ConcurrentTaskModificationResolver concurrentTaskModificationResolver = createConcurrentTaskModificationResolver();
    private final Map<Task, Thread> currentlyProcessedTasks = new ConcurrentHashMap();
    private final Duration durationPollIntervalTimeBox = Duration.ofSeconds((long) (backgroundTaskServerStatus().getPollIntervalInSeconds() - (backgroundTaskServerStatus().getPollIntervalInSeconds() * 0.05d)));
    private final ReentrantLock reentrantLock = new ReentrantLock();
    private final AtomicInteger exceptionCount = new AtomicInteger();
    private final AtomicInteger occupiedWorkers = new AtomicInteger();

    public TaskZooKeeper(BackgroundTaskServer backgroundTaskServer) {
        this.backgroundTaskServer = backgroundTaskServer;
        this.storageProvider = backgroundTaskServer.getStorageProvider();
        this.workDistributionStrategy = backgroundTaskServer.getWorkDistributionStrategy();
        this.dashboardNotificationManager = backgroundTaskServer.getDashboardNotificationManager();
        this.taskFilterUtils = new TaskFilterUtils(backgroundTaskServer.getTaskFilters());
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.runStartTime = Instant.now();
            if (this.backgroundTaskServer.isUnAnnounced()) {
                return;
            }
            updateTasksThatAreBeingProcessed();
            runRoutineTasks();
            onboardNewWorkIfPossible();
        } catch (Exception e) {
            this.dashboardNotificationManager.handle(e);
            this.exceptionCount.getAndIncrement();
            LOGGER.warn("Carrot encountered a problematic exception. Please create a bug report (if possible, provide the code to reproduce this and the stacktrace) - Processing will continue, exceptionCount = {}", this.exceptionCount, e);
        }
    }

    void updateTasksThatAreBeingProcessed() {
        LOGGER.debug("Updating currently processed tasks... ");
        processTaskList(new ArrayList(this.currentlyProcessedTasks.keySet()), this::updateCurrentlyProcessingTask);
    }

    void runRoutineTasks() {
        checkForRecurringTasks();
        checkForScheduledTasks();
        checkForOrphanedTasks();
        checkForSucceededTasksThanCanGoToDeletedState();
        checkForTasksThatCanBeDeleted();
    }

    boolean canOnboardNewWork() {
        return backgroundTaskServerStatus().isRunning() && this.workDistributionStrategy.canOnboardNewWork();
    }

    void checkForRecurringTasks() {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for recurring tasks... ");
        processRecurringTasks(getRecurringTasks(partition));
    }

    void checkForScheduledTasks() {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for scheduled tasks... ");
        processTaskList(() -> {
            return this.storageProvider.getScheduledTasksByPartition(Instant.now().plusSeconds(backgroundTaskServerStatus().getPollIntervalInSeconds()), PageRequest.ascOnUpdatedAt(1000), partition);
        }, (v0) -> {
            v0.enqueue();
        });
    }

    void checkForOrphanedTasks() {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for orphan tasks... ");
        Instant minus = this.runStartTime.minus((TemporalAmount) Duration.ofSeconds(this.backgroundTaskServer.getServerStatus().getPollIntervalInSeconds()).multipliedBy(4L));
        processTaskList(() -> {
            return this.storageProvider.getTasksByPartition(StateName.PROCESSING, minus, PageRequest.ascOnUpdatedAt(1000), partition);
        }, task -> {
            task.failed("Orphaned task", new IllegalThreadStateException("Task was too long in PROCESSING state without being updated."));
        });
    }

    void checkForSucceededTasksThanCanGoToDeletedState() {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for succeeded tasks that can go to the deleted state... ");
        AtomicInteger atomicInteger = new AtomicInteger();
        Instant minus = Instant.now().minus((TemporalAmount) this.backgroundTaskServer.getServerStatus().getDeleteSucceededTasksAfter());
        processTaskList(() -> {
            return this.storageProvider.getTasksByPartition(StateName.SUCCEEDED, minus, PageRequest.ascOnUpdatedAt(1000), partition);
        }, task -> {
            atomicInteger.incrementAndGet();
            task.delete("Carrot maintenance - deleting succeeded task");
        });
        if (atomicInteger.get() > 0) {
            this.storageProvider.publishTotalAmountOfSucceededTasks(atomicInteger.get());
        }
    }

    void checkForTasksThatCanBeDeleted() {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Looking for deleted tasks that can be deleted permanently... ");
        this.storageProvider.deleteTasksPermanentlyByPartition(StateName.DELETED, Instant.now().minus((TemporalAmount) this.backgroundTaskServer.getServerStatus().getPermanentlyDeleteDeletedTasksAfter()), partition);
    }

    private Integer getPartition() {
        Integer partition = this.backgroundTaskServer.getPartition();
        return partition == null ? BackgroundTaskServer.NO_PARTITION : partition;
    }

    void onboardNewWorkIfPossible() {
        if (!pollIntervalInSecondsTimeBoxIsAboutToPass() && canOnboardNewWork()) {
            checkForEnqueuedTasks();
        }
    }

    void checkForEnqueuedTasks() {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        try {
            if (this.reentrantLock.tryLock()) {
                LOGGER.debug("Looking for enqueued tasks... ");
                PageRequest workPageRequest = this.workDistributionStrategy.getWorkPageRequest();
                if (workPageRequest.getLimit() > 0) {
                    List<Task> tasksByPartition = this.storageProvider.getTasksByPartition(StateName.ENQUEUED, workPageRequest, partition);
                    BackgroundTaskServer backgroundTaskServer = this.backgroundTaskServer;
                    Objects.requireNonNull(backgroundTaskServer);
                    tasksByPartition.forEach(backgroundTaskServer::processTask);
                }
            }
        } finally {
            if (this.reentrantLock.isHeldByCurrentThread()) {
                this.reentrantLock.unlock();
            }
        }
    }

    void processRecurringTasks(List<RecurringTask> list) {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION) {
            return;
        }
        LOGGER.debug("Found {} recurring tasks", Integer.valueOf(list.size()));
        List<Task> list2 = (List) list.stream().filter(recurringTask -> {
            return mustSchedule(recurringTask, partition);
        }).map((v0) -> {
            return v0.toScheduledTask();
        }).collect(Collectors.toList());
        if (list2.isEmpty()) {
            return;
        }
        this.storageProvider.saveByPartition(list2, partition);
    }

    boolean mustSchedule(RecurringTask recurringTask, Integer num) {
        return recurringTask.getNextRun().isBefore(Instant.now().plus((TemporalAmount) this.durationPollIntervalTimeBox).plusSeconds(1L)) && !this.storageProvider.recurringTaskExistsByPartition(recurringTask.getId(), num, StateName.SCHEDULED, StateName.ENQUEUED, StateName.PROCESSING);
    }

    void processTaskList(Supplier<List<Task>> supplier, Consumer<Task> consumer) {
        List<Task> tasksToProcess = getTasksToProcess(supplier);
        while (true) {
            List<Task> list = tasksToProcess;
            if (list.isEmpty()) {
                return;
            }
            processTaskList(list, consumer);
            tasksToProcess = getTasksToProcess(supplier);
        }
    }

    void processTaskList(List<Task> list, Consumer<Task> consumer) {
        Integer partition = getPartition();
        if (partition == BackgroundTaskServer.NO_PARTITION || list.isEmpty()) {
            return;
        }
        try {
            list.forEach(consumer);
            this.taskFilterUtils.runOnStateElectionFilter(list);
            this.storageProvider.saveByPartition(list, partition);
            this.taskFilterUtils.runOnStateAppliedFilters(list);
        } catch (ConcurrentTaskModificationException e) {
            try {
                this.concurrentTaskModificationResolver.resolve(e);
            } catch (UnresolvableConcurrentTaskModificationException e2) {
                throw new SevereCarrotException("Could not resolve ConcurrentTaskModificationException", e2);
            }
        }
    }

    BackgroundTaskServerStatus backgroundTaskServerStatus() {
        return this.backgroundTaskServer.getServerStatus();
    }

    public void startProcessing(Task task, Thread thread) {
        this.currentlyProcessedTasks.put(task, thread);
    }

    public void stopProcessing(Task task) {
        this.currentlyProcessedTasks.remove(task);
    }

    public Thread getThreadProcessingTask(Task task) {
        return this.currentlyProcessedTasks.get(task);
    }

    public int getOccupiedWorkerCount() {
        return this.occupiedWorkers.get();
    }

    public void notifyThreadOccupied() {
        this.occupiedWorkers.incrementAndGet();
    }

    public void notifyThreadIdle() {
        this.occupiedWorkers.decrementAndGet();
        if (this.workDistributionStrategy.canOnboardNewWork()) {
            checkForEnqueuedTasks();
        }
    }

    private List<Task> getTasksToProcess(Supplier<List<Task>> supplier) {
        return pollIntervalInSecondsTimeBoxIsAboutToPass() ? Collections.emptyList() : supplier.get();
    }

    private void updateCurrentlyProcessingTask(Task task) {
        try {
            task.updateProcessing();
        } catch (ClassCastException e) {
        }
    }

    private boolean pollIntervalInSecondsTimeBoxIsAboutToPass() {
        boolean z = Duration.between(this.runStartTime, Instant.now()).compareTo(this.durationPollIntervalTimeBox) >= 0;
        if (z) {
            LOGGER.debug("Carrot is passing the poll interval in seconds timebox because of too many tasks.");
        }
        return z;
    }

    private List<RecurringTask> getRecurringTasks(Integer num) {
        if (this.recurringTasks.size() != this.storageProvider.countRecurringTasksByPartition(num)) {
            this.recurringTasks.clear();
            this.recurringTasks.addAll(this.storageProvider.getRecurringTasksByPartition(num));
        }
        return this.recurringTasks;
    }

    ConcurrentTaskModificationResolver createConcurrentTaskModificationResolver() {
        return this.backgroundTaskServer.getConfiguration().concurrentTaskModificationPolicy.toConcurrentTaskModificationResolver(this.storageProvider, this);
    }
}
