package org.apache.spark.network.shuffle;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.class */
public class ExternalShuffleIntegrationSuite {
    private static final String APP_ID = "app-id";
    private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
    private static final int RDD_ID = 1;
    private static final int SPLIT_INDEX_VALID_BLOCK = 0;
    private static final int SPLIT_INDEX_MISSING_FILE = 1;
    private static final int SPLIT_INDEX_CORRUPT_LENGTH = 2;
    private static final int SPLIT_INDEX_VALID_BLOCK_TO_RM = 3;
    private static final int SPLIT_INDEX_MISSING_BLOCK_TO_RM = 4;
    static TestShuffleDataContext dataContext0;
    static ExternalBlockHandler handler;
    static TransportServer server;
    static TransportConf conf;
    static TransportContext transportContext;
    static byte[] exec0RddBlockValid = new byte[123];
    static byte[] exec0RddBlockToRemove = new byte[124];
    static byte[][] exec0Blocks = {new byte[123], new byte[12345], new byte[1234567]};
    static byte[][] exec1Blocks = {new byte[321], new byte[54321]};

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite$FetchResult.class */
    public static class FetchResult {
        public Set<String> successBlocks;
        public Set<String> failedBlocks;
        public List<ManagedBuffer> buffers;

        FetchResult() {
        }

        public void releaseBuffers() {
            Iterator<ManagedBuffer> it = this.buffers.iterator();
            while (it.hasNext()) {
                it.next().release();
            }
        }
    }

