package org.apache.rocketmq.test.container;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.assertj.core.api.Java6Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Integration tests that publish delayed messages (by delay level and by delay seconds)
 * to a freshly created topic and wait until a push consumer has received all of them.
 *
 * <p>Two of the tests isolate {@code master1With3Replicas} after sending, assert that the
 * producer no longer finds a publish address for the topic, and only then start the
 * consumer; afterwards the isolation is cancelled and the test waits for the master's HA
 * connection count to return to {@link #EXPECTED_HA_CONNECTIONS}.
 *
 * <p>The whole class is currently disabled via {@code @Ignore}.
 *
 * <p>NOTE(review): the listeners below are implicitly typed lambdas. If
 * {@code registerMessageListener} is overloaded for more than one functional listener
 * interface, these calls need an explicit cast to the concurrent listener type — confirm
 * against the client API.
 */
@Ignore
public class ScheduledMessageIT extends ContainerIntegrationTestBase {
    /** Number of messages each test publishes and expects to consume. */
    private static final int MESSAGE_COUNT = 128;
    /** Delay level applied to messages in the delay-level based tests. */
    private static final int DELAY_LEVEL = 2;
    /**
     * Delay the timing test expects between a message's born timestamp and its consumption.
     * Presumably the broker's configured delay for {@link #DELAY_LEVEL} — TODO confirm.
     */
    private static final long EXPECTED_DELAY_MILLIS = 5000L;
    /** Allowed deviation from {@link #EXPECTED_DELAY_MILLIS} for a batch to count as "in time". */
    private static final long DELAY_TOLERANCE_MILLIS = 1000L;
    /** Minimum fraction of messages that must arrive "in time" (128 * 0.95 = 121.6). */
    private static final double MIN_IN_TIME_RATIO = 0.95d;
    /** Delay, in seconds, applied to messages in the timer test. */
    private static final long TIMER_DELAY_SECONDS = 3L;
    /** HA connection count the master must report once its isolation has been cancelled. */
    private static final int EXPECTED_HA_CONNECTIONS = 2;
    /** Upper bound for waiting until all messages have been consumed. */
    private static final Duration CONSUME_TIMEOUT = Duration.ofSeconds(256L);

    private static final String CONSUME_GROUP = ScheduledMessageIT.class.getSimpleName() + "_Consumer";
    private static final String TOPIC_PREFIX = ScheduledMessageIT.class.getSimpleName() + "_TOPIC";
    private static final String MESSAGE_STRING = RandomStringUtils.random(1024);
    // StandardCharsets.UTF_8 cannot fail, so the body can never be left null the way the
    // previous swallowed UnsupportedEncodingException allowed.
    private static final byte[] MESSAGE_BODY = MESSAGE_STRING.getBytes(StandardCharsets.UTF_8);
    private static final Random random = new Random();

    private static DefaultMQProducer producer;

    /**
     * Creates the topic (1 read queue argument, 1 write queue argument as passed to
     * {@code createTopicTo}) on all three masters.
     *
     * @param str name of the topic to create
     */
    void createTopic(String str) {
        createTopicTo(master1With3Replicas, str, 1, 1);
        createTopicTo(master2With3Replicas, str, 1, 1);
        createTopicTo(master3With3Replicas, str, 1, 1);
    }

    /** Creates and starts the shared producer with a 5000 ms send timeout. */
    @BeforeClass
    public static void beforeClass() throws Throwable {
        producer = createProducer(ScheduledMessageIT.class.getSimpleName() + "_PRODUCER");
        producer.setSendMsgTimeout(5000);
        producer.start();
    }

    /** Shuts the shared producer down; tolerates a producer that was never created. */
    @AfterClass
    public static void afterClass() throws Exception {
        // @AfterClass still runs when @BeforeClass failed; avoid masking that failure with an NPE.
        if (producer != null) {
            producer.shutdown();
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages with {@link #DELAY_LEVEL} and waits until all are
     * consumed and at least {@link #MIN_IN_TIME_RATIO} of them were delivered within
     * {@link #DELAY_TOLERANCE_MILLIS} of {@link #EXPECTED_DELAY_MILLIS}.
     */
    @Test
    @Ignore
    public void consumeScheduledMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        String topic = TOPIC_PREFIX + random.nextInt(65535);
        createTopic(topic);
        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP + random.nextInt(65535));
        pushConsumer.subscribe(topic, "*");
        AtomicInteger received = new AtomicInteger(0);
        AtomicInteger inTime = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            // The latency of the first message is used as the latency of the whole batch.
            long elapsed = System.currentTimeMillis() - msgs.get(0).getBornTimestamp();
            if (Math.abs(elapsed - EXPECTED_DELAY_MILLIS) <= DELAY_TOLERANCE_MILLIS) {
                inTime.addAndGet(msgs.size());
            }
            received.addAndGet(msgs.size());
            // Message text is passed as an argument, never as the format string, so a '%'
            // in the message cannot break printf.
            msgs.forEach(msg -> System.out.printf("%d cost %d %s%n", received.get(), elapsed, msg));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        try {
            pushConsumer.start();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                Message message = new Message(topic, MESSAGE_BODY);
                message.setDelayTimeLevel(DELAY_LEVEL);
                producer.send(message);
            }
            Awaitility.await().atMost(CONSUME_TIMEOUT).until(
                () -> received.get() >= MESSAGE_COUNT && inTime.get() >= MESSAGE_COUNT * MIN_IN_TIME_RATIO);
            System.out.printf("consumer received %d msg, %d in time%n", received.get(), inTime.get());
        } finally {
            // Release the consumer even when sending or the await fails.
            pushConsumer.shutdown();
        }
    }

    /**
     * Sends delay-level messages, then isolates master1 and expects the consumer to still
     * receive all {@link #MESSAGE_COUNT} messages.
     */
    @Test
    public void consumeScheduledMsgFromSlave() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        String topic = TOPIC_PREFIX + random.nextInt(65535);
        createTopic(topic);
        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP + random.nextInt(65535));
        pushConsumer.subscribe(topic, "*");
        AtomicInteger received = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            received.addAndGet(msgs.size());
            msgs.forEach(msg -> System.out.printf("%s%n", msg));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message message = new Message(topic, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            message.setDelayTimeLevel(DELAY_LEVEL);
            producer.send(message);
        }
        consumeWhileMaster1Isolated(topic, pushConsumer, received);
    }

    /**
     * Sends messages delayed by {@link #TIMER_DELAY_SECONDS} seconds, then isolates master1
     * and expects the consumer to still receive all {@link #MESSAGE_COUNT} messages.
     */
    @Test
    public void consumeTimerMsgFromSlave() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        String topic = TOPIC_PREFIX + random.nextInt(65535);
        createTopic(topic);
        // NOTE(review): unlike the other tests this group name has no random suffix — confirm intended.
        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
        pushConsumer.subscribe(topic, "*");
        AtomicInteger received = new AtomicInteger(0);
        pushConsumer.registerMessageListener((msgs, context) -> {
            received.addAndGet(msgs.size());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message message = new Message(topic, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            message.setDelayTimeSec(TIMER_DELAY_SECONDS);
            producer.send(message);
        }
        consumeWhileMaster1Isolated(topic, pushConsumer, received);
    }

    /**
     * Isolates master1, verifies the producer has no publish address left for the topic,
     * starts the consumer and waits for all messages; finally restores master1 and waits
     * for its HA connections to come back.
     *
     * @param topic        topic the messages were sent to
     * @param pushConsumer subscribed, not yet started consumer; always shut down on exit
     * @param received     counter incremented by the consumer's listener
     */
    private void consumeWhileMaster1Isolated(String topic, DefaultMQPushConsumer pushConsumer, AtomicInteger received)
        throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        isolateBroker(master1With3Replicas);
        try {
            producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
            Java6Assertions.assertThat(
                producer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(topic)).isNull();
            pushConsumer.start();
            Awaitility.await().atMost(CONSUME_TIMEOUT).until(() -> received.get() >= MESSAGE_COUNT);
        } finally {
            // Same order as the success path (consumer first, then broker), but also executed
            // on failure so an isolated master cannot leak into subsequent tests.
            try {
                pushConsumer.shutdown();
            } finally {
                cancelIsolatedBroker(master1With3Replicas);
            }
        }
        Awaitility.await().atMost(100L, TimeUnit.SECONDS).until(
            () -> master1With3Replicas.getMessageStore().getHaService().getConnectionCount().get()
                == EXPECTED_HA_CONNECTIONS);
    }
}
