package com.canoo.dp.impl.server.event;

import com.canoo.dp.impl.platform.core.Assert;
import com.canoo.platform.core.functional.Subscription;
import com.canoo.platform.remoting.server.event.MessageListener;
import com.canoo.platform.remoting.server.event.Topic;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/canoo/dp/impl/server/event/DistributedEventBus.class */
public class DistributedEventBus extends AbstractEventBus {
    private final HazelcastInstance hazelcastClient;
    private final Map<String, String> iTopicRegistrations = new ConcurrentHashMap();
    private final Map<String, Integer> iTopicCount = new ConcurrentHashMap();
    private final Lock hazelcastEventPipeLock = new ReentrantLock();

    public DistributedEventBus(HazelcastInstance hazelcastInstance) {
        this.hazelcastClient = (HazelcastInstance) Assert.requireNonNull(hazelcastInstance, "hazelcastClient");
    }

    protected <T extends Serializable> void publishForOtherSessions(DolphinEvent<T> dolphinEvent) {
        Assert.requireNonNull(dolphinEvent, "event");
        toHazelcastTopic(dolphinEvent.getTopic()).publish(dolphinEvent);
    }

    public <T extends Serializable> Subscription subscribe(Topic<T> topic, MessageListener<? super T> messageListener) {
        final Subscription subscribe = super.subscribe(topic, messageListener);
        final Subscription createHazelcastSubscription = createHazelcastSubscription(topic);
        return new Subscription() { // from class: com.canoo.dp.impl.server.event.DistributedEventBus.1
            public void unsubscribe() {
                createHazelcastSubscription.unsubscribe();
                subscribe.unsubscribe();
            }
        };
    }

    private <T extends Serializable> Subscription createHazelcastSubscription(final Topic<T> topic) {
        this.hazelcastEventPipeLock.lock();
        try {
            Assert.requireNonNull(toHazelcastTopic(topic), "hazelcastTopic");
            Integer num = this.iTopicCount.get(topic.getName());
            if (num == null || num.intValue() == 0) {
                registerHazelcastEventPipe(topic);
            } else {
                this.iTopicCount.put(topic.getName(), Integer.valueOf(num.intValue() + 1));
            }
            Subscription subscription = new Subscription() { // from class: com.canoo.dp.impl.server.event.DistributedEventBus.2
                public void unsubscribe() {
                    Integer num2 = (Integer) DistributedEventBus.this.iTopicCount.get(topic.getName());
                    if (num2.intValue() > 1) {
                        DistributedEventBus.this.iTopicCount.put(topic.getName(), Integer.valueOf(num2.intValue() - 1));
                    } else {
                        DistributedEventBus.this.unregisterHazelcastEventPipe(topic);
                    }
                }
            };
            this.hazelcastEventPipeLock.unlock();
            return subscription;
        } catch (Throwable th) {
            this.hazelcastEventPipeLock.unlock();
            throw th;
        }
    }

    private <T extends Serializable> void registerHazelcastEventPipe(Topic<T> topic) {
        this.hazelcastEventPipeLock.lock();
        try {
            ITopic<DolphinEvent<T>> hazelcastTopic = toHazelcastTopic(topic);
            Assert.requireNonNull(hazelcastTopic, "hazelcastTopic");
            String addMessageListener = hazelcastTopic.addMessageListener(new com.hazelcast.core.MessageListener<DolphinEvent<T>>() { // from class: com.canoo.dp.impl.server.event.DistributedEventBus.3
                public void onMessage(Message<DolphinEvent<T>> message) {
                    DistributedEventBus.this.triggerEventHandling((DolphinEvent) message.getMessageObject());
                }
            });
            Assert.requireNonBlank(addMessageListener, "registrationId");
            this.iTopicRegistrations.put(hazelcastTopic.getName(), addMessageListener);
            this.iTopicCount.put(hazelcastTopic.getName(), 1);
            this.hazelcastEventPipeLock.unlock();
        } catch (Throwable th) {
            this.hazelcastEventPipeLock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T extends Serializable> void unregisterHazelcastEventPipe(Topic<T> topic) {
        this.hazelcastEventPipeLock.lock();
        try {
            ITopic<DolphinEvent<T>> hazelcastTopic = toHazelcastTopic(topic);
            Assert.requireNonNull(hazelcastTopic, "hazelcastTopic");
            Integer num = this.iTopicCount.get(hazelcastTopic.getName());
            if (num == null || num.intValue() != 1) {
                throw new IllegalStateException("Count for topic " + topic.getName() + " is wrong: " + num);
            }
            String str = this.iTopicRegistrations.get(hazelcastTopic.getName());
            Assert.requireNonBlank(str, "registrationId");
            hazelcastTopic.removeMessageListener(str);
            this.iTopicRegistrations.remove(hazelcastTopic.getName());
            this.iTopicCount.remove(hazelcastTopic.getName());
            this.hazelcastEventPipeLock.unlock();
        } catch (Throwable th) {
            this.hazelcastEventPipeLock.unlock();
            throw th;
        }
    }

    private <T extends Serializable> ITopic<DolphinEvent<T>> toHazelcastTopic(Topic<T> topic) {
        return this.hazelcastClient.getTopic(topic.getName());
    }
}
