package org.apache.rocketmq.test.container;

import java.io.UnsupportedEncodingException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.container.BrokerContainer;
import org.apache.rocketmq.container.InnerBrokerController;
import org.apache.rocketmq.container.InnerSalveBrokerController;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
/* loaded from: input_file:org/apache/rocketmq/test/container/PopSlaveActingMasterIT.class */
public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase {
    private static final int MESSAGE_COUNT = 16;
    private static DefaultMQProducer producer;
    private static byte[] MESSAGE_BODY;
    private static final String CONSUME_GROUP = PopSlaveActingMasterIT.class.getSimpleName() + "_Consumer";
    private static final Random random = new Random();
    private static final String MESSAGE_STRING = RandomStringUtils.random(1024);

    void createTopic(String str) {
        createTopicTo(master1With3Replicas, str, 1, 1);
        createTopicTo(master2With3Replicas, str, 1, 1);
        createTopicTo(master3With3Replicas, str, 1, 1);
        System.out.println("Topic [" + str + "] created");
    }

    @BeforeClass
    public static void beforeClass() throws Throwable {
        producer = createProducer(PopSlaveActingMasterIT.class.getSimpleName() + "_PRODUCER");
        producer.setSendMsgTimeout(5000);
        producer.start();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        producer.shutdown();
    }

    @Test
    public void testLocalActing_ackSlave() throws Exception {
        String str = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(str);
        String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(str, CONSUME_GROUP);
        createTopic(buildPopRetryTopic);
        switchPop(str);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(str);
        MessageQueue messageQueue = new MessageQueue(str, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int i = 0;
        for (int i2 = 0; i2 < MESSAGE_COUNT; i2++) {
            SendResult send = producer.send(new Message(str, MESSAGE_BODY), messageQueue);
            if (send.getSendStatus() == SendStatus.SEND_OK) {
                System.out.println("send message id: " + send.getMsgId());
                i++;
            }
        }
        System.out.printf("send success %d%n", Integer.valueOf(i));
        int i3 = i;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(i3 >= MESSAGE_COUNT);
        });
        isolateBroker(master1With3Replicas);
        System.out.printf("isolate master1%n", new Object[0]);
        DefaultMQPushConsumer createPushConsumer = createPushConsumer(CONSUME_GROUP);
        createPushConsumer.subscribe(str, "*");
        createPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        createPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            list.forEach(messageExt -> {
                System.out.println("receive msg id: " + messageExt.getMsgId());
                copyOnWriteArrayList.add(messageExt.getMsgId());
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        createPushConsumer.setClientRebalance(false);
        createPushConsumer.start();
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() >= MESSAGE_COUNT);
        });
        System.out.printf("%s pop receive msg count: %d%n", LocalDateTime.now(), Integer.valueOf(copyOnWriteArrayList.size()));
        createPushConsumer.shutdown();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        DefaultMQPushConsumer createPushConsumer2 = createPushConsumer(CONSUME_GROUP);
        createPushConsumer2.subscribe(buildPopRetryTopic, "*");
        createPushConsumer2.registerMessageListener((list2, consumeConcurrentlyContext2) -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                MessageExt messageExt = (MessageExt) it.next();
                System.out.printf("receive retry msg: %s %s%n", new String(messageExt.getBody()), messageExt);
                copyOnWriteArrayList2.add(new String(messageExt.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        createPushConsumer2.start();
        System.out.printf("wait for ack revive%n", new Object[0]);
        Thread.sleep(10000L);
        Assertions.assertThat(copyOnWriteArrayList2.size()).isEqualTo(0);
        cancelIsolatedBroker(master1With3Replicas);
        awaitUntilSlaveOK();
        createPushConsumer2.shutdown();
    }

    @Test
    public void testLocalActing_notAckSlave() throws Exception {
        String str = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(str);
        String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(str, CONSUME_GROUP);
        createTopic(buildPopRetryTopic);
        switchPop(str);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(str);
        HashSet hashSet = new HashSet();
        MessageQueue messageQueue = new MessageQueue(str, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int i = 0;
        for (int i2 = 0; i2 < MESSAGE_COUNT; i2++) {
            Message message = new Message(str, MESSAGE_BODY);
            if (producer.send(message, messageQueue).getSendStatus() == SendStatus.SEND_OK) {
                hashSet.add(new String(message.getBody()));
                i++;
            }
        }
        System.out.printf("send success %d%n", Integer.valueOf(i));
        int i3 = i;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(i3 >= MESSAGE_COUNT);
        });
        isolateBroker(master1With3Replicas);
        System.out.printf("isolate master1%n", new Object[0]);
        DefaultMQPushConsumer createPushConsumer = createPushConsumer(CONSUME_GROUP);
        createPushConsumer.subscribe(str, "*");
        createPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        createPushConsumer.setPopInvisibleTime(5000L);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        createPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            list.forEach(messageExt -> {
                System.out.println("receive msg id: " + messageExt.getMsgId());
                messageExt.setReconsumeTimes(0);
                copyOnWriteArrayList.add(messageExt.getMsgId());
            });
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        });
        createPushConsumer.setClientRebalance(false);
        createPushConsumer.start();
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() >= MESSAGE_COUNT);
        });
        createPushConsumer.shutdown();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        DefaultMQPushConsumer createPushConsumer2 = createPushConsumer(CONSUME_GROUP);
        createPushConsumer2.subscribe(buildPopRetryTopic, "*");
        createPushConsumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        createPushConsumer2.registerMessageListener((list2, consumeConcurrentlyContext2) -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                MessageExt messageExt = (MessageExt) it.next();
                System.out.printf("receive retry msg: %s%n", messageExt.getUserProperty("UNIQ_KEY"));
                copyOnWriteArrayList2.add(new String(messageExt.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        createPushConsumer2.start();
        System.out.printf(LocalDateTime.now() + ": wait for ack revive%n", new Object[0]);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Awaitility.await().atMost(Duration.ofMinutes(3L)).pollInterval(Duration.ofSeconds(10L)).until(() -> {
            if (copyOnWriteArrayList2.size() < MESSAGE_COUNT) {
                System.out.println("check FAILED" + atomicInteger.incrementAndGet() + ": retryMsgList.size=" + copyOnWriteArrayList2.size() + " less than " + MESSAGE_COUNT);
                return false;
            }
            Iterator it = copyOnWriteArrayList2.iterator();
            while (it.hasNext()) {
                String str2 = (String) it.next();
                if (!hashSet.contains(str2)) {
                    System.out.println("check FAILED: sendToIsolateMsgSet doesn't contain " + str2);
                    return false;
                }
            }
            return true;
        });
        System.out.printf(LocalDateTime.now() + ": receive retry msg size=%d%n", Integer.valueOf(copyOnWriteArrayList2.size()));
        cancelIsolatedBroker(master1With3Replicas);
        awaitUntilSlaveOK();
        createPushConsumer2.shutdown();
    }

    @Test
    public void testRemoteActing_ackSlave() throws Exception {
        String str = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(str);
        String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(str, CONSUME_GROUP);
        createTopic(buildPopRetryTopic);
        switchPop(str);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(str);
        MessageQueue messageQueue = new MessageQueue(str, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int i = 0;
        for (int i2 = 0; i2 < MESSAGE_COUNT; i2++) {
            SendResult send = producer.send(new Message(str, MESSAGE_BODY), messageQueue);
            if (send.getSendStatus() == SendStatus.SEND_OK) {
                System.out.println("Send message id: " + send.getMsgId());
                i++;
            }
        }
        System.out.printf("%s send success %d%n", LocalDateTime.now(), Integer.valueOf(i));
        int i3 = i;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(i3 >= MESSAGE_COUNT);
        });
        isolateBroker(master1With3Replicas);
        System.out.printf("%s isolate master1%n", LocalDateTime.now());
        isolateBroker(master2With3Replicas);
        brokerContainer2.removeBroker(new BrokerIdentity(master2With3Replicas.getBrokerConfig().getBrokerClusterName(), master2With3Replicas.getBrokerConfig().getBrokerName(), master2With3Replicas.getBrokerConfig().getBrokerId()));
        System.out.printf("%s Remove master2%n", LocalDateTime.now());
        DefaultMQPushConsumer createPushConsumer = createPushConsumer(CONSUME_GROUP);
        createPushConsumer.subscribe(str, "*");
        createPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        createPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            list.forEach(messageExt -> {
                System.out.println("receive msg id: " + messageExt.getMsgId());
                copyOnWriteArrayList.add(messageExt.getMsgId());
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        createPushConsumer.setClientRebalance(false);
        createPushConsumer.start();
        Awaitility.await().atMost(Duration.ofMinutes(2L)).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() >= MESSAGE_COUNT);
        });
        createPushConsumer.shutdown();
        System.out.printf("%s %d messages consumed%n", LocalDateTime.now(), Integer.valueOf(copyOnWriteArrayList.size()));
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        DefaultMQPushConsumer createPushConsumer2 = createPushConsumer(CONSUME_GROUP);
        createPushConsumer2.subscribe(buildPopRetryTopic, "*");
        createPushConsumer2.registerMessageListener((list2, consumeConcurrentlyContext2) -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                MessageExt messageExt = (MessageExt) it.next();
                System.out.printf("receive retry msg: %s %n", messageExt.getUserProperty("UNIQ_KEY"));
                copyOnWriteArrayList2.add(new String(messageExt.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        createPushConsumer2.start();
        System.out.printf("%s wait for ack revive%n", LocalDateTime.now());
        Thread.sleep(10000L);
        Assertions.assertThat(copyOnWriteArrayList2.size()).isEqualTo(0);
        cancelIsolatedBroker(master1With3Replicas);
        System.out.printf("%s Cancel isolate master1%n", LocalDateTime.now());
        master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
        master2With3Replicas.start();
        cancelIsolatedBroker(master2With3Replicas);
        System.out.printf("%s Add back master2%n", LocalDateTime.now());
        awaitUntilSlaveOK();
        System.out.printf("%s wait for ack revive%n", LocalDateTime.now());
        Thread.sleep(10000L);
        Assertions.assertThat(copyOnWriteArrayList2.size()).isEqualTo(0);
        System.out.printf("%s shutting down pushConsumer%n", LocalDateTime.now());
        createPushConsumer2.shutdown();
    }

    @Test
    public void testRemoteActing_notAckSlave_getFromLocal() throws Exception {
        String str = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(str);
        switchPop(str);
        String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(str, CONSUME_GROUP);
        createTopic(buildPopRetryTopic);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(str);
        HashSet hashSet = new HashSet();
        MessageQueue messageQueue = new MessageQueue(str, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int i = 0;
        for (int i2 = 0; i2 < MESSAGE_COUNT; i2++) {
            Message message = new Message(str, MESSAGE_BODY);
            if (producer.send(message, messageQueue).getSendStatus() == SendStatus.SEND_OK) {
                hashSet.add(new String(message.getBody()));
                i++;
            }
        }
        System.out.printf("send success %d%n", Integer.valueOf(i));
        int i3 = i;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(i3 >= MESSAGE_COUNT);
        });
        isolateBroker(master1With3Replicas);
        System.out.printf("isolate master1%n", new Object[0]);
        isolateBroker(master2With3Replicas);
        brokerContainer2.removeBroker(new BrokerIdentity(master2With3Replicas.getBrokerConfig().getBrokerClusterName(), master2With3Replicas.getBrokerConfig().getBrokerName(), master2With3Replicas.getBrokerConfig().getBrokerId()));
        System.out.printf("Remove master2%n", new Object[0]);
        DefaultMQPushConsumer createPushConsumer = createPushConsumer(CONSUME_GROUP);
        createPushConsumer.subscribe(str, "*");
        createPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        createPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            list.forEach(messageExt -> {
                System.out.println("receive msg id: " + messageExt.getMsgId());
                copyOnWriteArrayList.add(messageExt.getMsgId());
            });
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        });
        createPushConsumer.setClientRebalance(false);
        createPushConsumer.start();
        Awaitility.await().atMost(Duration.ofMinutes(3L)).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() >= MESSAGE_COUNT);
        });
        createPushConsumer.shutdown();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        DefaultMQPushConsumer createPushConsumer2 = createPushConsumer(CONSUME_GROUP);
        createPushConsumer2.subscribe(buildPopRetryTopic, "*");
        createPushConsumer2.registerMessageListener((list2, consumeConcurrentlyContext2) -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                MessageExt messageExt = (MessageExt) it.next();
                System.out.printf("receive retry msg: %s%n", messageExt.getUserProperty("UNIQ_KEY"));
                copyOnWriteArrayList2.add(new String(messageExt.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        createPushConsumer2.start();
        System.out.printf("wait for ack revive%n", new Object[0]);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            if (copyOnWriteArrayList2.size() < MESSAGE_COUNT) {
                System.out.println("check FAILED: retryMsgList.size=" + copyOnWriteArrayList2.size() + " less than " + MESSAGE_COUNT);
                return false;
            }
            Iterator it = copyOnWriteArrayList2.iterator();
            while (it.hasNext()) {
                String str2 = (String) it.next();
                if (!hashSet.contains(str2)) {
                    System.out.println("check FAILED: sendToIsolateMsgSet doesn't contain: " + str2);
                    return false;
                }
            }
            return true;
        });
        System.out.printf("receive retry msg as expected%n", new Object[0]);
        cancelIsolatedBroker(master1With3Replicas);
        System.out.printf("Cancel isolate master1%n", new Object[0]);
        master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
        master2With3Replicas.start();
        cancelIsolatedBroker(master2With3Replicas);
        System.out.printf("Add back master2%n", new Object[0]);
        awaitUntilSlaveOK();
        createPushConsumer2.shutdown();
    }

    @Test
    public void testRemoteActing_notAckSlave_getFromRemote() throws Exception {
        String str = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
        createTopic(str);
        switchPop(str);
        String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(str, CONSUME_GROUP);
        createTopic(buildPopRetryTopic);
        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(str);
        HashSet hashSet = new HashSet();
        MessageQueue messageQueue = new MessageQueue(str, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        int i = 0;
        for (int i2 = 0; i2 < MESSAGE_COUNT; i2++) {
            Message message = new Message(str, MESSAGE_BODY);
            if (producer.send(message, messageQueue).getSendStatus() == SendStatus.SEND_OK) {
                hashSet.add(new String(message.getBody()));
                i++;
            }
        }
        System.out.printf("send success %d%n", Integer.valueOf(i));
        int i3 = i;
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(i3 >= MESSAGE_COUNT);
        });
        isolateBroker(master1With3Replicas);
        System.out.printf("isolate master1%n", new Object[0]);
        isolateBroker(master2With3Replicas);
        brokerContainer2.removeBroker(new BrokerIdentity(master2With3Replicas.getBrokerConfig().getBrokerClusterName(), master2With3Replicas.getBrokerConfig().getBrokerName(), master2With3Replicas.getBrokerConfig().getBrokerId()));
        System.out.printf("Remove master2%n", new Object[0]);
        InnerSalveBrokerController slaveFromContainerByName = getSlaveFromContainerByName(brokerContainer3, master1With3Replicas.getBrokerConfig().getBrokerName());
        isolateBroker(slaveFromContainerByName);
        brokerContainer3.removeBroker(new BrokerIdentity(slaveFromContainerByName.getBrokerConfig().getBrokerClusterName(), slaveFromContainerByName.getBrokerConfig().getBrokerName(), slaveFromContainerByName.getBrokerConfig().getBrokerId()));
        System.out.printf("Remove slave1 form container3%n", new Object[0]);
        DefaultMQPushConsumer createPushConsumer = createPushConsumer(CONSUME_GROUP);
        createPushConsumer.subscribe(str, "*");
        createPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        createPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            list.forEach(messageExt -> {
                System.out.println("receive msg id: " + messageExt.getMsgId());
                copyOnWriteArrayList.add(messageExt.getMsgId());
            });
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        });
        createPushConsumer.setClientRebalance(false);
        createPushConsumer.start();
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() >= MESSAGE_COUNT);
        });
        System.out.printf("%s pop receive msg count: %d%n", LocalDateTime.now(), Integer.valueOf(copyOnWriteArrayList.size()));
        createPushConsumer.shutdown();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        DefaultMQPushConsumer createPushConsumer2 = createPushConsumer(CONSUME_GROUP);
        createPushConsumer2.subscribe(buildPopRetryTopic, "*");
        createPushConsumer2.registerMessageListener((list2, consumeConcurrentlyContext2) -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                MessageExt messageExt = (MessageExt) it.next();
                System.out.printf("receive retry msg: %s%n", messageExt.getUserProperty("UNIQ_KEY"));
                copyOnWriteArrayList2.add(new String(messageExt.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        createPushConsumer2.start();
        System.out.printf("wait for ack revive%n", new Object[0]);
        Thread.sleep(10000L);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
            if (copyOnWriteArrayList2.size() < MESSAGE_COUNT) {
                return false;
            }
            Iterator it = copyOnWriteArrayList2.iterator();
            while (it.hasNext()) {
                if (!hashSet.contains((String) it.next())) {
                    return false;
                }
            }
            return true;
        });
        System.out.printf("receive retry msg as expected%n", new Object[0]);
        cancelIsolatedBroker(master1With3Replicas);
        System.out.printf("Cancel isolate master1%n", new Object[0]);
        master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
        master2With3Replicas.start();
        cancelIsolatedBroker(master2With3Replicas);
        System.out.printf("Add back master2%n", new Object[0]);
        InnerBrokerController addBroker = brokerContainer3.addBroker(slaveFromContainerByName.getBrokerConfig(), slaveFromContainerByName.getMessageStoreConfig());
        addBroker.start();
        cancelIsolatedBroker(addBroker);
        System.out.printf("Add back slave1 to container3%n", new Object[0]);
        awaitUntilSlaveOK();
        createPushConsumer2.shutdown();
    }

    private void switchPop(String str) throws Exception {
        for (BrokerContainer brokerContainer : brokerContainerList) {
            Iterator it = brokerContainer.getMasterBrokers().iterator();
            while (it.hasNext()) {
                defaultMQAdminExt.setMessageRequestMode(((InnerBrokerController) it.next()).getBrokerAddr(), str, CONSUME_GROUP, MessageRequestMode.POP, 8, 60000L);
            }
            Iterator it2 = brokerContainer.getSlaveBrokers().iterator();
            while (it2.hasNext()) {
                defaultMQAdminExt.setMessageRequestMode(((InnerSalveBrokerController) it2.next()).getBrokerAddr(), str, CONSUME_GROUP, MessageRequestMode.POP, 8, 60000L);
            }
        }
    }

    static {
        try {
            MESSAGE_BODY = MESSAGE_STRING.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
