package com.facebook.presto.raptor.metadata;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.raptor.backup.BackupStore;
import com.facebook.presto.raptor.filesystem.FileSystemUtil;
import com.facebook.presto.raptor.storage.OrcDataEnvironment;
import com.facebook.presto.raptor.storage.StorageService;
import com.facebook.presto.raptor.util.DaoSupplier;
import com.facebook.presto.spi.NodeManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.units.Duration;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import org.apache.hadoop.fs.Path;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

/* loaded from: input_file:com/facebook/presto/raptor/metadata/ShardCleaner.class */
public class ShardCleaner {
    private static final Logger log = Logger.get(ShardCleaner.class);
    // On-demand DAO obtained from the DaoSupplier passed to the constructor.
    private final ShardDao dao;
    // Identifier of this node; used to look up the shards assigned to it.
    private final String currentNode;
    // When true, this instance also runs the transaction and backup cleanup jobs.
    private final boolean coordinator;
    // Time source used to timestamp unassigned local shards and to age them out.
    private final Ticker ticker;
    private final StorageService storageService;
    // Backup and local cleanup are only scheduled when a backup store is present.
    private final Optional<BackupStore> backupStore;
    // Age cutoff handed to the DAO when aborting old transactions.
    private final Duration maxTransactionAge;
    // Fixed delay between transaction cleanup runs.
    private final Duration transactionCleanerInterval;
    // Fixed delay between local cleanup runs; also the upper bound of the start-up jitter.
    private final Duration localCleanerInterval;
    // How long a local shard must stay marked as unassigned before its file is deleted.
    private final Duration localCleanTime;
    // Fixed delay between backup cleanup runs.
    private final Duration backupCleanerInterval;
    // Age cutoff handed to the DAO when fetching cleanable backup shards.
    private final Duration backupCleanTime;
    // Runs the periodic and manually triggered cleanup jobs.
    private final ScheduledExecutorService scheduler;
    // Runs the individual backup-store shard deletions.
    private final ExecutorService backupExecutor;
    // Provides the file system used to delete local shard files.
    private final OrcDataEnvironment orcDataEnvironment;
    // Age cutoff handed to the DAO when deleting completed transactions.
    private final Duration maxCompletedTransactionAge;
    // Set by the first call to start(); later calls are no-ops.
    private final AtomicBoolean started;
    // Counters exposed through the @Managed getters below.
    private final CounterStat transactionJobErrors;
    private final CounterStat backupJobErrors;
    private final CounterStat localJobErrors;
    private final CounterStat backupShardsQueued;
    private final CounterStat backupShardsCleaned;
    private final CounterStat localShardsCleaned;

    // Local shard UUID -> ticker reading taken when the shard was first seen
    // on disk without being assigned to this node.
    @GuardedBy("this")
    private final Map<UUID, Long> shardsToClean;

    /**
     * Injection constructor: reads the node identity from {@code nodeManager}
     * and every timing/thread setting from {@code shardCleanerConfig}, then
     * delegates to the explicit constructor.
     */
    @Inject
    public ShardCleaner(DaoSupplier<ShardDao> daoSupplier, Ticker ticker, NodeManager nodeManager, StorageService storageService, Optional<BackupStore> optional, OrcDataEnvironment orcDataEnvironment, ShardCleanerConfig shardCleanerConfig) {
        this(
                daoSupplier,
                nodeManager.getCurrentNode().getNodeIdentifier(),
                nodeManager.getCurrentNode().isCoordinator(),
                ticker,
                storageService,
                optional,
                orcDataEnvironment,
                shardCleanerConfig.getMaxTransactionAge(),
                shardCleanerConfig.getTransactionCleanerInterval(),
                shardCleanerConfig.getLocalCleanerInterval(),
                shardCleanerConfig.getLocalCleanTime(),
                shardCleanerConfig.getBackupCleanerInterval(),
                shardCleanerConfig.getBackupCleanTime(),
                shardCleanerConfig.getBackupDeletionThreads(),
                shardCleanerConfig.getMaxCompletedTransactionAge());
    }

