package org.codehaus.activemq.transport.multicast;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.io.impl.DefaultWireFormat;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQObjectMessage;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.transport.DiscoveryAgentSupport;
import org.codehaus.activemq.transport.DiscoveryEvent;
import org.codehaus.activemq.util.IdGenerator;

/* loaded from: input_file:activemq-core-2.0-SNAPSHOT.jar:org/codehaus/activemq/transport/multicast/MulticastDiscoveryAgent.class */
public class MulticastDiscoveryAgent extends DiscoveryAgentSupport implements PacketListener, Runnable {
    private static final Log log;
    public static final String DEFAULT_DISCOVERY_URI = "multicast://224.1.2.3:6066";
    private static final String KEEP_ALIVE_TYPE = "KEEP_ALIVE";
    private static final String SERVICE_TYPE = "SERVICE";
    private static final String STARTED_TYPE = "STARTED_TYPE";
    private static final String SERVICE_NAME = "SERVICE_NAME";
    private static final String CHANNEL_NAME = "CHANNEL_NAME";
    private static final long DEFAULT_KEEP_ALIVE_TIMEOUT = 5000;
    private static final int DEFAULT_TIMEOUT_COUNT = 2;
    private MulticastTransportChannel channel;
    private Thread runner;
    private URI uri;
    private ActiveMQMessage keepAliveMessage;
    private ActiveMQObjectMessage serviceMessage;
    private String channelName;
    static Class class$org$codehaus$activemq$transport$multicast$MulticastDiscoveryAgent;
    private String serviceName = "";
    private int timeToLive = 1;
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private ConcurrentHashMap services = new ConcurrentHashMap();
    private ConcurrentHashMap keepAliveMap = new ConcurrentHashMap();
    private IdGenerator idGen = new IdGenerator();
    private String localId = this.idGen.generateId();
    private long keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
    private int timeoutCount = 2;
    private long timeoutExpiration = this.keepAliveTimeout * this.timeoutCount;

    public MulticastDiscoveryAgent(String str) throws JMSException {
        this.channelName = str;
        try {
            setUri(new URI(DEFAULT_DISCOVERY_URI));
        } catch (URISyntaxException e) {
            JMSException jMSException = new JMSException(new StringBuffer().append("URI Syntax exception: ").append(e.getMessage()).toString());
            jMSException.setLinkedException(e);
            throw jMSException;
        }
    }

    public long getKeepAliveTimeout() {
        return this.keepAliveTimeout;
    }

    public void setKeepAliveTimeout(long j) {
        this.keepAliveTimeout = j;
    }

    public int getTimeoutCount() {
        return this.timeoutCount;
    }

    public void setTimeoutCount(int i) {
        this.timeoutCount = i;
    }

    public String getLocalId() {
        return this.localId;
    }

    public void setLocalId(String str) {
        this.localId = str;
    }

