package com.facebook.presto.raptor.metadata;

import com.facebook.presto.raptor.RaptorErrorCode;
import com.facebook.presto.raptor.backup.BackupStore;
import com.facebook.presto.raptor.storage.StorageService;
import com.facebook.presto.raptor.util.DaoSupplier;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import java.io.File;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

/* loaded from: input_file:com/facebook/presto/raptor/metadata/ShardCleaner.class */
/**
 * Schedules the periodic shard cleanup jobs for this node.
 *
 * <p>On a coordinator, {@link #start()} schedules transaction cleanup and, when a
 * {@link BackupStore} is present, backup shard cleanup. Local shard cleanup is scheduled
 * on every node. Each job catches and logs any {@link Throwable} so that a failure does
 * not cancel the recurring schedule.
 */
public class ShardCleaner {
    private static final Logger log = Logger.get(ShardCleaner.class);

    private final ShardDao dao;
    private final String currentNode;
    private final boolean coordinator;
    private final StorageService storageService;
    private final Optional<BackupStore> backupStore;
    private final Duration maxTransactionAge;
    private final Duration transactionCleanerInterval;
    private final Duration localCleanerInterval;
    private final Duration localCleanTime;
    private final Duration localPurgeTime;
    private final Duration backupCleanerInterval;
    private final Duration backupCleanTime;
    private final Duration backupPurgeTime;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService backupExecutor;
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Injection constructor: derives the node identity and coordinator flag from the
     * {@link NodeManager} and reads all timing settings from the config.
     */
    @Inject
    public ShardCleaner(
            DaoSupplier<ShardDao> daoSupplier,
            NodeManager nodeManager,
            StorageService storageService,
            Optional<BackupStore> backupStore,
            ShardCleanerConfig config) {
        this(
                daoSupplier,
                nodeManager.getCurrentNode().getNodeIdentifier(),
                nodeManager.getCoordinators().contains(nodeManager.getCurrentNode()),
                storageService,
                backupStore,
                config.getMaxTransactionAge(),
                config.getTransactionCleanerInterval(),
                config.getLocalCleanerInterval(),
                config.getLocalCleanTime(),
                config.getLocalPurgeTime(),
                config.getBackupCleanerInterval(),
                config.getBackupCleanTime(),
                config.getBackupPurgeTime(),
                config.getBackupDeletionThreads());
    }