    /**
     * Creates a shard cleaner from explicit settings. No job is scheduled
     * until {@link #start()} is called.
     *
     * <p>All arguments are validated before the thread pools are created, so
     * a rejected argument does not leave executors behind.
     *
     * @param daoSupplier supplier of the on-demand {@code ShardDao}
     * @param str identifier of the current node
     * @param z whether the current node is the coordinator
     * @param ticker time source used to age unassigned local shards
     * @param storageService source of local shard UUIDs and shard file paths
     * @param optional backup store, if one is configured
     * @param orcDataEnvironment provides the file system used to delete shard files
     * @param duration maximum transaction age (maxTransactionAge)
     * @param duration2 delay between transaction cleanup runs (transactionCleanerInterval)
     * @param duration3 delay between local cleanup runs (localCleanerInterval)
     * @param duration4 time a local shard must stay unassigned before deletion (localCleanTime)
     * @param duration5 delay between backup cleanup runs (backupCleanerInterval)
     * @param duration6 age cutoff for cleanable backup shards (backupCleanTime)
     * @param i number of threads in the backup deletion pool; must be positive
     * @param duration7 maximum age of completed transactions (maxCompletedTransactionAge)
     * @throws NullPointerException if any reference argument is null
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public ShardCleaner(DaoSupplier<ShardDao> daoSupplier, String str, boolean z, Ticker ticker, StorageService storageService, Optional<BackupStore> optional, OrcDataEnvironment orcDataEnvironment, Duration duration, Duration duration2, Duration duration3, Duration duration4, Duration duration5, Duration duration6, int i, Duration duration7) {
        this.started = new AtomicBoolean();
        this.transactionJobErrors = new CounterStat();
        this.backupJobErrors = new CounterStat();
        this.localJobErrors = new CounterStat();
        this.backupShardsQueued = new CounterStat();
        this.backupShardsCleaned = new CounterStat();
        this.localShardsCleaned = new CounterStat();
        this.shardsToClean = new HashMap<>();

        this.dao = Objects.requireNonNull(daoSupplier, "daoSupplier is null").onDemand();
        this.currentNode = Objects.requireNonNull(str, "currentNode is null");
        this.coordinator = z;
        this.ticker = Objects.requireNonNull(ticker, "ticker is null");
        this.storageService = Objects.requireNonNull(storageService, "storageService is null");
        this.backupStore = Objects.requireNonNull(optional, "backupStore is null");
        this.orcDataEnvironment = Objects.requireNonNull(orcDataEnvironment, "environment is null");
        this.maxTransactionAge = Objects.requireNonNull(duration, "maxTransactionAge is null");
        this.transactionCleanerInterval = Objects.requireNonNull(duration2, "transactionCleanerInterval is null");
        this.localCleanerInterval = Objects.requireNonNull(duration3, "localCleanerInterval is null");
        this.localCleanTime = Objects.requireNonNull(duration4, "localCleanTime is null");
        this.backupCleanerInterval = Objects.requireNonNull(duration5, "backupCleanerInterval is null");
        this.backupCleanTime = Objects.requireNonNull(duration6, "backupCleanTime is null");
        this.maxCompletedTransactionAge = Objects.requireNonNull(duration7, "maxCompletedTransactionAge is null");
        if (i <= 0) {
            // Executors.newFixedThreadPool would reject this too, but without naming the setting.
            throw new IllegalArgumentException("backupDeletionThreads must be positive: " + i);
        }

        // Executors are created last, once every argument has been accepted.
        this.scheduler = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed("shard-cleaner-%s"));
        this.backupExecutor = Executors.newFixedThreadPool(i, Threads.daemonThreadsNamed("shard-cleaner-backup-%s"));
    }

    /**
     * Starts the cleanup jobs. Only the first invocation has any effect;
     * subsequent calls return without scheduling anything.
     */
    @PostConstruct
    public void start() {
        // compareAndSet succeeds only for the caller that flips the flag.
        if (started.compareAndSet(false, true)) {
            startJobs();
        }
    }

    /**
     * Stops both executors via {@code shutdownNow()}: first the job scheduler,
     * then the backup deletion pool.
     */
    @PreDestroy
    public void shutdown() {
        scheduler.shutdownNow();
        backupExecutor.shutdownNow();
    }

    /** @return counter incremented each time the transaction cleanup job fails */
    @Managed
    @Nested
    public CounterStat getTransactionJobErrors() {
        return transactionJobErrors;
    }

    /**
     * @return counter incremented when a backup cleanup run fails or when an
     *         individual backup shard deletion fails
     */
    @Managed
    @Nested
    public CounterStat getBackupJobErrors() {
        return backupJobErrors;
    }

    /** @return counter incremented each time a local cleanup run fails */
    @Managed
    @Nested
    public CounterStat getLocalJobErrors() {
        return localJobErrors;
    }

    /**
     * @return counter of shards moved from the created-shards records to the
     *         deleted-shards records by {@code deleteOldShards()}
     */
    @Managed
    @Nested
    public CounterStat getBackupShardsQueued() {
        return backupShardsQueued;
    }