    public URI getUri() {
        return this.uri;
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public int getTimeToLive() {
        return this.timeToLive;
    }

    public void setTimeToLive(int i) throws IOException {
        this.timeToLive = i;
        if (this.channel != null) {
            this.channel.setTimeToLive(i);
        }
    }

    public String getChannelName() {
        return this.channelName;
    }

    public void setChannelName(String str) {
        this.channelName = str;
    }

    public String toString() {
        return new StringBuffer().append("MulticastDiscoveryAgent:").append(this.serviceName).toString();
    }

    public int getServicesCount() {
        return (this.keepAliveMessage != null ? 1 : 0) + this.services.size();
    }

    @Override // org.codehaus.activemq.transport.DiscoveryAgent
    public void registerService(String str, Map map) throws JMSException {
        if (this.keepAliveMessage != null) {
            this.keepAliveMessage.setBooleanProperty(STARTED_TYPE, true);
            sendKeepAlive();
        }
        this.serviceName = str;
        this.serviceMessage = new ActiveMQObjectMessage();
        this.serviceMessage.setJMSType(SERVICE_TYPE);
        this.serviceMessage.setStringProperty(SERVICE_NAME, str);
        this.serviceMessage.setStringProperty(CHANNEL_NAME, this.channelName);
        this.serviceMessage.setObject((Serializable) map);
        sendService();
        this.keepAliveMessage = new ActiveMQMessage();
        this.keepAliveMessage.setJMSType(KEEP_ALIVE_TYPE);
        this.keepAliveMessage.setStringProperty(SERVICE_NAME, str);
        this.keepAliveMessage.setStringProperty(CHANNEL_NAME, this.channelName);
        this.keepAliveMessage.setBooleanProperty(STARTED_TYPE, true);
        sendKeepAlive();
    }

    @Override // org.codehaus.activemq.service.Service
    public void start() throws JMSException {
        if (this.started.commit(false, true)) {
            this.timeoutExpiration = this.keepAliveTimeout * this.timeoutCount;
            this.channel = new MulticastTransportChannel(new DefaultWireFormat(), this.uri);
            this.channel.setClientID(this.localId);
            this.channel.setPacketListener(this);
            try {
                this.channel.setTimeToLive(getTimeToLive());
                this.channel.start();
                this.runner = new Thread(this);
                this.runner.setName(toString());
                this.runner.setDaemon(true);
                this.runner.setPriority(10);
                this.runner.start();
                sendService();
                sendKeepAlive();
                fireServiceStarted(this.serviceMessage);
            } catch (IOException e) {
                JMSException jMSException = new JMSException("Set time to live failed");
                jMSException.setLinkedException(e);
                throw jMSException;
            }
        }
    }

    @Override // org.codehaus.activemq.service.Service
    public void stop() throws JMSException {
        boolean z;
        synchronized (this.started) {
            z = this.started.get();
            if (z) {
                if (this.keepAliveMessage != null) {
                    this.keepAliveMessage.setBooleanProperty(STARTED_TYPE, false);
                    sendKeepAlive();
                }
                this.channel.stop();
                this.started.set(false);
            }
        }
        if (z) {
            fireServiceStopped(this.serviceMessage);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 0;
        while (this.started.get()) {
            try {
                sendKeepAlive();
                log.debug(new StringBuffer().append(this.serviceName).append(" sent keep alive").toString());
                i++;
                if (i >= this.timeoutCount) {
                    i = 0;
                    checkNodesAlive();
                }
                Thread.sleep(getKeepAliveTimeout());
            } catch (Throwable th) {
                log.error(new StringBuffer().append(toString()).append(" run failed").toString(), th);
                return;
            }
        }
    }

    @Override // org.codehaus.activemq.message.PacketListener
    public void consume(Packet packet) {
        if (packet != null) {
            try {
                if (packet.isJMSMessage()) {
                    ActiveMQMessage activeMQMessage = (ActiveMQMessage) packet;
                    String stringProperty = activeMQMessage.getStringProperty(CHANNEL_NAME);
                    if (stringProperty != null && stringProperty.equals(this.channelName)) {
                        String jMSType = activeMQMessage.getJMSType();
                        if (jMSType == null) {
                            log.error(new StringBuffer().append(toString()).append(" message type is null").toString());
                        } else if (jMSType.equals(KEEP_ALIVE_TYPE)) {
                            processKeepAlive(activeMQMessage);
                        } else if (jMSType.equals(SERVICE_TYPE)) {
                            processService(activeMQMessage);
                        } else {
                            log.warn(new StringBuffer().append(toString()).append(" received Message of unknown type: ").append(jMSType).toString());
                        }
                    }
                }
            } catch (Throwable th) {
                log.error(new StringBuffer().append(toString()).append(" couldn't process packet: ").append(packet).toString(), th);
                return;
            }
        }
        log.warn(new StringBuffer().append(toString()).append(" received unexpected packet: ").append(packet).toString());
    }

    private void sendKeepAlive() throws JMSException {
        if (!this.started.get() || this.channel == null || this.channel.isPendingStop() || this.keepAliveMessage == null) {
            return;
        }
        this.channel.asyncSend(this.keepAliveMessage);
    }

    private void sendService() throws JMSException {
        if (!this.started.get() || this.channel == null || this.channel.isPendingStop() || this.serviceMessage == null) {
            return;
        }
        this.channel.asyncSend(this.serviceMessage);
    }

    private void processKeepAlive(ActiveMQMessage activeMQMessage) throws JMSException {
        String stringProperty = activeMQMessage.getStringProperty(SERVICE_NAME);
        if (activeMQMessage.getBooleanProperty(STARTED_TYPE)) {
            addService(stringProperty);
        } else {
            removeService(stringProperty);
        }
    }

    private void processService(ActiveMQMessage activeMQMessage) throws JMSException {
        if (activeMQMessage != null) {
            ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) activeMQMessage;
            String stringProperty = activeMQObjectMessage.getStringProperty(SERVICE_NAME);
            addService(stringProperty);
            ActiveMQObjectMessage activeMQObjectMessage2 = (ActiveMQObjectMessage) this.services.get(stringProperty);
            this.services.put(stringProperty, activeMQObjectMessage);
            if (activeMQObjectMessage2 == null) {
                fireServiceStarted(activeMQObjectMessage);
                sendService();
            }
        }
    }

    private void fireServiceStarted(ActiveMQObjectMessage activeMQObjectMessage) throws JMSException {
        if (activeMQObjectMessage != null) {
            fireAddService(new DiscoveryEvent(this, activeMQObjectMessage.getStringProperty(SERVICE_NAME), (Map) activeMQObjectMessage.getObject()));
        }
    }

    private void fireServiceStopped(ActiveMQObjectMessage activeMQObjectMessage) throws JMSException {
        if (activeMQObjectMessage != null) {
            fireRemoveService(new DiscoveryEvent(this, activeMQObjectMessage.getStringProperty(SERVICE_NAME), (Map) activeMQObjectMessage.getObject()));
        }
    }

    private void addService(String str) {
        long currentTimeMillis = System.currentTimeMillis();
        SynchronizedLong synchronizedLong = (SynchronizedLong) this.keepAliveMap.get(str);
        if (synchronizedLong == null) {
            synchronizedLong = new SynchronizedLong(0L);
            this.keepAliveMap.put(str, synchronizedLong);
        }
        synchronizedLong.set(currentTimeMillis);
    }

    private void removeService(String str) throws JMSException {
        this.keepAliveMap.remove(str);
        ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) this.services.remove(str);
        if (activeMQObjectMessage != null) {
            fireServiceStopped(activeMQObjectMessage);
        }
    }

    private void checkNodesAlive() throws JMSException {
        long currentTimeMillis = System.currentTimeMillis() - this.timeoutExpiration;
        for (Map.Entry entry : this.keepAliveMap.entrySet()) {
            if (((SynchronizedLong) entry.getValue()).get() < currentTimeMillis) {
                String obj = entry.getKey().toString();
                removeService(obj);
                log.warn(new StringBuffer().append(this.serviceName).append(" Expiring node: ").append(obj).toString());
            }
        }
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$codehaus$activemq$transport$multicast$MulticastDiscoveryAgent == null) {
            cls = class$("org.codehaus.activemq.transport.multicast.MulticastDiscoveryAgent");
            class$org$codehaus$activemq$transport$multicast$MulticastDiscoveryAgent = cls;
        } else {
            cls = class$org$codehaus$activemq$transport$multicast$MulticastDiscoveryAgent;
        }
        log = LogFactory.getLog(cls);
    }
}
