package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopicTest.class */
public class PersistentTopicTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testCleanFailedUnloadTopic() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/failedUnload").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/failedUnload").get();
        Assert.assertNotNull(persistentTopic);
        ManagedLedger managedLedger = persistentTopic.ledger;
        LedgerHandle ledgerHandle = (LedgerHandle) Mockito.mock(LedgerHandle.class);
        Field declaredField = managedLedger.getClass().getDeclaredField("currentLedger");
        declaredField.setAccessible(true);
        declaredField.set(managedLedger, ledgerHandle);
        ((LedgerHandle) Mockito.doNothing().when(ledgerHandle)).asyncClose((AsyncCallback.CloseCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        this.pulsar.getNamespaceService().unloadNamespaceBundle(this.pulsar.getNamespaceService().getBundle(TopicName.get("persistent://prop/ns-abc/failedUnload")), 5L, TimeUnit.SECONDS).get();
        retryStrategically(r4 -> {
            return !this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/failedUnload").isPresent();
        }, 5, 500L);
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/failedUnload").isPresent());
        create.close();
    }

    @Test
    public void testUnblockStuckSubscription() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns-abc/stuckSubscriptionTopic").create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/stuckSubscriptionTopic").get();
        PersistentSubscription subscription = persistentTopic.getSubscription("shared");
        PersistentSubscription subscription2 = persistentTopic.getSubscription("failOver");
        PersistentDispatcherMultipleConsumers dispatcher = subscription.getDispatcher();
        PersistentDispatcherSingleActiveConsumer dispatcher2 = subscription2.getDispatcher();
        subscribe.close();
        subscribe2.close();
        dispatcher.havePendingRead = true;
        dispatcher2.havePendingRead = true;
        create.newMessage().value("test").eventTime(5L).send();
        create.newMessage().value("test").eventTime(5L).send();
        Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").subscribe();
        Consumer subscribe4 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").subscribe();
        Assert.assertNull(subscribe3.receive(2, TimeUnit.SECONDS));
        Assert.assertNull(subscribe4.receive(2, TimeUnit.SECONDS));
        dispatcher.havePendingRead = false;
        dispatcher2.havePendingRead = false;
        subscription.checkAndUnblockIfStuck();
        dispatcher2.checkAndUnblockIfStuck();
        Assert.assertTrue(subscription.checkAndUnblockIfStuck());
        Assert.assertTrue(dispatcher2.checkAndUnblockIfStuck());
        Assert.assertNotNull(subscribe3.receive(5, TimeUnit.SECONDS));
        Assert.assertNotNull(subscribe4.receive(5, TimeUnit.SECONDS));
    }

    @Test
    public void testDeleteNamespaceInfiniteRetry() throws Exception {
        String str = "prop/ns" + UUID.randomUUID();
        this.admin.namespaces().createNamespace(str, Sets.newHashSet(new String[]{"test"}));
        String str2 = "persistent://" + str + "/testDeleteNamespaceInfiniteRetry";
        this.conf.setForceDeleteNamespaceAllowed(true);
        this.pulsarClient.newProducer().topic(str2).create().close();
        this.admin.namespaces().setMaxConsumersPerTopic(str, 0);
        Awaitility.await().atMost(3L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.admin.namespaces().getMaxConsumersPerTopic(str).intValue() == 0);
        });
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.spy((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str2).get()).get());
        Policies policies = new Policies();
        policies.deleted = true;
        persistentTopic.onPoliciesUpdate(policies);
        ((PersistentTopic) Mockito.verify(persistentTopic, Mockito.times(0))).checkReplicationAndRetryOnFailure();
        policies.deleted = false;
        persistentTopic.onPoliciesUpdate(policies);
        ((PersistentTopic) Mockito.verify(persistentTopic, Mockito.times(1))).checkReplicationAndRetryOnFailure();
    }

    @Test
    public void testAccumulativeStats() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns-abc/aTopic").create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/aTopic").get();
        TopicStatsImpl stats = persistentTopic.getStats(false, false);
        Assert.assertEquals(stats.getBytesInCounter(), 0L);
        Assert.assertEquals(stats.getMsgInCounter(), 0L);
        Assert.assertEquals(stats.getBytesOutCounter(), 0L);
        Assert.assertEquals(stats.getMsgOutCounter(), 0L);
        create.newMessage().value("test").eventTime(5L).send();
        Assert.assertNotNull(subscribe.receive());
        Assert.assertNotNull(subscribe2.receive());
        TopicStatsImpl stats2 = persistentTopic.getStats(false, false);
        Assert.assertTrue(stats2.getBytesInCounter() > 0);
        Assert.assertTrue(stats2.getMsgInCounter() > 0);
        Assert.assertTrue(stats2.getBytesOutCounter() > 0);
        Assert.assertTrue(stats2.getMsgOutCounter() > 0);
        subscribe.unsubscribe();
        subscribe2.unsubscribe();
        create.close();
        Collection values = persistentTopic.getProducers().values();
        persistentTopic.getClass();
        values.forEach(persistentTopic::removeProducer);
        Assert.assertEquals(persistentTopic.getProducers().size(), 0);
        TopicStatsImpl stats3 = persistentTopic.getStats(false, false);
        Assert.assertEquals(stats3.getBytesInCounter(), stats2.getBytesInCounter());
        Assert.assertEquals(stats3.getMsgInCounter(), stats2.getMsgInCounter());
        Assert.assertEquals(stats3.getBytesOutCounter(), stats2.getBytesOutCounter());
        Assert.assertEquals(stats3.getMsgOutCounter(), stats2.getMsgOutCounter());
    }

    @Test
    public void testPersistentPartitionedTopicUnload() throws Exception {
        this.admin.namespaces().createNamespace("prop/ns", 2);
        this.admin.topics().createPartitionedTopic("persistent://prop/ns/failedUnload", 5);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 1; i++) {
            arrayList.add(this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns/failedUnload").create());
        }
        Assert.assertFalse(this.pulsar.getBrokerService().getTopics().containsKey("persistent://prop/ns/failedUnload"));
        this.pulsar.getBrokerService().getTopicIfExists("persistent://prop/ns/failedUnload").get();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopics().containsKey("persistent://prop/ns/failedUnload"));
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns/failedUnload").isPresent());
        this.pulsar.getNamespaceService().unloadNamespaceBundle(this.pulsar.getNamespaceService().getBundle(TopicName.get("persistent://prop/ns/failedUnload")), 5L, TimeUnit.SECONDS).get();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Producer) it.next()).close();
        }
    }
}
