package com.facebook.presto.raptor.storage;

import com.facebook.presto.raptor.NodeSupplier;
import com.facebook.presto.raptor.RaptorConnectorId;
import com.facebook.presto.raptor.backup.BackupStore;
import com.facebook.presto.raptor.metadata.ShardManager;
import com.facebook.presto.raptor.metadata.ShardMetadata;
import com.facebook.presto.spi.NodeManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

/* loaded from: input_file:com/facebook/presto/raptor/storage/ShardEjector.class */
public class ShardEjector {
    private static final Logger log = Logger.get(ShardEjector.class);
    private final String currentNode;
    private final NodeSupplier nodeSupplier;
    private final ShardManager shardManager;
    private final StorageService storageService;
    private final Duration interval;
    private final Optional<BackupStore> backupStore;
    private final ScheduledExecutorService executor;
    private final AtomicBoolean started;
    private final CounterStat shardsEjected;
    private final CounterStat jobErrors;

    @Inject
    public ShardEjector(NodeManager nodeManager, NodeSupplier nodeSupplier, ShardManager shardManager, StorageService storageService, StorageManagerConfig storageManagerConfig, Optional<BackupStore> optional, RaptorConnectorId raptorConnectorId) {
        this(nodeManager.getCurrentNode().getNodeIdentifier(), nodeSupplier, shardManager, storageService, storageManagerConfig.getShardEjectorInterval(), optional, raptorConnectorId.toString());
    }

    public ShardEjector(String str, NodeSupplier nodeSupplier, ShardManager shardManager, StorageService storageService, Duration duration, Optional<BackupStore> optional, String str2) {
        this.started = new AtomicBoolean();
        this.shardsEjected = new CounterStat();
        this.jobErrors = new CounterStat();
        this.currentNode = (String) Objects.requireNonNull(str, "currentNode is null");
        this.nodeSupplier = (NodeSupplier) Objects.requireNonNull(nodeSupplier, "nodeSupplier is null");
        this.shardManager = (ShardManager) Objects.requireNonNull(shardManager, "shardManager is null");
        this.storageService = (StorageService) Objects.requireNonNull(storageService, "storageService is null");
        this.interval = (Duration) Objects.requireNonNull(duration, "interval is null");
        this.backupStore = (Optional) Objects.requireNonNull(optional, "backupStore is null");
        this.executor = Executors.newScheduledThreadPool(1, Threads.daemonThreadsNamed("shard-ejector-" + str2));
    }

    @PostConstruct
    public void start() {
        if (this.backupStore.isPresent() && !this.started.getAndSet(true)) {
            startJob();
        }
    }

    @PreDestroy
    public void shutdown() {
        this.executor.shutdownNow();
    }

    @Managed
    @Nested
    public CounterStat getShardsEjected() {
        return this.shardsEjected;
    }

    @Managed
    @Nested
    public CounterStat getJobErrors() {
        return this.jobErrors;
    }

    private void startJob() {
        this.executor.scheduleWithFixedDelay(() -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextLong(1L, this.interval.roundTo(TimeUnit.SECONDS)));
                process();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                log.error(th, "Error ejecting shards");
                this.jobErrors.update(1L);
            }
        }, 0L, this.interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    @VisibleForTesting
    void process() {
        Preconditions.checkState(this.backupStore.isPresent(), "backup store must be present");
        Map<String, Long> nodeBytes = this.shardManager.getNodeBytes();
        Set set = (Set) this.nodeSupplier.getWorkerNodes().stream().map((v0) -> {
            return v0.getNodeIdentifier();
        }).collect(Collectors.toSet());
        set.getClass();
        HashMap hashMap = new HashMap(Maps.filterKeys(nodeBytes, (v1) -> {
            return r3.contains(v1);
        }));
        if (!hashMap.isEmpty() && hashMap.containsKey(this.currentNode)) {
            long longValue = ((Long) hashMap.get(this.currentNode)).longValue();
            long round = Math.round(hashMap.values().stream().mapToLong((v0) -> {
                return v0.longValue();
            }).average().getAsDouble());
            long round2 = Math.round(round * 1.01d);
            if (longValue <= round2) {
                return;
            }
            HashMap hashMap2 = new HashMap(Maps.filterValues(hashMap, l -> {
                return l.longValue() <= round;
            }));
            ArrayDeque arrayDeque = new ArrayDeque((List) this.shardManager.getNodeShards(this.currentNode).stream().filter(shardMetadata -> {
                return !shardMetadata.getBucketNumber().isPresent();
            }).sorted(Comparator.comparingLong((v0) -> {
                return v0.getCompressedSize();
            }).reversed()).collect(Collectors.toList()));
            while (longValue > round2 && !arrayDeque.isEmpty()) {
                ShardMetadata shardMetadata2 = (ShardMetadata) arrayDeque.remove();
                long compressedSize = shardMetadata2.getCompressedSize();
                UUID shardUuid = shardMetadata2.getShardUuid();
                if (!this.backupStore.get().shardExists(shardUuid)) {
                    log.warn("No backup for shard: %s", new Object[]{shardUuid});
                }
                String pickTargetNode = pickTargetNode(hashMap2, compressedSize, round);
                if (pickTargetNode == null) {
                    return;
                }
                long longValue2 = ((Long) hashMap2.get(pickTargetNode)).longValue();
                log.info("Moving shard %s to node %s (shard: %s, node: %s, average: %s, target: %s)", new Object[]{shardUuid, pickTargetNode, Long.valueOf(compressedSize), Long.valueOf(longValue), Long.valueOf(round), Long.valueOf(longValue2)});
                this.shardsEjected.update(1L);
                hashMap2.put(pickTargetNode, Long.valueOf(longValue2 + compressedSize));
                longValue -= compressedSize;
                this.shardManager.assignShard(shardMetadata2.getTableId(), shardUuid, pickTargetNode, false);
                this.shardManager.unassignShard(shardMetadata2.getTableId(), shardUuid, this.currentNode);
                File storageFile = this.storageService.getStorageFile(shardUuid);
                if (storageFile.exists() && !storageFile.delete()) {
                    log.warn("Failed to delete shard file: %s", new Object[]{storageFile});
                }
            }
        }
    }

    private static String pickTargetNode(Map<String, Long> map, long j, long j2) {
        while (!map.isEmpty()) {
            String pickCandidateNode = pickCandidateNode(map);
            if (map.get(pickCandidateNode).longValue() + j <= j2) {
                return pickCandidateNode;
            }
            map.remove(pickCandidateNode);
        }
        return null;
    }

    private static String pickCandidateNode(Map<String, Long> map) {
        Preconditions.checkArgument(!map.isEmpty());
        if (map.size() == 1) {
            return map.keySet().iterator().next();
        }
        ArrayList arrayList = new ArrayList(map.keySet());
        Collections.shuffle(arrayList);
        String str = (String) arrayList.get(0);
        String str2 = (String) arrayList.get(1);
        return map.get(str).longValue() <= map.get(str2).longValue() ? str : str2;
    }
}
