package com.facebook.presto.raptor.storage;

import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.metadata.PrestoNode;
import com.facebook.presto.raptor.NodeSupplier;
import com.facebook.presto.raptor.metadata.BucketNode;
import com.facebook.presto.raptor.metadata.ColumnInfo;
import com.facebook.presto.raptor.metadata.Distribution;
import com.facebook.presto.raptor.metadata.MetadataDao;
import com.facebook.presto.raptor.metadata.SchemaDaoUtil;
import com.facebook.presto.raptor.metadata.ShardManager;
import com.facebook.presto.raptor.metadata.TestDatabaseShardManager;
import com.facebook.presto.raptor.storage.BucketBalancer;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.testing.TestingNodeManager;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import io.airlift.testing.Assertions;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/facebook/presto/raptor/storage/TestBucketBalancer.class */
public class TestBucketBalancer {
    private static final List<String> AVAILABLE_WORKERS = ImmutableList.of("node1", "node2", "node3", "node4", "node5");
    private DBI dbi;
    private Handle dummyHandle;
    private ShardManager shardManager;
    private TestingNodeManager nodeManager;
    private MetadataDao metadataDao;
    private BucketBalancer balancer;

    @BeforeMethod
    public void setup() throws Exception {
        TypeRegistry typeRegistry = new TypeRegistry();
        this.dbi = new DBI("jdbc:h2:mem:test" + System.nanoTime());
        this.dbi.registerMapper(new Distribution.Mapper(typeRegistry));
        this.dummyHandle = this.dbi.open();
        SchemaDaoUtil.createTablesWithRetry(this.dbi);
        this.metadataDao = (MetadataDao) this.dbi.onDemand(MetadataDao.class);
        this.nodeManager = new TestingNodeManager((List) AVAILABLE_WORKERS.stream().map(TestBucketBalancer::createTestingNode).collect(Collectors.toList()));
        TestingNodeManager testingNodeManager = this.nodeManager;
        testingNodeManager.getClass();
        NodeSupplier nodeSupplier = testingNodeManager::getWorkerNodes;
        this.shardManager = TestDatabaseShardManager.createShardManager(this.dbi, nodeSupplier);
        this.balancer = new BucketBalancer(nodeSupplier, this.shardManager, true, new Duration(1.0d, TimeUnit.DAYS), true, true, "test");
    }

    @AfterMethod(alwaysRun = true)
    public void teardown() {
        if (this.dummyHandle != null) {
            this.dummyHandle.close();
        }
    }

    @Test
    public void testSingleDistributionUnbalanced() throws Exception {
        long createDistribution = createDistribution("distA", 16);
        createBucketedTable("testA", createDistribution);
        createBucketedTable("testB", createDistribution);
        createAssignments(createDistribution, AVAILABLE_WORKERS, 10, 3, 1, 1, 1);
        assertBalancing(this.balancer, 6);
    }

    @Test
    public void testSingleDistributionSlightlyUnbalanced() throws Exception {
        long createDistribution = createDistribution("distA", 16);
        createBucketedTable("testA", createDistribution);
        createBucketedTable("testB", createDistribution);
        createAssignments(createDistribution, AVAILABLE_WORKERS, 4, 4, 3, 3, 2);
        assertBalancing(this.balancer, 1);
    }

    @Test
    public void testSingleDistributionBalanced() throws Exception {
        long createDistribution = createDistribution("distA", 16);
        createBucketedTable("testA", createDistribution);
        createBucketedTable("testB", createDistribution);
        createAssignments(createDistribution, AVAILABLE_WORKERS, 4, 3, 3, 3, 3);
        assertBalancing(this.balancer, 0);
    }

    @Test
    public void testSingleDistributionUnbalancedWithDeadNode() throws Exception {
        long createDistribution = createDistribution("distA", 16);
        createBucketedTable("testA", createDistribution);
        createBucketedTable("testB", createDistribution);
        createAssignments(createDistribution, ImmutableList.builder().addAll(AVAILABLE_WORKERS).add("node6").build(), 11, 1, 1, 1, 1, 1);
        assertBalancing(this.balancer, 8);
    }

    @Test
    public void testSingleDistributionUnbalancedWithNewNode() throws Exception {
        long createDistribution = createDistribution("distA", 16);
        createBucketedTable("testA", createDistribution);
        createBucketedTable("testB", createDistribution);
        createAssignments(createDistribution, AVAILABLE_WORKERS, 12, 1, 1, 1, 1);
        this.nodeManager.addNode(createTestingNode("node6"));
        assertBalancing(this.balancer, 9);
    }

    @Test
    public void testMultipleDistributionUnbalanced() throws Exception {
        long createDistribution = createDistribution("distA", 17);
        createBucketedTable("testA", createDistribution);
        createAssignments(createDistribution, AVAILABLE_WORKERS, 11, 3, 1, 1, 1);
        long createDistribution2 = createDistribution("distB", 10);
        createBucketedTable("testB", createDistribution2);
        createAssignments(createDistribution2, AVAILABLE_WORKERS, 8, 2, 0, 0, 0);
        long createDistribution3 = createDistribution("distC", 4);
        createBucketedTable("testC", createDistribution3);
        createAssignments(createDistribution3, AVAILABLE_WORKERS, 2, 2, 0, 0, 0);
        assertBalancing(this.balancer, 15);
    }

