package com.facebook.presto.raptor.storage;

import com.facebook.presto.raptor.RaptorErrorCode;
import com.facebook.presto.raptor.backup.BackupStore;
import com.facebook.presto.raptor.metadata.ShardManager;
import com.facebook.presto.raptor.metadata.ShardMetadata;
import com.facebook.presto.raptor.util.PrioritizedFifoExecutor;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/facebook/presto/raptor/storage/ShardRecoveryManager.class */
public class ShardRecoveryManager {
    private static final Logger log = Logger.get(ShardRecoveryManager.class);
    private final StorageService storageService;
    private final Optional<BackupStore> backupStore;
    private final String nodeIdentifier;
    private final ShardManager shardManager;
    private final Duration missingShardDiscoveryInterval;
    private final AtomicBoolean started;
    private final MissingShardsQueue shardQueue;
    private final ScheduledExecutorService missingShardExecutor;
    private final ExecutorService executorService;
    private final ShardRecoveryStats stats;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/raptor/storage/ShardRecoveryManager$MissingShard.class */
    public static final class MissingShard {
        private final UUID shardUuid;
        private final OptionalLong shardSize;
        private final boolean active;

        private MissingShard(UUID uuid, OptionalLong optionalLong, boolean z) {
            this.shardUuid = (UUID) Objects.requireNonNull(uuid, "shardUuid is null");
            this.shardSize = (OptionalLong) Objects.requireNonNull(optionalLong, "shardSize is null");
            this.active = z;
        }

        public static MissingShard createBackgroundMissingShard(UUID uuid, long j) {
            return new MissingShard(uuid, OptionalLong.of(j), false);
        }

        public static MissingShard createActiveMissingShard(UUID uuid) {
            return new MissingShard(uuid, OptionalLong.empty(), true);
        }

        public UUID getShardUuid() {
            return this.shardUuid;
        }

        public OptionalLong getShardSize() {
            return this.shardSize;
        }

        public boolean isActive() {
            return this.active;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            MissingShard missingShard = (MissingShard) obj;
            return Objects.equals(Boolean.valueOf(this.active), Boolean.valueOf(missingShard.active)) && Objects.equals(this.shardUuid, missingShard.shardUuid);
        }