    /**
     * Creates a cleaner with explicit settings.
     *
     * @param daoSupplier supplies the on-demand {@link ShardDao}
     * @param currentNode identifier of this node, used for all node-scoped DAO queries
     * @param coordinator whether this node runs the coordinator-only jobs
     * @param storageService resolves a shard UUID to its local storage file
     * @param backupStore backup store to delete shards from, if any
     * @param maxTransactionAge age used to compute the cutoff for aborting transactions
     * @param transactionCleanerInterval delay between transaction cleanup runs
     * @param localCleanerInterval delay between local cleanup runs (also bounds the jitter)
     * @param localCleanTime age used to compute the cutoff for cleaning local shards
     * @param localPurgeTime age used to compute the cutoff for purging local shards
     * @param backupCleanerInterval delay between backup cleanup runs
     * @param backupCleanTime age used to compute the cutoff for cleaning backup shards
     * @param backupPurgeTime age used to compute the cutoff for purging backup shards
     * @param backupDeletionThreads size of the thread pool used for backup deletions
     * @throws NullPointerException if any reference argument is null
     */
    public ShardCleaner(
            DaoSupplier<ShardDao> daoSupplier,
            String currentNode,
            boolean coordinator,
            StorageService storageService,
            Optional<BackupStore> backupStore,
            Duration maxTransactionAge,
            Duration transactionCleanerInterval,
            Duration localCleanerInterval,
            Duration localCleanTime,
            Duration localPurgeTime,
            Duration backupCleanerInterval,
            Duration backupCleanTime,
            Duration backupPurgeTime,
            int backupDeletionThreads) {
        this.dao = Objects.requireNonNull(daoSupplier, "daoSupplier is null").onDemand();
        this.currentNode = Objects.requireNonNull(currentNode, "currentNode is null");
        this.coordinator = coordinator;
        this.storageService = Objects.requireNonNull(storageService, "storageService is null");
        this.backupStore = Objects.requireNonNull(backupStore, "backupStore is null");
        this.maxTransactionAge = Objects.requireNonNull(maxTransactionAge, "maxTransactionAge is null");
        this.transactionCleanerInterval = Objects.requireNonNull(transactionCleanerInterval, "transactionCleanerInterval is null");
        this.localCleanerInterval = Objects.requireNonNull(localCleanerInterval, "localCleanerInterval is null");
        this.localCleanTime = Objects.requireNonNull(localCleanTime, "localCleanTime is null");
        this.localPurgeTime = Objects.requireNonNull(localPurgeTime, "localPurgeTime is null");
        this.backupCleanerInterval = Objects.requireNonNull(backupCleanerInterval, "backupCleanerInterval is null");
        this.backupCleanTime = Objects.requireNonNull(backupCleanTime, "backupCleanTime is null");
        this.backupPurgeTime = Objects.requireNonNull(backupPurgeTime, "backupPurgeTime is null");
        // Executors are created last so that a failed argument check does not leave threads behind.
        this.scheduler = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed("shard-cleaner-%s"));
        this.backupExecutor = Executors.newFixedThreadPool(backupDeletionThreads, Threads.daemonThreadsNamed("shard-cleaner-backup-%s"));
    }

    /**
     * Schedules the cleanup jobs. Only the first call has any effect.
     */
    @PostConstruct
    public void start() {
        if (started.compareAndSet(false, true)) {
            startJobs();
        }
    }

    /**
     * Stops both executors immediately, interrupting any running job.
     */
    @PreDestroy
    public void shutdown() {
        scheduler.shutdownNow();
        backupExecutor.shutdownNow();
    }

    /**
     * Schedules the jobs applicable to this node: transaction and backup cleanup only on a
     * coordinator (backup only when a backup store is present), local cleanup always.
     */
    private void startJobs() {
        if (coordinator) {
            startTransactionCleanup();
            if (backupStore.isPresent()) {
                startBackupCleanup();
            }
        }
        startLocalCleanup();
    }

    private void startTransactionCleanup() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                abortOldTransactions();
                deleteOldShards();
            } catch (Throwable t) {
                // Swallow after logging: an escaping exception would cancel future runs.
                log.error(t, "Error cleaning transactions");
            }
        }, 0L, transactionCleanerInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void startBackupCleanup() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                cleanBackupShards();
                purgeBackupShards();
            } catch (Throwable t) {
                // Swallow after logging: an escaping exception would cancel future runs.
                log.error(t, "Error cleaning backup shards");
            }
        }, 0L, backupCleanerInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void startLocalCleanup() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                // Random delay before each run; presumably this spreads the load from
                // many nodes over time -- confirm against deployment expectations.
                long intervalSeconds = localCleanerInterval.roundTo(TimeUnit.SECONDS);
                // nextLong(origin, bound) requires origin < bound, so an interval that rounds
                // to one second or less gets no jitter instead of failing every run.
                if (intervalSeconds > 1) {
                    TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextLong(1L, intervalSeconds));
                }
                cleanLocalShards();
                purgeLocalShards();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable t) {
                // Swallow after logging: an escaping exception would cancel future runs.
                log.error(t, "Error cleaning local shards");
            }
        }, 0L, localCleanerInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Asks the DAO to abort transactions, passing a cutoff of now minus {@code maxTransactionAge}.
     */
    @VisibleForTesting
    void abortOldTransactions() {
        dao.abortOldTransactions(maxTimestamp(maxTransactionAge));
    }

    /**
     * Repeatedly moves batches of old created shards and old created shard nodes into the
     * deleted tables, until both batches come back empty or the thread is interrupted.
     */
    @VisibleForTesting
    void deleteOldShards() {
        while (!Thread.currentThread().isInterrupted()) {
            List<UUID> shards = dao.getOldCreatedShardsBatch();
            dao.insertDeletedShards(shards);
            dao.deleteCreatedShards(shards);

            List<ShardNodeId> shardNodes = dao.getOldCreatedShardNodesBatch();
            dao.insertDeletedShardNodes(shardNodes);
            dao.deleteCreatedShardNodes(shardNodes);

            if (shards.isEmpty() && shardNodes.isEmpty()) {
                return;
            }
        }
    }

    /**
     * Deletes the local storage files for batches of cleanable shards on this node and records
     * them as cleaned, until no batch remains or the thread is interrupted.
     */
    @VisibleForTesting
    void cleanLocalShards() {
        while (!Thread.currentThread().isInterrupted()) {
            List<UUID> shardUuids = dao.getCleanableShardNodesBatch(currentNode, maxTimestamp(localCleanTime));
            if (shardUuids.isEmpty()) {
                return;
            }

            long start = System.nanoTime();
            for (UUID uuid : shardUuids) {
                deleteFile(storageService.getStorageFile(uuid));
            }
            dao.updateCleanedShardNodes(shardUuids, getCurrentNodeId());
            log.info("Cleaned %s local shards in %s", shardUuids.size(), Duration.nanosSince(start));
        }
    }

    /**
     * Deletes the local storage files for batches of purgable shards on this node and removes
     * their records, until no batch remains or the thread is interrupted.
     */
    @VisibleForTesting
    void purgeLocalShards() {
        while (!Thread.currentThread().isInterrupted()) {
            List<UUID> shardUuids = dao.getPurgableShardNodesBatch(currentNode, maxTimestamp(localPurgeTime));
            if (shardUuids.isEmpty()) {
                return;
            }

            long start = System.nanoTime();
            for (UUID uuid : shardUuids) {
                deleteFile(storageService.getStorageFile(uuid));
            }
            dao.deletePurgedShardNodes(shardUuids, getCurrentNodeId());
            log.info("Purged %s local shards in %s", shardUuids.size(), Duration.nanosSince(start));
        }
    }

    /**
     * Deletes batches of cleanable shards from the backup store and records them as cleaned,
     * until no batch remains or the thread is interrupted.
     */
    @VisibleForTesting
    void cleanBackupShards() {
        while (!Thread.currentThread().isInterrupted()) {
            List<UUID> shardUuids = dao.getCleanableShardsBatch(maxTimestamp(backupCleanTime));
            if (shardUuids.isEmpty()) {
                return;
            }

            long start = System.nanoTime();
            if (!executeDeletes(shardUuids)) {
                // Interrupted: some deletes may not have run, so do not record the batch as cleaned.
                return;
            }
            dao.updateCleanedShards(shardUuids);
            log.info("Cleaned %s backup shards in %s", shardUuids.size(), Duration.nanosSince(start));
        }
    }

    /**
     * Deletes batches of purgable shards from the backup store and removes their records,
     * until no batch remains or the thread is interrupted.
     */
    @VisibleForTesting
    void purgeBackupShards() {
        while (!Thread.currentThread().isInterrupted()) {
            List<UUID> shardUuids = dao.getPurgableShardsBatch(maxTimestamp(backupPurgeTime));
            if (shardUuids.isEmpty()) {
                return;
            }

            long start = System.nanoTime();
            if (!executeDeletes(shardUuids)) {
                // Interrupted: some deletes may not have run, so keep the records for a later attempt.
                return;
            }
            dao.deletePurgedShards(shardUuids);
            log.info("Purged %s backup shards in %s", shardUuids.size(), Duration.nanosSince(start));
        }
    }

    /**
     * Deletes the given shards from the backup store on the backup executor and waits for
     * all deletions to finish. Individual failures are tolerated; only the first one is logged.
     *
     * @param shardUuids shards to delete from the backup store
     * @return {@code true} if every task ran to completion (successfully or not),
     *         {@code false} if the calling thread was interrupted while waiting, in which
     *         case the interrupt flag is restored
     */
    private boolean executeDeletes(List<UUID> shardUuids) {
        List<Callable<Object>> tasks = new ArrayList<>(shardUuids.size());
        for (UUID uuid : shardUuids) {
            // The Optional is unwrapped inside the task so a missing store surfaces as a task failure.
            Runnable delete = () -> backupStore.get().deleteShard(uuid);
            tasks.add(Executors.callable(delete));
        }

        List<Future<Object>> futures;
        try {
            futures = backupExecutor.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        boolean errorLogged = false;
        for (Future<Object> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (ExecutionException e) {
                // Log only the first failure of the batch to avoid flooding the log.
                if (!errorLogged) {
                    errorLogged = true;
                    log.error(e.getCause(), "Error deleting backup shard");
                }
            }
        }
        return true;
    }

    /**
     * Looks up the numeric ID of the current node.
     *
     * @throws PrestoException with {@code RAPTOR_ERROR} if the DAO has no ID for this node
     */
    private int getCurrentNodeId() {
        Integer nodeId = dao.getNodeId(currentNode);
        if (nodeId == null) {
            throw new PrestoException(RaptorErrorCode.RAPTOR_ERROR, "Node does not exist: " + currentNode);
        }
        return nodeId;
    }

    /**
     * Returns the timestamp that lies {@code duration} before the current wall-clock time.
     */
    private static Timestamp maxTimestamp(Duration duration) {
        return new Timestamp(System.currentTimeMillis() - duration.toMillis());
    }

    /**
     * Attempts to delete the file. The result of {@link File#delete()} is not checked, so a
     * missing file or a failed deletion is silently ignored.
     */
    private static void deleteFile(File file) {
        file.delete();
    }
}
