package org.apache.rocketmq.test.container;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.container.InnerSalveBrokerController;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.assertj.core.api.Java6Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
/* loaded from: input_file:org/apache/rocketmq/test/container/PullMultipleReplicasIT.class */
public class PullMultipleReplicasIT extends ContainerIntegrationTestBase {
    private static DefaultMQPullConsumer pullConsumer;
    private static DefaultMQProducer producer;
    private static MQClientInstance mqClientInstance;
    private final String MESSAGE_STRING = RandomStringUtils.random(1024);
    private final byte[] MESSAGE_BODY = this.MESSAGE_STRING.getBytes("UTF-8");

    @BeforeClass
    public static void beforeClass() throws Exception {
        pullConsumer = createPullConsumer(PullMultipleReplicasIT.class.getSimpleName() + "_Consumer");
        pullConsumer.start();
        Field declaredField = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory");
        declaredField.setAccessible(true);
        mqClientInstance = (MQClientInstance) declaredField.get(pullConsumer.getDefaultMQPullConsumerImpl());
        producer = createProducer(PullMultipleReplicasIT.class.getSimpleName() + "_Producer");
        producer.setSendMsgTimeout(15000);
        producer.start();
    }

    @AfterClass
    public static void afterClass() {
        producer.shutdown();
        pullConsumer.shutdown();
    }

    @Test
    public void testPullMessageFromSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        awaitUntilSlaveOK();
        SendResult send = producer.send(new Message("SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS", this.MESSAGE_BODY));
        Java6Assertions.assertThat(send.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
        MessageQueue messageQueue = send.getMessageQueue();
        long queueOffset = send.getQueueOffset();
        PullResult[] pullResultArr = {null};
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            pullResultArr[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return Boolean.valueOf(pullResultArr[0].getPullStatus() == PullStatus.FOUND);
        });
        List msgFoundList = pullResultArr[0].getMsgFoundList();
        Java6Assertions.assertThat(msgFoundList.size()).isEqualTo(1);
        Java6Assertions.assertThat(new String(((MessageExt) msgFoundList.get(0)).getBody(), "UTF-8")).isEqualTo(this.MESSAGE_STRING);
        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 1L);
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            pullResultArr[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return Boolean.valueOf(pullResultArr[0].getPullStatus() == PullStatus.FOUND);
        });
        List msgFoundList2 = pullResultArr[0].getMsgFoundList();
        Java6Assertions.assertThat(msgFoundList2.size()).isEqualTo(1);
        Java6Assertions.assertThat(new String(((MessageExt) msgFoundList2.get(0)).getBody(), "UTF-8")).isEqualTo(this.MESSAGE_STRING);
        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 2L);
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            pullResultArr[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return Boolean.valueOf(pullResultArr[0].getPullStatus() == PullStatus.FOUND);
        });
        List msgFoundList3 = pullResultArr[0].getMsgFoundList();
        Java6Assertions.assertThat(msgFoundList3.size()).isEqualTo(1);
        Java6Assertions.assertThat(new String(((MessageExt) msgFoundList3.get(0)).getBody(), "UTF-8")).isEqualTo(this.MESSAGE_STRING);
        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 0L);
    }

    @Test
    public void testSendMessageBackToSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        awaitUntilSlaveOK();
        String str = "TOPIC_ON_BROKER2_AND_BROKER3_FOR_MESSAGE_BACK";
        createTopicTo(master1With3Replicas, "TOPIC_ON_BROKER2_AND_BROKER3_FOR_MESSAGE_BACK");
        createTopicTo(master3With3Replicas, "TOPIC_ON_BROKER2_AND_BROKER3_FOR_MESSAGE_BACK");
        Message message = new Message("TOPIC_ON_BROKER2_AND_BROKER3_FOR_MESSAGE_BACK", this.MESSAGE_BODY);
        producer.setSendMsgTimeout(10000);
        MessageQueue[] messageQueueArr = new MessageQueue[1];
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> {
            for (MessageQueue messageQueue : producer.fetchPublishMessageQueues(str)) {
                if (messageQueue.getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
                    messageQueueArr[0] = messageQueue;
                }
            }
            return Boolean.valueOf(messageQueueArr[0] != null);
        });
        SendResult send = producer.send(message, messageQueueArr[0]);
        Java6Assertions.assertThat(send.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
        MessageQueue messageQueue = send.getMessageQueue();
        long queueOffset = send.getQueueOffset();
        PullResult[] pullResultArr = {null};
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            pullResultArr[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
            return Boolean.valueOf(pullResultArr[0].getPullStatus() == PullStatus.FOUND);
        });
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            DefaultMessageStore messageStore = master3With3Replicas.getMessageStore();
            return Boolean.valueOf(messageStore.getHaService().inSyncReplicasNums(messageStore.getMaxPhyOffset()) == 3);
        });
        InnerSalveBrokerController innerSalveBrokerController = null;
        for (InnerSalveBrokerController innerSalveBrokerController2 : brokerContainer1.getSlaveBrokers()) {
            if (innerSalveBrokerController2.getBrokerConfig().getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
                innerSalveBrokerController = innerSalveBrokerController2;
            }
        }
        Java6Assertions.assertThat(innerSalveBrokerController).isNotNull();
        MessageExt messageExt = (MessageExt) pullResultArr[0].getMsgFoundList().get(0);
        messageExt.setStoreHost(new InetSocketAddress(innerSalveBrokerController.getBrokerConfig().getBrokerIP1(), innerSalveBrokerController.getBrokerConfig().getListenPort()));
        pullConsumer.sendMessageBack(messageExt, 0);
        MessageQueue messageQueue2 = new MessageQueue(MixAll.getRetryTopic(pullConsumer.getConsumerGroup()), master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            pullResultArr[0] = pullConsumer.pull(messageQueue2, "*", 0L, 1);
            return Boolean.valueOf(pullResultArr[0].getPullStatus() == PullStatus.FOUND);
        });
        List msgFoundList = pullResultArr[0].getMsgFoundList();
        Java6Assertions.assertThat(msgFoundList.size()).isEqualTo(1);
        Java6Assertions.assertThat(new String(((MessageExt) msgFoundList.get(0)).getBody(), "UTF-8")).isEqualTo(this.MESSAGE_STRING);
        awaitUntilSlaveOK();
    }
}
