package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.protocol.MockProtocolHandler;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/AdminApiOffloadTest.class */
public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
    private final String testTenant = "prop-xyz";
    private final String testNamespace = "ns1";
    private final String myNamespace = "prop-xyz/ns1";
    private final String testTopic = "persistent://prop-xyz/ns1/test-";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        this.conf.setManagedLedgerMaxEntriesPerLedger(10);
        this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("prop-xyz", new TenantInfoImpl(Sets.newHashSet(new String[]{"role1", "role2"}), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet(new String[]{"test"}));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    private void testOffload(String str, String str2) throws Exception {
        LedgerOffloader ledgerOffloader = (LedgerOffloader) Mockito.mock(LedgerOffloader.class);
        Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(MockProtocolHandler.NAME);
        ((PulsarService) Mockito.doReturn(ledgerOffloader).when(this.pulsar)).getManagedLedgerOffloader((NamespaceName) ArgumentMatchers.any(), (OffloadPoliciesImpl) ArgumentMatchers.any());
        CompletableFuture completableFuture = new CompletableFuture();
        ((LedgerOffloader) Mockito.doReturn(completableFuture).when(ledgerOffloader)).offload((ReadHandle) ArgumentMatchers.any(), (UUID) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
        MessageId messageId = MessageId.latest;
        Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).create();
        Throwable th = null;
        for (int i = 0; i < 15; i++) {
            try {
                try {
                    messageId = create.send("Foobar".getBytes());
                } finally {
                }
            } catch (Throwable th2) {
                if (create != null) {
                    if (th != null) {
                        try {
                            create.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th2;
            }
        }
        if (create != null) {
            if (0 != 0) {
                try {
                    create.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                create.close();
            }
        }
        ManagedLedgerInfo managedLedgerInfo = this.pulsar.getManagedLedgerFactory().getManagedLedgerInfo(str2);
        Assert.assertEquals(managedLedgerInfo.ledgers.size(), 2);
        Assert.assertEquals(this.admin.topics().offloadStatus(str).getStatus(), LongRunningProcessStatus.Status.NOT_RUN);
        this.admin.topics().triggerOffload(str, messageId);
        Assert.assertEquals(this.admin.topics().offloadStatus(str).getStatus(), LongRunningProcessStatus.Status.RUNNING);
        try {
            this.admin.topics().triggerOffload(str, messageId);
            Assert.fail("Should have failed");
        } catch (PulsarAdminException.ConflictException e) {
        }
        completableFuture.completeExceptionally(new Exception("Some random failure"));
        Assert.assertEquals(this.admin.topics().offloadStatus(str).getStatus(), LongRunningProcessStatus.Status.ERROR);
        Assert.assertTrue(this.admin.topics().offloadStatus(str).getLastError().contains("Some random failure"));
        ((LedgerOffloader) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(ledgerOffloader)).offload((ReadHandle) ArgumentMatchers.any(), (UUID) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
        this.admin.topics().triggerOffload(str, messageId);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().offloadStatus(str).getStatus(), LongRunningProcessStatus.Status.SUCCESS);
        });
        MessageIdImpl firstUnoffloadedMessage = this.admin.topics().offloadStatus(str).getFirstUnoffloadedMessage();
        Assert.assertTrue(firstUnoffloadedMessage instanceof MessageIdImpl);
        MessageIdImpl messageIdImpl = firstUnoffloadedMessage;
        Assert.assertEquals(messageIdImpl.getLedgerId(), ((ManagedLedgerInfo.LedgerInfo) managedLedgerInfo.ledgers.get(1)).ledgerId);
        Assert.assertEquals(messageIdImpl.getEntryId(), 0L);
        ((LedgerOffloader) Mockito.verify(ledgerOffloader, Mockito.times(2))).offload((ReadHandle) ArgumentMatchers.any(), (UUID) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
    }

    @Test
    public void testOffloadV2() throws Exception {
        testOffload("persistent://prop-xyz/ns1/topic1", "prop-xyz/ns1/persistent/topic1");
    }

    @Test
    public void testOffloadV1() throws Exception {
        testOffload("persistent://prop-xyz/test/ns1/topic2", "prop-xyz/test/ns1/persistent/topic2");
    }

    @Test
    public void testOffloadPolicies() throws Exception {
        OffloadPoliciesImpl create = OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket", "test-endpoint", (String) null, (String) null, (String) null, (String) null, 100, 100, 0L, 100L, OffloadedReadPriority.TIERED_STORAGE_FIRST);
        this.admin.namespaces().setOffloadPolicies("prop-xyz/ns1", create);
        Assert.assertEquals(create, this.admin.namespaces().getOffloadPolicies("prop-xyz/ns1"));
        this.admin.namespaces().removeOffloadPolicies("prop-xyz/ns1");
        Assert.assertNull(this.admin.namespaces().getOffloadPolicies("prop-xyz/ns1"));
    }

    @Test
    public void testOffloadPoliciesApi() throws Exception {
        String str = "persistent://prop-xyz/ns1/test-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        this.pulsarClient.newProducer().topic(str).create().close();
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(str)));
        });
        Assert.assertNull(this.admin.topics().getOffloadPolicies(str));
        OffloadPoliciesImpl offloadPoliciesImpl = new OffloadPoliciesImpl();
        offloadPoliciesImpl.setFileSystemProfilePath("fileSystemPath");
        this.admin.topics().setOffloadPolicies(str, offloadPoliciesImpl);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getOffloadPolicies(str));
        });
        Assert.assertEquals(this.admin.topics().getOffloadPolicies(str), offloadPoliciesImpl);
        Assert.assertEquals(this.admin.topics().getOffloadPolicies(str).getFileSystemProfilePath(), "fileSystemPath");
        this.admin.topics().removeOffloadPolicies(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(this.admin.topics().getOffloadPolicies(str));
        });
        Assert.assertNull(this.admin.topics().getOffloadPolicies(str));
    }

    @Test
    public void testOffloadPoliciesAppliedApi() throws Exception {
        String str = "persistent://prop-xyz/ns1/test-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        this.pulsarClient.newProducer().topic(str).create().close();
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(str)));
        });
        OffloadPoliciesImpl offloadPolicies = this.admin.topics().getOffloadPolicies(str, true);
        OffloadPoliciesImpl mergeConfiguration = OffloadPoliciesImpl.mergeConfiguration((OffloadPoliciesImpl) null, (OffloadPoliciesImpl) null, this.pulsar.getConfiguration().getProperties());
        Assert.assertEquals(offloadPolicies, mergeConfiguration);
        LedgerOffloader ledgerOffloader = (LedgerOffloader) Mockito.mock(LedgerOffloader.class);
        Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(MockProtocolHandler.NAME);
        ((PulsarService) Mockito.doReturn(ledgerOffloader).when(this.pulsar)).createManagedLedgerOffloader((OffloadPoliciesImpl) ArgumentMatchers.any());
        OffloadPoliciesImpl offloadPoliciesImpl = new OffloadPoliciesImpl();
        offloadPoliciesImpl.setManagedLedgerOffloadThresholdInBytes(100L);
        offloadPoliciesImpl.setManagedLedgerOffloadDeletionLagInMillis(200L);
        offloadPoliciesImpl.setManagedLedgerOffloadDriver("s3");
        offloadPoliciesImpl.setManagedLedgerOffloadBucket("buck");
        this.admin.namespaces().setOffloadPolicies("prop-xyz/ns1", offloadPoliciesImpl);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.namespaces().getOffloadPolicies("prop-xyz/ns1"), offloadPoliciesImpl);
        });
        Assert.assertEquals(this.admin.topics().getOffloadPolicies(str, true), offloadPoliciesImpl);
        OffloadPoliciesImpl offloadPoliciesImpl2 = new OffloadPoliciesImpl();
        offloadPoliciesImpl2.setManagedLedgerOffloadThresholdInBytes(200L);
        offloadPoliciesImpl2.setManagedLedgerOffloadDeletionLagInMillis(400L);
        offloadPoliciesImpl2.setManagedLedgerOffloadDriver("s3");
        offloadPoliciesImpl2.setManagedLedgerOffloadBucket("buck2");
        this.admin.topics().setOffloadPolicies(str, offloadPoliciesImpl2);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getOffloadPolicies(str, true), offloadPoliciesImpl2);
        });
        this.admin.topics().removeOffloadPolicies(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getOffloadPolicies(str, true), offloadPoliciesImpl);
        });
        this.admin.namespaces().removeOffloadPolicies("prop-xyz/ns1");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getOffloadPolicies(str, true), mergeConfiguration);
        });
    }

    @Test
    public void testTopicLevelOffloadPartitioned() throws Exception {
        testOffload(true);
    }

    @Test
    public void testTopicLevelOffloadNonPartitioned() throws Exception {
        testOffload(false);
    }

    private void testOffload(boolean z) throws Exception {
        String str = "persistent://prop-xyz/ns1/test-" + UUID.randomUUID().toString();
        if (z) {
            this.admin.topics().createPartitionedTopic(str, 3);
        } else {
            this.admin.topics().createNonPartitionedTopic(str);
        }
        this.pulsarClient.newProducer().topic(str).enableBatching(false).create().close();
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(str)));
        });
        if (z) {
            for (int i = 0; i < 3; i++) {
                PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(TopicName.get(str).getPartition(i).toString()).get()).get();
                Assert.assertNotNull(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader());
                Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName(), "NullLedgerOffloader");
            }
        } else {
            PersistentTopic persistentTopic2 = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get();
            Assert.assertNotNull(persistentTopic2.getManagedLedger().getConfig().getLedgerOffloader());
            Assert.assertEquals(persistentTopic2.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName(), "NullLedgerOffloader");
        }
        OffloadPoliciesImpl offloadPoliciesImpl = new OffloadPoliciesImpl();
        offloadPoliciesImpl.setOffloadersDirectory(".");
        offloadPoliciesImpl.setManagedLedgerOffloadDriver(MockProtocolHandler.NAME);
        offloadPoliciesImpl.setManagedLedgerOffloadPrefetchRounds(10);
        offloadPoliciesImpl.setManagedLedgerOffloadThresholdInBytes(1024L);
        LedgerOffloader ledgerOffloader = (LedgerOffloader) Mockito.mock(LedgerOffloader.class);
        Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(MockProtocolHandler.NAME);
        ((PulsarService) Mockito.doReturn(ledgerOffloader).when(this.pulsar)).createManagedLedgerOffloader((OffloadPoliciesImpl) ArgumentMatchers.any());
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(str)));
        });
        this.admin.topics().setOffloadPolicies(str, offloadPoliciesImpl);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getOffloadPolicies(str));
        });
        if (z) {
            for (int i2 = 0; i2 < 3; i2++) {
                PersistentTopic persistentTopic3 = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(TopicName.get(str).getPartition(i2).toString(), false).get()).get();
                Assert.assertNotNull(persistentTopic3.getManagedLedger().getConfig().getLedgerOffloader());
                Assert.assertEquals(persistentTopic3.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName(), MockProtocolHandler.NAME);
            }
        } else {
            PersistentTopic persistentTopic4 = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get();
            Assert.assertNotNull(persistentTopic4.getManagedLedger().getConfig().getLedgerOffloader());
            Assert.assertEquals(persistentTopic4.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName(), MockProtocolHandler.NAME);
        }
        LedgerOffloader ledgerOffloader2 = (LedgerOffloader) Mockito.mock(LedgerOffloader.class);
        Mockito.when(ledgerOffloader2.getOffloadDriverName()).thenReturn("s3");
        HashMap hashMap = new HashMap();
        hashMap.put(TopicName.get(str).getNamespaceObject(), ledgerOffloader2);
        ((PulsarService) Mockito.doReturn(hashMap).when(this.pulsar)).getLedgerOffloaderMap();
        ((PulsarService) Mockito.doReturn(ledgerOffloader2).when(this.pulsar)).getManagedLedgerOffloader(TopicName.get(str).getNamespaceObject(), (OffloadPoliciesImpl) null);
        this.admin.topics().removeOffloadPolicies(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(this.admin.topics().getOffloadPolicies(str));
        });
        if (z) {
            ((LedgerOffloader) Mockito.verify(ledgerOffloader, Mockito.times(3))).close();
        } else {
            ((LedgerOffloader) Mockito.verify(ledgerOffloader)).close();
        }
        if (!z) {
            PersistentTopic persistentTopic5 = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get();
            Assert.assertNotNull(persistentTopic5.getManagedLedger().getConfig().getLedgerOffloader());
            Assert.assertEquals(persistentTopic5.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName(), "s3");
        } else {
            for (int i3 = 0; i3 < 3; i3++) {
                PersistentTopic persistentTopic6 = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(TopicName.get(str).getPartition(i3).toString()).get()).get();
                Assert.assertNotNull(persistentTopic6.getManagedLedger().getConfig().getLedgerOffloader());
                Assert.assertEquals(persistentTopic6.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName(), "s3");
            }
        }
    }
}