    @BeforeClass
    public static void beforeAll() throws IOException {
        Random random = new Random();
        byte[][] bArr = exec0Blocks;
        int length = bArr.length;
        for (int i = SPLIT_INDEX_VALID_BLOCK; i < length; i++) {
            random.nextBytes(bArr[i]);
        }
        byte[][] bArr2 = exec1Blocks;
        int length2 = bArr2.length;
        for (int i2 = SPLIT_INDEX_VALID_BLOCK; i2 < length2; i2++) {
            random.nextBytes(bArr2[i2]);
        }
        random.nextBytes(exec0RddBlockValid);
        random.nextBytes(exec0RddBlockToRemove);
        dataContext0 = new TestShuffleDataContext(SPLIT_INDEX_CORRUPT_LENGTH, 5);
        dataContext0.create();
        dataContext0.insertSortShuffleData(SPLIT_INDEX_VALID_BLOCK, SPLIT_INDEX_VALID_BLOCK, exec0Blocks);
        dataContext0.insertCachedRddData(1, SPLIT_INDEX_VALID_BLOCK, exec0RddBlockValid);
        dataContext0.insertCachedRddData(1, SPLIT_INDEX_VALID_BLOCK_TO_RM, exec0RddBlockToRemove);
        HashMap hashMap = new HashMap();
        hashMap.put("spark.shuffle.io.maxRetries", "0");
        hashMap.put("spark.shuffle.service.fetch.rdd.enabled", "true");
        conf = new TransportConf("shuffle", new MapConfigProvider(hashMap));
        handler = new ExternalBlockHandler(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf, null) { // from class: org.apache.spark.network.shuffle.ExternalShuffleIntegrationSuite.1
            public ManagedBuffer getRddBlockData(String str, String str2, int i3, int i4) {
                FileSegmentManagedBuffer rddBlockData;
                if (i3 == 1) {
                    switch (i4) {
                        case ExternalShuffleIntegrationSuite.SPLIT_INDEX_CORRUPT_LENGTH /* 2 */:
                            rddBlockData = new FileSegmentManagedBuffer(ExternalShuffleIntegrationSuite.conf, new File("missing.file"), 0L, 12L);
                            break;
                        default:
                            rddBlockData = super.getRddBlockData(str, str2, i3, i4);
                            break;
                    }
                } else {
                    rddBlockData = super.getRddBlockData(str, str2, i3, i4);
                }
                return rddBlockData;
            }
        });
        transportContext = new TransportContext(conf, handler);
        server = transportContext.createServer();
    }

    @AfterClass
    public static void afterAll() {
        dataContext0.cleanup();
        server.close();
        transportContext.close();
    }

    @After
    public void afterEach() {
        handler.applicationRemoved(APP_ID, false);
    }

    private FetchResult fetchBlocks(String str, String[] strArr) throws Exception {
        return fetchBlocks(str, strArr, conf, server.getPort());
    }

    private FetchResult fetchBlocks(String str, String[] strArr, TransportConf transportConf, int i) throws Exception {
        final FetchResult fetchResult = new FetchResult();
        fetchResult.successBlocks = Collections.synchronizedSet(new HashSet());
        fetchResult.failedBlocks = Collections.synchronizedSet(new HashSet());
        fetchResult.buffers = Collections.synchronizedList(new LinkedList());
        final Semaphore semaphore = new Semaphore(SPLIT_INDEX_VALID_BLOCK);
        ExternalBlockStoreClient externalBlockStoreClient = new ExternalBlockStoreClient(transportConf, (SecretKeyHolder) null, false, 5000L);
        Throwable th = SPLIT_INDEX_VALID_BLOCK;
        try {
            try {
                externalBlockStoreClient.init(APP_ID);
                externalBlockStoreClient.fetchBlocks(TestUtils.getLocalHost(), i, str, strArr, new BlockFetchingListener() { // from class: org.apache.spark.network.shuffle.ExternalShuffleIntegrationSuite.2
                    public void onBlockFetchSuccess(String str2, ManagedBuffer managedBuffer) {
                        synchronized (this) {
                            if (!fetchResult.successBlocks.contains(str2) && !fetchResult.failedBlocks.contains(str2)) {
                                managedBuffer.retain();
                                fetchResult.successBlocks.add(str2);
                                fetchResult.buffers.add(managedBuffer);
                                semaphore.release();
                            }
                        }
                    }

                    public void onBlockFetchFailure(String str2, Throwable th2) {
                        synchronized (this) {
                            if (!fetchResult.successBlocks.contains(str2) && !fetchResult.failedBlocks.contains(str2)) {
                                fetchResult.failedBlocks.add(str2);
                                semaphore.release();
                            }
                        }
                    }
                }, (DownloadFileManager) null);
                if (!semaphore.tryAcquire(strArr.length, 5L, TimeUnit.SECONDS)) {
                    Assert.fail("Timeout getting response from the server");
                }
                if (externalBlockStoreClient != null) {
                    if (th != null) {
                        try {
                            externalBlockStoreClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        externalBlockStoreClient.close();
                    }
                }
                return fetchResult;
            } finally {
            }
        } catch (Throwable th3) {
            if (externalBlockStoreClient != null) {
                if (th != null) {
                    try {
                        externalBlockStoreClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    externalBlockStoreClient.close();
                }
            }
            throw th3;
        }
    }

    /* JADX WARN: Type inference failed for: r1v7, types: [java.lang.Object[], byte[]] */
    @Test
    public void testFetchOneSort() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_0_0_0"});
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0"}), fetchBlocks.successBlocks);
        Assert.assertTrue(fetchBlocks.failedBlocks.isEmpty());
        assertBufferListsEqual(fetchBlocks.buffers, Arrays.asList(new byte[]{exec0Blocks[SPLIT_INDEX_VALID_BLOCK]}));
        fetchBlocks.releaseBuffers();
    }

    @Test
    public void testFetchThreeSort() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"});
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"}), fetchBlocks.successBlocks);
        Assert.assertTrue(fetchBlocks.failedBlocks.isEmpty());
        assertBufferListsEqual(fetchBlocks.buffers, Arrays.asList(exec0Blocks));
        fetchBlocks.releaseBuffers();
    }

    @Test
    public void testRegisterWithCustomShuffleManager() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo("custom shuffle manager"));
    }

    @Test
    public void testFetchWrongBlockId() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-1", new String[]{"broadcast_1"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"broadcast_1"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchValidRddBlock() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-1", new String[]{"rdd_1_0"});
        Assert.assertTrue(fetchBlocks.failedBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"rdd_1_0"}), fetchBlocks.successBlocks);
        assertBuffersEqual(new NioManagedBuffer(ByteBuffer.wrap(exec0RddBlockValid)), fetchBlocks.buffers.get(SPLIT_INDEX_VALID_BLOCK));
    }

    @Test
    public void testFetchDeletedRddBlock() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-1", new String[]{"rdd_1_1"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"rdd_1_1"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testRemoveRddBlocks() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
        ExternalBlockStoreClient externalBlockStoreClient = new ExternalBlockStoreClient(conf, (SecretKeyHolder) null, false, 5000L);
        Throwable th = SPLIT_INDEX_VALID_BLOCK;
        try {
            try {
                externalBlockStoreClient.init(APP_ID);
                Assert.assertEquals(1L, ((Integer) externalBlockStoreClient.removeBlocks(TestUtils.getLocalHost(), server.getPort(), "exec-1", new String[]{"rdd_1_3", "rdd_1_4"}).get()).intValue());
                if (externalBlockStoreClient != null) {
                    if (th == null) {
                        externalBlockStoreClient.close();
                        return;
                    }
                    try {
                        externalBlockStoreClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (externalBlockStoreClient != null) {
                if (th != null) {
                    try {
                        externalBlockStoreClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    externalBlockStoreClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testFetchCorruptRddBlock() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-1", new String[]{"rdd_1_2"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"rdd_1_2"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchNonexistent() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_2_0_0"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_2_0_0"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchWrongExecutor() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_0_0_0"});
        FetchResult fetchBlocks2 = fetchBlocks("exec-0", new String[]{"shuffle_1_0_0"});
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0"}), fetchBlocks.successBlocks);
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_1_0_0"}), fetchBlocks2.failedBlocks);
    }

    @Test
    public void testFetchUnregisteredExecutor() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-2", new String[]{"shuffle_0_0_0", "shuffle_1_0_0"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0", "shuffle_1_0_0"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchNoServer() throws Exception {
        TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(ImmutableMap.of("spark.shuffle.io.maxRetries", "0")));
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, transportConf, 1);
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}), fetchBlocks.failedBlocks);
    }

    private static void registerExecutor(String str, ExecutorShuffleInfo executorShuffleInfo) throws IOException, InterruptedException {
        ExternalBlockStoreClient externalBlockStoreClient = new ExternalBlockStoreClient(conf, (SecretKeyHolder) null, false, 5000L);
        externalBlockStoreClient.init(APP_ID);
        externalBlockStoreClient.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), str, executorShuffleInfo);
    }

    private static void assertBufferListsEqual(List<ManagedBuffer> list, List<byte[]> list2) throws Exception {
        Assert.assertEquals(list.size(), list2.size());
        for (int i = SPLIT_INDEX_VALID_BLOCK; i < list.size(); i++) {
            assertBuffersEqual(list.get(i), new NioManagedBuffer(ByteBuffer.wrap(list2.get(i))));
        }
    }

    private static void assertBuffersEqual(ManagedBuffer managedBuffer, ManagedBuffer managedBuffer2) throws Exception {
        ByteBuffer nioByteBuffer = managedBuffer.nioByteBuffer();
        ByteBuffer nioByteBuffer2 = managedBuffer2.nioByteBuffer();
        int remaining = nioByteBuffer.remaining();
        Assert.assertEquals(nioByteBuffer.remaining(), nioByteBuffer2.remaining());
        for (int i = SPLIT_INDEX_VALID_BLOCK; i < remaining; i++) {
            Assert.assertEquals(nioByteBuffer.get(), nioByteBuffer2.get());
        }
    }
}