    /**
     * @return counter of shards whose backup deletion completed and whose
     *         cleaned-shard records were removed through the DAO
     */
    @Managed
    @Nested
    public CounterStat getBackupShardsCleaned() {
        return backupShardsCleaned;
    }

    /** @return counter of local shard files for which deletion was issued */
    @Managed
    @Nested
    public CounterStat getLocalShardsCleaned() {
        return localShardsCleaned;
    }

    /**
     * Schedules the jobs that apply to this node:
     * transaction cleanup on the coordinator, backup cleanup on the
     * coordinator when a backup store exists, and local cleanup on any node
     * that has a backup store.
     */
    private void startJobs() {
        final boolean hasBackupStore = backupStore.isPresent();

        if (coordinator) {
            startTransactionCleanup();
        }
        if (coordinator && hasBackupStore) {
            scheduleBackupCleanup();
        }
        if (hasBackupStore) {
            scheduleLocalCleanup();
        }
    }

    /**
     * Schedules the transaction cleanup job with no initial delay and a fixed
     * delay of {@code transactionCleanerInterval} between runs. Any failure
     * is logged and counted so that the periodic task keeps running.
     */
    private void startTransactionCleanup() {
        Runnable cleanup = () -> {
            try {
                abortOldTransactions();
                deleteOldCompletedTransactions();
                deleteOldShards();
            } catch (Throwable t) {
                log.error(t, "Error cleaning transactions");
                transactionJobErrors.update(1);
            }
        };
        long delayMillis = transactionCleanerInterval.toMillis();
        scheduler.scheduleWithFixedDelay(cleanup, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules {@code runBackupCleanup()} with no initial delay and a fixed
     * delay of {@code backupCleanerInterval} between runs.
     */
    private void scheduleBackupCleanup() {
        long delayMillis = backupCleanerInterval.toMillis();
        scheduler.scheduleWithFixedDelay(this::runBackupCleanup, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules local shard cleanup: a one-shot immediate cleanup of the
     * shards present at scheduling time, plus the periodic marked-shard
     * cleanup. Both tasks sleep for a random jitter before doing any work.
     */
    private void scheduleLocalCleanup() {
        // Snapshot is taken here, on the calling thread, before any jitter.
        final Set<UUID> shardsAtStartup = getLocalShards();

        Runnable immediateCleanup = () -> {
            waitJitterTime();
            runLocalCleanupImmediately(shardsAtStartup);
        };
        Runnable periodicCleanup = () -> {
            waitJitterTime();
            runLocalCleanup();
        };

        scheduler.submit(immediateCleanup);
        scheduler.scheduleWithFixedDelay(periodicCleanup, 0, localCleanerInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Runs one backup cleanup pass while holding this object's monitor.
     * Failures are logged and counted in {@code backupJobErrors} rather than
     * propagated.
     */
    private synchronized void runBackupCleanup() {
        try {
            cleanBackupShards();
        } catch (Throwable failure) {
            log.error(failure, "Error cleaning backup shards");
            backupJobErrors.update(1);
        }
    }

    /**
     * Sleeps for a random whole number of seconds in
     * {@code [1, localCleanerInterval)} so that cleanup runs do not all
     * start at the same moment.
     *
     * <p>If the interval rounds to one second or less there is no valid
     * range to draw from ({@code nextLong(1, bound)} requires
     * {@code bound > 1}), so the method returns without sleeping instead of
     * throwing. An exception here would escape the scheduled task and stop
     * the periodic local cleanup from being rescheduled.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is
     * restored and the method returns.
     */
    private void waitJitterTime() {
        long intervalSeconds = localCleanerInterval.roundTo(TimeUnit.SECONDS);
        if (intervalSeconds <= 1) {
            // No room for jitter; skip rather than hit IllegalArgumentException.
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextLong(1L, intervalSeconds));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs the immediate local cleanup for the given shard set while holding
     * this object's monitor. Failures are logged and counted in
     * {@code localJobErrors} rather than propagated.
     *
     * @param set UUIDs of the local shards to consider
     */
    private synchronized void runLocalCleanupImmediately(Set<UUID> set) {
        try {
            cleanLocalShardsImmediately(set);
        } catch (Throwable failure) {
            log.error(failure, "Error cleaning local shards");
            localJobErrors.update(1);
        }
    }

    /**
     * Runs one pass of the marked-shard local cleanup while holding this
     * object's monitor. Failures are logged and counted in
     * {@code localJobErrors} rather than propagated.
     */
    private synchronized void runLocalCleanup() {
        try {
            cleanLocalShards();
        } catch (Throwable failure) {
            log.error(failure, "Error cleaning local shards");
            localJobErrors.update(1);
        }
    }

    /** Management operation: queues a single backup cleanup run on the scheduler. */
    @Managed
    public void startBackupCleanup() {
        Runnable task = this::runBackupCleanup;
        scheduler.submit(task);
    }

    /** Management operation: queues a single local cleanup run on the scheduler. */
    @Managed
    public void startLocalCleanup() {
        Runnable task = this::runLocalCleanup;
        scheduler.submit(task);
    }

    /**
     * Management operation: queues a single immediate local cleanup. The set
     * of local shards is read inside the queued task, not by the caller.
     */
    @Managed
    public void startLocalCleanupImmediately() {
        Runnable task = () -> runLocalCleanupImmediately(getLocalShards());
        scheduler.submit(task);
    }

    /**
     * Asks the DAO to abort transactions using a cutoff of "now minus
     * {@code maxTransactionAge}".
     */
    @VisibleForTesting
    void abortOldTransactions() {
        Timestamp cutoff = maxTimestamp(maxTransactionAge);
        dao.abortOldTransactions(cutoff);
    }

    /**
     * Repeatedly fetches a batch of old created shards from the DAO, records
     * them as deleted shards, removes them from the created shards, and adds
     * the batch size to {@code backupShardsQueued}. Stops when a batch comes
     * back empty or the current thread is interrupted.
     */
    @VisibleForTesting
    void deleteOldShards() {
        final Thread worker = Thread.currentThread();
        while (!worker.isInterrupted()) {
            List<UUID> batch = dao.getOldCreatedShardsBatch();
            if (batch.isEmpty()) {
                break;
            }
            // Insert into the deleted set before removing from the created set.
            dao.insertDeletedShards(batch);
            dao.deleteCreatedShards(batch);
            backupShardsQueued.update(batch.size());
        }
    }

    /**
     * Repeatedly asks the DAO to delete completed transactions older than
     * {@code maxCompletedTransactionAge}. Keeps going while each call reports
     * at least 10000 deletions and the thread is not interrupted.
     */
    @VisibleForTesting
    void deleteOldCompletedTransactions() {
        while (!Thread.currentThread().isInterrupted()) {
            // NOTE(review): 10000 presumably mirrors the DAO's per-call delete limit - confirm.
            if (dao.deleteOldCompletedTransactions(maxTimestamp(maxCompletedTransactionAge)) < 10000) {
                return;
            }
        }
    }

    /** @return the shard UUIDs reported by the storage service */
    @VisibleForTesting
    synchronized Set<UUID> getLocalShards() {
        Set<UUID> storageShards = storageService.getStorageShards();
        return storageShards;
    }

    /**
     * Deletes the storage file of every shard in {@code set} that the DAO
     * does not list for the current node, without any grace period.
     *
     * @param set UUIDs of local shards to consider
     * @throws IOException if deleting a shard file fails
     */
    @VisibleForTesting
    synchronized void cleanLocalShardsImmediately(Set<UUID> set) throws IOException {
        // Shards the metadata lists for this node.
        Set<UUID> assigned = dao.getNodeShardsAndDeltas(currentNode, null).stream()
                .map(shard -> shard.getShardUuid())
                .collect(Collectors.toSet());

        // Local shards with no assignment.
        Set<UUID> unassigned = Sets.difference(set, assigned);
        for (UUID uuid : unassigned) {
            deleteFile(storageService.getStorageFile(uuid));
        }

        int cleaned = unassigned.size();
        localShardsCleaned.update(cleaned);
        log.info("Cleaned %s local shards immediately", cleaned);
    }

    /**
     * Mark-and-sweep cleanup of local shard files.
     *
     * <p>Shards found locally but not listed for this node are marked with
     * the current ticker reading (first sighting wins). Marks are cleared for
     * shards that are listed for this node. Marked shards whose mark is older
     * than {@code localCleanTime} have their files deleted and their marks
     * removed.
     *
     * @throws IOException if deleting a shard file fails
     */
    @VisibleForTesting
    synchronized void cleanLocalShards() throws IOException {
        Set<UUID> onDisk = getLocalShards();
        Set<UUID> assigned = dao.getNodeShardsAndDeltas(currentNode, null).stream()
                .map(shard -> shard.getShardUuid())
                .collect(Collectors.toSet());

        // Clear marks for shards that are assigned to this node.
        shardsToClean.keySet().removeAll(assigned);

        // Mark unassigned local shards; existing marks keep their original time.
        for (UUID uuid : onDisk) {
            if (assigned.contains(uuid)) {
                continue;
            }
            shardsToClean.putIfAbsent(uuid, ticker.read());
        }

        // Anything marked before this ticker value has waited long enough.
        long cutoff = ticker.read() - localCleanTime.roundTo(TimeUnit.NANOSECONDS);
        Set<UUID> expired = shardsToClean.entrySet().stream()
                .filter(entry -> entry.getValue() < cutoff)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());

        if (!expired.isEmpty()) {
            for (UUID uuid : expired) {
                deleteFile(storageService.getStorageFile(uuid));
                shardsToClean.remove(uuid);
            }
            localShardsCleaned.update(expired.size());
            log.info("Cleaned %s local shards", expired.size());
        }
    }

    /**
     * Deletes cleanable shards from the backup store and then removes their
     * records through the DAO.
     *
     * <p>Deletions run asynchronously on {@code backupExecutor}. The loop
     * fetches a new batch only after some deletions have completed and fewer
     * than 1000 are in flight. It returns once nothing was fetched and
     * nothing is in flight, or when the thread is interrupted.
     */
    @VisibleForTesting
    void cleanBackupShards() {
        Set<UUID> inFlight = Sets.newConcurrentHashSet();
        BlockingQueue<UUID> finished = new LinkedBlockingQueue<>();
        boolean refill = true;

        while (!Thread.currentThread().isInterrupted()) {
            // Fetch a new batch only when allowed and there is capacity.
            Set<UUID> fetched = ImmutableSet.of();
            if (refill && inFlight.size() < 1000) {
                fetched = dao.getCleanableShardsBatch(maxTimestamp(backupCleanTime));
                refill = false;
            }
            if (fetched.isEmpty() && inFlight.isEmpty()) {
                return;
            }

            // Skip shards already being deleted; track the rest as in flight.
            Set<UUID> toSubmit = ImmutableSet.copyOf(Sets.difference(fetched, inFlight));
            inFlight.addAll(toSubmit);

            for (UUID uuid : toSubmit) {
                CompletableFuture.runAsync(() -> backupStore.get().deleteShard(uuid), backupExecutor)
                        .thenAccept(ignored -> finished.add(uuid))
                        .whenComplete((ignored, failure) -> {
                            if (failure != null) {
                                log.error(failure, "Error cleaning backup shard: %s", uuid);
                                backupJobErrors.update(1);
                                // Failed deletions leave the in-flight set so they can be fetched again.
                                inFlight.remove(uuid);
                            }
                        });
            }

            // Wait up to 100 ms for completed deletions.
            int desired = Math.min(100, inFlight.size());
            Collection<UUID> done = drain(finished, desired, 100, TimeUnit.MILLISECONDS);
            if (done.isEmpty()) {
                continue;
            }

            inFlight.removeAll(done);
            dao.deleteCleanedShards(done);
            backupShardsCleaned.update(done.size());
            refill = true;
        }
    }

    /**
     * Collects elements from a queue until at least {@code i} have been
     * gathered or the timeout elapses, whichever comes first.
     *
     * <p>Each round takes everything currently queued, so the result may
     * hold more than {@code i} elements. If the thread is interrupted while
     * waiting, the interrupt flag is restored and whatever was collected so
     * far is returned.
     *
     * @param blockingQueue queue to drain
     * @param i number of elements at which to stop waiting
     * @param j maximum time to wait, in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the drained elements, possibly empty
     */
    private static <T> Collection<T> drain(BlockingQueue<T> blockingQueue, int i, long j, TimeUnit timeUnit) {
        final long startNanos = System.nanoTime();
        final long timeoutNanos = timeUnit.toNanos(j);
        final List<T> collected = new ArrayList<>();
        try {
            for (;;) {
                blockingQueue.drainTo(collected);
                if (collected.size() >= i) {
                    break;
                }
                long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
                if (remainingNanos <= 0) {
                    break;
                }
                // Block for the next element, but no longer than the time left.
                T next = blockingQueue.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (next != null) {
                    collected.add(next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return collected;
    }

    /**
     * @param duration how far back from now the cutoff lies
     * @return a timestamp equal to the current wall-clock time minus {@code duration}
     */
    private static Timestamp maxTimestamp(Duration duration) {
        long cutoffMillis = System.currentTimeMillis() - duration.toMillis();
        return new Timestamp(cutoffMillis);
    }

    /**
     * Deletes {@code path} non-recursively through the file system obtained
     * from {@code orcDataEnvironment}. The value returned by the delete call
     * is not inspected.
     *
     * @param path file to delete
     * @throws IOException if the file system reports an I/O failure
     */
    private void deleteFile(Path path) throws IOException {
        orcDataEnvironment
                .getFileSystem(FileSystemUtil.DEFAULT_RAPTOR_CONTEXT)
                .delete(path, false);
    }
}
