package org.activemq.transport.peer;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ActiveMQTextMessage;
import org.activemq.message.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/activemq/transport/peer/PeerTransportTest.class */
public class PeerTransportTest extends TestCase implements MessageListener {
    protected Destination destination;
    protected static int MESSAGE_COUNT = 50;
    protected static int NUMBER_IN_CLUSTER = 3;
    protected MessageProducer[] producers;
    protected Connection[] connections;
    protected Log log = LogFactory.getLog(getClass());
    protected boolean topic = true;
    protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
    protected int deliveryMode = 1;

    protected void setUp() throws Exception {
        this.connections = new Connection[NUMBER_IN_CLUSTER];
        this.producers = new MessageProducer[NUMBER_IN_CLUSTER];
        Destination createDestination = createDestination();
        String property = System.getProperty("activemq.store.dir");
        for (int i = 0; i < NUMBER_IN_CLUSTER; i++) {
            System.setProperty("activemq.store.dir", new StringBuffer().append(property).append("_broker_").append(i).toString());
            this.connections[i] = createConnection();
            this.connections[i].setClientID(new StringBuffer().append("ClusterTest").append(i).toString());
            this.connections[i].start();
            Session createSession = this.connections[i].createSession(false, 1);
            this.producers[i] = createSession.createProducer(createDestination);
            this.producers[i].setDeliveryMode(this.deliveryMode);
            createMessageConsumer(createSession, createDestination).setMessageListener(this);
        }
        System.out.println("Sleeping to ensure cluster is fully connected");
        Thread.sleep(5000L);
    }

    protected void tearDown() throws Exception {
        if (this.connections != null) {
            for (int i = 0; i < this.connections.length; i++) {
                this.connections[i].close();
            }
        }
    }

    protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
        return session.createConsumer(destination);
    }

    protected int expectedReceiveCount() {
        return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
    }

    protected Connection createConnection() throws JMSException {
        System.err.println("creating connection ....");
        return new ActiveMQConnectionFactory(new StringBuffer().append("peer://").append(getClass().getName()).toString()).createConnection();
    }

    protected Destination createDestination() {
        return createDestination(getClass().getName());
    }

    protected Destination createDestination(String str) {
        return this.topic ? new ActiveMQTopic(str) : new ActiveMQQueue(str);
    }

    public void onMessage(Message message) {
        this.receivedMessageCount.increment();
        synchronized (this.receivedMessageCount) {
            if (this.receivedMessageCount.get() >= expectedReceiveCount()) {
                this.receivedMessageCount.notify();
            }
        }
    }

    public void testSendReceive() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
            activeMQTextMessage.setText(new StringBuffer().append("MSG-NO:").append(i).toString());
            for (int i2 = 0; i2 < this.producers.length; i2++) {
                this.producers[i2].send(activeMQTextMessage);
            }
        }
        synchronized (this.receivedMessageCount) {
            if (this.receivedMessageCount.get() < expectedReceiveCount()) {
                this.receivedMessageCount.wait(20000L);
            }
        }
        Thread.sleep(2000L);
        System.err.println(new StringBuffer().append("GOT: ").append(this.receivedMessageCount.get()).toString());
        assertEquals("Expected message count not correct", expectedReceiveCount(), this.receivedMessageCount.get());
    }
}