    @Test
    public void testMultipleDistributionUnbalancedWithDiskSpace() throws Exception {
        long createDistribution = createDistribution("distA", 4);
        createBucketedTable("testA", createDistribution, DataSize.valueOf("4B"));
        createAssignments(createDistribution, AVAILABLE_WORKERS, 1, 1, 1, 1, 0);
        long createDistribution2 = createDistribution("distB", 4);
        createBucketedTable("testB", createDistribution2, DataSize.valueOf("4B"));
        createAssignments(createDistribution2, AVAILABLE_WORKERS, 1, 1, 1, 0, 1);
        long createDistribution3 = createDistribution("distC", 2);
        createBucketedTable("testC", createDistribution3, DataSize.valueOf("2B"));
        createAssignments(createDistribution3, AVAILABLE_WORKERS, 0, 0, 0, 2, 0);
        assertBalancing(this.balancer, 1);
        Assert.assertEquals(this.balancer.fetchClusterState().getAssignedBytes().values().stream().distinct().count(), 1L);
    }

    @Test
    public void testMultipleDistributionUnbalancedWithDiskSpace2() throws Exception {
        long createDistribution = createDistribution("distA", 4);
        createBucketedTable("testA", createDistribution, DataSize.valueOf("4B"));
        createAssignments(createDistribution, AVAILABLE_WORKERS, 1, 1, 1, 1, 0);
        long createDistribution2 = createDistribution("distB", 4);
        createBucketedTable("testB", createDistribution2, DataSize.valueOf("4B"));
        createAssignments(createDistribution2, AVAILABLE_WORKERS, 2, 1, 1, 0, 0);
        assertBalancing(this.balancer, 1);
    }

    @Test
    public void testMultipleDistributionUnbalancedWorstCase() throws Exception {
        long createDistribution = createDistribution("distA", 4);
        createBucketedTable("testA", createDistribution, DataSize.valueOf("4B"));
        createAssignments(createDistribution, AVAILABLE_WORKERS, 4, 0, 0, 0, 0);
        long createDistribution2 = createDistribution("distB", 4);
        createBucketedTable("testB", createDistribution2, DataSize.valueOf("4B"));
        createAssignments(createDistribution2, AVAILABLE_WORKERS, 4, 0, 0, 0, 0);
        long createDistribution3 = createDistribution("distC", 4);
        createBucketedTable("testC", createDistribution3, DataSize.valueOf("4B"));
        createAssignments(createDistribution3, AVAILABLE_WORKERS, 4, 0, 0, 0, 0);
        long createDistribution4 = createDistribution("distD", 4);
        createBucketedTable("testD", createDistribution4, DataSize.valueOf("4B"));
        createAssignments(createDistribution4, AVAILABLE_WORKERS, 4, 0, 0, 0, 0);
        long createDistribution5 = createDistribution("distE", 4);
        createBucketedTable("testE", createDistribution5, DataSize.valueOf("4B"));
        createAssignments(createDistribution5, AVAILABLE_WORKERS, 4, 0, 0, 0, 0);
        assertBalancing(this.balancer, 15);
    }

    private static void assertBalancing(BucketBalancer bucketBalancer, int i) {
        Assert.assertEquals(bucketBalancer.process(), i);
        BucketBalancer.ClusterState fetchClusterState = bucketBalancer.fetchClusterState();
        for (Distribution distribution : fetchClusterState.getDistributionAssignments().keySet()) {
            HashMultiset<String> create = HashMultiset.create();
            Stream map = fetchClusterState.getDistributionAssignments().get(distribution).stream().map((v0) -> {
                return v0.getNodeIdentifier();
            });
            create.getClass();
            map.forEach((v1) -> {
                r1.add(v1);
            });
            double size = (1.0d * create.size()) / fetchClusterState.getActiveNodes().size();
            for (String str : create) {
                Assertions.assertGreaterThanOrEqual(Integer.valueOf(create.count(str)), Integer.valueOf((int) Math.floor(size)), str + " has fewer buckets than expected");
                Assertions.assertLessThanOrEqual(Integer.valueOf(create.count(str)), Integer.valueOf((int) Math.ceil(size)), str + " has more buckets than expected");
            }
        }
        Assert.assertEquals(bucketBalancer.process(), 0);
    }

    private long createDistribution(String str, int i) {
        long insertDistribution = ((MetadataDao) this.dbi.onDemand(MetadataDao.class)).insertDistribution(str, Distribution.serializeColumnTypes(ImmutableList.of(BigintType.BIGINT)), i);
        this.shardManager.createBuckets(insertDistribution, i);
        return insertDistribution;
    }

    private long createBucketedTable(String str, long j) {
        return createBucketedTable(str, j, DataSize.valueOf("0B"));
    }

    private long createBucketedTable(String str, long j, DataSize dataSize) {
        long insertTable = ((MetadataDao) this.dbi.onDemand(MetadataDao.class)).insertTable("test", str, false, false, Long.valueOf(j), 0L);
        this.shardManager.createTable(insertTable, ImmutableList.of(new ColumnInfo(1L, BigintType.BIGINT)), false, OptionalLong.empty());
        this.metadataDao.updateTableStats(insertTable, 1024L, 1073741824L, dataSize.toBytes(), dataSize.toBytes() * 2);
        return insertTable;
    }

    private List<BucketNode> createAssignments(long j, List<String> list, int... iArr) {
        Preconditions.checkArgument(list.size() == iArr.length);
        ImmutableList.Builder builder = ImmutableList.builder();
        int i = 0;
        for (int i2 = 0; i2 < iArr.length; i2++) {
            for (int i3 = 0; i3 < iArr[i2]; i3++) {
                this.shardManager.updateBucketAssignment(j, i, list.get(i2));
                builder.add(bucketNode(i, list.get(i2)));
                i++;
            }
        }
        return builder.build();
    }

    private static BucketNode bucketNode(int i, String str) {
        return new BucketNode(i, str);
    }

    private static Node createTestingNode(String str) {
        return new PrestoNode(str, URI.create("http://test"), NodeVersion.UNKNOWN, false);
    }
}