        public int hashCode() {
            return Objects.hash(this.shardUuid, Boolean.valueOf(this.active));
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("shardUuid", this.shardUuid).add("active", this.active).toString();
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:com/facebook/presto/raptor/storage/ShardRecoveryManager$MissingShardComparator.class */
    static class MissingShardComparator implements Comparator<MissingShardRunnable> {
        MissingShardComparator() {
        }

        @Override // java.util.Comparator
        public int compare(MissingShardRunnable missingShardRunnable, MissingShardRunnable missingShardRunnable2) {
            if (missingShardRunnable.isActive() == missingShardRunnable2.isActive()) {
                return 0;
            }
            return missingShardRunnable.isActive() ? -1 : 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/raptor/storage/ShardRecoveryManager$MissingShardRecovery.class */
    public class MissingShardRecovery implements MissingShardRunnable {
        private final UUID shardUuid;
        private final OptionalLong shardSize;
        private final boolean active;

        public MissingShardRecovery(UUID uuid, OptionalLong optionalLong, boolean z) {
            this.shardUuid = (UUID) Objects.requireNonNull(uuid, "shardUuid is null");
            this.shardSize = (OptionalLong) Objects.requireNonNull(optionalLong, "shardSize is null");
            this.active = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            ShardRecoveryManager.this.restoreFromBackup(this.shardUuid, this.shardSize);
        }

        @Override // com.facebook.presto.raptor.storage.ShardRecoveryManager.MissingShardRunnable
        public boolean isActive() {
            return this.active;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/facebook/presto/raptor/storage/ShardRecoveryManager$MissingShardRunnable.class */
    public interface MissingShardRunnable extends Runnable {
        boolean isActive();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/raptor/storage/ShardRecoveryManager$MissingShardsQueue.class */
    public class MissingShardsQueue {
        private final LoadingCache<MissingShard, ListenableFuture<?>> queuedMissingShards;

        public MissingShardsQueue(final PrioritizedFifoExecutor<MissingShardRunnable> prioritizedFifoExecutor) {
            Objects.requireNonNull(prioritizedFifoExecutor, "shardRecoveryExecutor is null");
            this.queuedMissingShards = CacheBuilder.newBuilder().build(new CacheLoader<MissingShard, ListenableFuture<?>>() { // from class: com.facebook.presto.raptor.storage.ShardRecoveryManager.MissingShardsQueue.1
                public ListenableFuture<?> load(MissingShard missingShard) {
                    ListenableFuture<?> submit = prioritizedFifoExecutor.submit(new MissingShardRecovery(missingShard.getShardUuid(), missingShard.getShardSize(), missingShard.isActive()));
                    submit.addListener(() -> {
                        MissingShardsQueue.this.queuedMissingShards.invalidate(missingShard);
                    }, MoreExecutors.directExecutor());
                    return submit;
                }
            });
        }

        public ListenableFuture<?> submit(MissingShard missingShard) throws ExecutionException {
            return (ListenableFuture) this.queuedMissingShards.get(missingShard);
        }
    }

    @Inject
    public ShardRecoveryManager(StorageService storageService, Optional<BackupStore> optional, NodeManager nodeManager, ShardManager shardManager, StorageManagerConfig storageManagerConfig) {
        this(storageService, optional, nodeManager, shardManager, storageManagerConfig.getMissingShardDiscoveryInterval(), storageManagerConfig.getRecoveryThreads());
    }

    public ShardRecoveryManager(StorageService storageService, Optional<BackupStore> optional, NodeManager nodeManager, ShardManager shardManager, Duration duration, int i) {
        this.started = new AtomicBoolean();
        this.missingShardExecutor = Executors.newScheduledThreadPool(1, Threads.daemonThreadsNamed("missing-shard-discovery"));
        this.executorService = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("shard-recovery-%s"));
        this.storageService = (StorageService) Objects.requireNonNull(storageService, "storageService is null");
        this.backupStore = (Optional) Objects.requireNonNull(optional, "backupStore is null");
        this.nodeIdentifier = ((NodeManager) Objects.requireNonNull(nodeManager, "nodeManager is null")).getCurrentNode().getNodeIdentifier();
        this.shardManager = (ShardManager) Objects.requireNonNull(shardManager, "shardManager is null");
        this.missingShardDiscoveryInterval = (Duration) Objects.requireNonNull(duration, "missingShardDiscoveryInterval is null");
        this.shardQueue = new MissingShardsQueue(new PrioritizedFifoExecutor(this.executorService, i, new MissingShardComparator()));
        this.stats = new ShardRecoveryStats();
    }

    @PostConstruct
    public void start() {
        if (this.backupStore.isPresent() && this.started.compareAndSet(false, true)) {
            enqueueMissingShards();
        }
    }

    @PreDestroy
    public void shutdown() {
        this.executorService.shutdownNow();
        this.missingShardExecutor.shutdownNow();
    }

    private void enqueueMissingShards() {
        this.missingShardExecutor.scheduleWithFixedDelay(() -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 30));
                for (ShardMetadata shardMetadata : getMissingShards()) {
                    this.stats.incrementBackgroundShardRecovery();
                    Futures.addCallback(this.shardQueue.submit(MissingShard.createBackgroundMissingShard(shardMetadata.getShardUuid(), shardMetadata.getCompressedSize())), failureCallback(th -> {
                        log.warn(th, "Error recovering shard: %s", new Object[]{shardMetadata.getShardUuid()});
                    }));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable th2) {
                log.error(th2, "Error creating shard recovery tasks");
            }
        }, 0L, this.missingShardDiscoveryInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private Set<ShardMetadata> getMissingShards() {
        return (Set) this.shardManager.getNodeShards(this.nodeIdentifier).stream().filter(shardMetadata -> {
            return shardNeedsRecovery(shardMetadata.getShardUuid(), shardMetadata.getCompressedSize());
        }).collect(Collectors.toSet());
    }

    private boolean shardNeedsRecovery(UUID uuid, long j) {
        File storageFile = this.storageService.getStorageFile(uuid);
        return (storageFile.exists() && storageFile.length() == j) ? false : true;
    }

    public Future<?> recoverShard(UUID uuid) throws ExecutionException {
        Objects.requireNonNull(uuid, "shardUuid is null");
        this.stats.incrementActiveShardRecovery();
        return this.shardQueue.submit(MissingShard.createActiveMissingShard(uuid));
    }

    @VisibleForTesting
    void restoreFromBackup(UUID uuid, OptionalLong optionalLong) {
        File storageFile = this.storageService.getStorageFile(uuid);
        if (!this.backupStore.get().shardExists(uuid)) {
            this.stats.incrementShardRecoveryBackupNotFound();
            throw new PrestoException(RaptorErrorCode.RAPTOR_RECOVERY_ERROR, "No backup file found for shard: " + uuid);
        }
        if (storageFile.exists()) {
            if (!optionalLong.isPresent() || storageFile.length() == optionalLong.getAsLong()) {
                return;
            }
            log.warn("Local shard file is corrupt. Deleting local file: %s", new Object[]{storageFile});
            storageFile.delete();
        }
        File temporarySuffix = temporarySuffix(this.storageService.getStagingFile(uuid));
        this.storageService.createParents(temporarySuffix);
        log.info("Copying shard %s from backup...", new Object[]{uuid});
        long nanoTime = System.nanoTime();
        try {
            this.backupStore.get().restoreShard(uuid, temporarySuffix);
            Duration nanosSince = Duration.nanosSince(nanoTime);
            DataSize dataSize = new DataSize(temporarySuffix.length(), DataSize.Unit.BYTE);
            DataSize convertToMostSuccinctDataSize = dataRate(dataSize, nanosSince).convertToMostSuccinctDataSize();
            this.stats.addShardRecoveryDataRate(convertToMostSuccinctDataSize, dataSize, nanosSince);
            log.info("Copied shard %s from backup in %s (%s at %s/s)", new Object[]{uuid, nanosSince, dataSize, convertToMostSuccinctDataSize});
            this.storageService.createParents(storageFile);
            try {
                try {
                    Files.move(temporarySuffix.toPath(), storageFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
                    temporarySuffix.delete();
                } catch (FileAlreadyExistsException e) {
                    temporarySuffix.delete();
                } catch (IOException e2) {
                    this.stats.incrementShardRecoveryFailure();
                    throw new PrestoException(RaptorErrorCode.RAPTOR_RECOVERY_ERROR, "Failed to move shard: " + uuid, e2);
                }
                if (storageFile.exists() && (!optionalLong.isPresent() || storageFile.length() == optionalLong.getAsLong())) {
                    this.stats.incrementShardRecoverySuccess();
                    return;
                }
                this.stats.incrementShardRecoveryFailure();
                log.info("Files do not match after recovery. Deleting local file: " + uuid);
                storageFile.delete();
                throw new PrestoException(RaptorErrorCode.RAPTOR_RECOVERY_ERROR, "File not recovered correctly: " + uuid);
            } catch (Throwable th) {
                temporarySuffix.delete();
                throw th;
            }
        } catch (PrestoException e3) {
            this.stats.incrementShardRecoveryFailure();
            temporarySuffix.delete();
            throw e3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static DataSize dataRate(DataSize dataSize, Duration duration) {
        double bytes = dataSize.toBytes() / duration.getValue(TimeUnit.SECONDS);
        if (Double.isNaN(bytes) || Double.isInfinite(bytes)) {
            bytes = 0.0d;
        }
        return new DataSize(bytes, DataSize.Unit.BYTE).convertToMostSuccinctDataSize();
    }

    private static File temporarySuffix(File file) {
        return new File(file.getPath() + ".tmp-" + UUID.randomUUID());
    }

    @Managed
    @Flatten
    public ShardRecoveryStats getStats() {
        return this.stats;
    }

    private static <T> FutureCallback<T> failureCallback(final Consumer<Throwable> consumer) {
        return new FutureCallback<T>() { // from class: com.facebook.presto.raptor.storage.ShardRecoveryManager.1
            public void onSuccess(T t) {
            }

            public void onFailure(Throwable th) {
                consumer.accept(th);
            }
        };
    }
}
