package fun.pplm.framework.poplar.event.hub.service;

import fun.pplm.framework.poplar.common.runnable.LineWorker;
import fun.pplm.framework.poplar.common.utils.ThreadPoolUtil;
import fun.pplm.framework.poplar.common.utils.Uuid;
import fun.pplm.framework.poplar.event.hub.config.EventHubConfig;
import fun.pplm.framework.poplar.event.hub.model.Event;
import fun.pplm.framework.poplar.event.hub.psi.EventPublisherPsi;
import fun.pplm.framework.poplar.event.hub.psi.EventSubscriberPsi;
import fun.pplm.framework.poplar.json.Json;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;

/**
 * In-process event hub: publishers enqueue {@link Event}s into a priority queue and a
 * single worker (the {@link LineWorker} loop driving {@link #execute()}) takes them out
 * and hands each one to every registered {@link EventSubscriberPsi} whose
 * {@code subTypes()} contains the event's type. Subscriber callbacks run on a separate
 * thread pool built from {@code EventHubConfig#getSubThreadPool()}.
 *
 * <p>The bean is created unless {@code poplar.event.hub.enabled} evaluates to false.
 */
@Service
@ConditionalOnExpression("${poplar.event.hub.enabled:true}")
public class EventHubService extends LineWorker implements EventPublisherPsi {
    private static final Logger logger = LoggerFactory.getLogger(EventHubService.class);

    /** Hub settings; optional at injection time and replaceable through {@link #init(EventHubConfig)}. */
    @Autowired(required = false)
    private EventHubConfig eventHubConfig;

    /** Registered subscribers; iterated by the worker while other threads may add or remove entries. */
    @Autowired(required = false)
    private volatile List<EventSubscriberPsi> eventSubscribers;

    /** Pending events, ordered by {@code EventPriorityComparator}. Created in {@link #init(EventHubConfig)}. */
    private BlockingQueue<Event> blockingQueue;

    /** Pool on which subscriber callbacks are executed. Created in {@link #startup()}. */
    private ExecutorService subscriberExecuterService;

    /** Makes the "size check + offer" sequence in {@link #publish(Event)} atomic across publishers. */
    private final ReentrantLock lock;

    /**
     * Creates a stopped hub with an empty, thread-safe subscriber list.
     */
    public EventHubService() {
        // NOTE(review): the meaning of the 0L argument is defined by LineWorker, which is not visible here.
        super(0L);
        this.eventSubscribers = new CopyOnWriteArrayList<>();
        this.lock = new ReentrantLock();
    }

    /**
     * Container callback: normalises the subscriber list and, when the configuration asks
     * for it, initialises and starts the hub.
     */
    @PostConstruct
    protected void autoInit() {
        // Field injection may have replaced the list created in the constructor with a
        // container-provided one (presumably a plain, non-thread-safe list - confirm against
        // the Spring version in use). Copy it so that addSubscriber/removeSubscriber stay
        // safe while the worker thread iterates.
        List<EventSubscriberPsi> injected = this.eventSubscribers;
        if (!(injected instanceof CopyOnWriteArrayList)) {
            this.eventSubscribers = injected == null
                    ? new CopyOnWriteArrayList<>()
                    : new CopyOnWriteArrayList<>(injected);
        }
        // The configuration is optional (required = false); without it, or without an explicit
        // "true", the hub is left for a manual init(...) call instead of failing with an NPE.
        if (this.eventHubConfig != null && Boolean.TRUE.equals(this.eventHubConfig.getAutoInit())) {
            init(this.eventHubConfig);
        }
    }

    /**
     * Installs the given configuration, creates the event queue and starts the hub.
     *
     * @param eventHubConfig configuration to use from now on; must not be null
     */
    public void init(EventHubConfig eventHubConfig) {
        logger.debug("初始化事件中心服务开始...");
        this.eventHubConfig = eventHubConfig;
        // PriorityBlockingQueue is unbounded: the capacity passed here is only the initial
        // array size. The actual limit is enforced in publish(Event).
        this.blockingQueue = new PriorityBlockingQueue<>(
                eventHubConfig.getCapacity().intValue(), new EventPriorityComparator());
        startup();
        logger.debug("初始化事件中心服务完成");
    }

    /**
     * Creates the subscriber thread pool and starts the worker. Does nothing if already running.
     */
    @Override
    public synchronized void startup() {
        if (running()) {
            return;
        }
        this.subscriberExecuterService = ThreadPoolUtil.newThreadPool(this.eventHubConfig.getSubThreadPool());
        super.startup();
    }

    /**
     * Destroys the subscriber thread pool and stops the worker. Does nothing if not running.
     */
    @Override
    public synchronized void shutdown() {
        if (running()) {
            ThreadPoolUtil.destoryThreadPool(this.subscriberExecuterService, this.eventHubConfig.getSubThreadPool());
            super.shutdown();
        }
    }

    /**
     * Publishes an event of the given type without content.
     *
     * @param type event type
     * @return the published event, or {@code null} if it was not accepted
     */
    @Override
    public Event publish(int type) {
        return publish(type, null);
    }

    /**
     * Publishes an event of the given type carrying {@code content}.
     *
     * @param type    event type
     * @param content event payload, may be null
     * @return the published event, or {@code null} if it was not accepted
     */
    @Override
    public Event publish(int type, Object content) {
        Event event = new Event();
        event.setType(Integer.valueOf(type));
        event.setContent(content);
        return publish(event) ? event : null;
    }

    /**
     * Publishes an event of the given type carrying {@code content} with an explicit priority.
     *
     * @param type     event type
     * @param content  event payload, may be null
     * @param priority value stored on the event; ordering is decided by {@code EventPriorityComparator}
     * @return the published event, or {@code null} if it was not accepted
     */
    @Override
    public Event publish(int type, Object content, int priority) {
        Event event = new Event();
        event.setType(Integer.valueOf(type));
        event.setContent(content);
        event.setPriority(priority);
        return publish(event) ? event : null;
    }

    /**
     * Enqueues {@code event}, assigning it a generated id when it has none.
     *
     * @param event event to enqueue; must not be null
     * @return {@code true} if the event was queued; {@code false} if the queue already holds
     *         the configured capacity or the queue refused the event
     * @throws IllegalStateException if the hub is not running
     */
    @Override
    public boolean publish(Event event) {
        if (!running()) {
            throw new IllegalStateException("事件中心服务没启动");
        }
        this.lock.lock();
        try {
            if (StringUtils.isBlank(event.getId())) {
                event.setId(Uuid.uuid32());
            }
            // The queue itself never rejects for size, so the bound is checked by hand;
            // the lock keeps concurrent publishers from overshooting it.
            if (this.blockingQueue.size() >= this.eventHubConfig.getCapacity().intValue()) {
                logger.warn("事件队列已满，无法发布新的事件, event: {}", Json.string(event));
                return false;
            }
            if (!this.blockingQueue.offer(event)) {
                return false;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("事件发布成功, event: {}", Json.string(event));
            }
            return true;
        } finally {
            // Released on every exit path, including exceptions.
            this.lock.unlock();
        }
    }

    /**
     * One worker iteration: waits up to one second for an event and dispatches it to every
     * subscriber whose {@code subTypes()} contains the event's type. Any exception raised
     * while polling or dispatching is logged and ends this iteration only.
     */
    protected void execute() {
        try {
            final Event event = this.blockingQueue.poll(1000L, TimeUnit.MILLISECONDS);
            if (event == null) {
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("事件到达，处理订阅, event: {}", Json.string(event));
            }
            final Integer type = event.getType();
            for (EventSubscriberPsi subscriber : this.eventSubscribers) {
                final int[] subTypes = subscriber.subTypes();
                if (subTypes == null || subTypes.length == 0) {
                    logger.warn("事件订阅者订阅的事件类型为空, class: {}", subscriber.getClass().getCanonicalName());
                    continue;
                }
                // A null event type raises an NPE here, which the catch below logs.
                if (Arrays.stream(subTypes).anyMatch(type::equals)) {
                    this.subscriberExecuterService.submit(() -> {
                        try {
                            subscriber.arrived(event);
                        } catch (RuntimeException e) {
                            // submit() would otherwise park the failure in a Future nobody reads.
                            logger.error(e.getMessage(), e);
                        }
                    });
                }
            }
        } catch (Exception e) {
            // NOTE(review): this also catches InterruptedException without restoring the
            // interrupt flag; whether it should be restored depends on how LineWorker's
            // loop reacts to interruption, which is not visible here.
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Registers one subscriber.
     *
     * @param eventSubscriberPsi subscriber to add
     */
    public void addSubscriber(EventSubscriberPsi eventSubscriberPsi) {
        this.eventSubscribers.add(eventSubscriberPsi);
    }

    /**
     * Registers every subscriber in {@code collection}. The argument itself is not modified.
     *
     * @param collection subscribers to add; null or empty is a no-op
     */
    public void addSubscribers(Collection<EventSubscriberPsi> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        this.eventSubscribers.addAll(collection);
    }

    /**
     * Unregisters one subscriber.
     *
     * @param eventSubscriberPsi subscriber to remove
     */
    public void removeSubscriber(EventSubscriberPsi eventSubscriberPsi) {
        this.eventSubscribers.remove(eventSubscriberPsi);
    }

    /**
     * Unregisters the subscriber at position {@code i} of the subscriber list.
     *
     * @param i index into the subscriber list
     */
    public void removeSubscriber(int i) {
        this.eventSubscribers.remove(i);
    }

    /**
     * Unregisters every subscriber matching {@code predicate}.
     *
     * @param predicate selects the subscribers to remove
     */
    public void removeIf(Predicate<? super EventSubscriberPsi> predicate) {
        this.eventSubscribers.removeIf(predicate);
    }

    /**
     * Unregisters every subscriber contained in {@code collection}. The argument itself is
     * not modified.
     *
     * @param collection subscribers to remove; null or empty is a no-op
     */
    public void removeAll(Collection<EventSubscriberPsi> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        this.eventSubscribers.removeAll(collection);
    }

    /**
     * Returns the number of events waiting in the queue.
     *
     * @return pending event count, or 0 if the queue has not been created yet
     */
    public int size() {
        BlockingQueue<Event> queue = this.blockingQueue;
        return queue == null ? 0 : queue.size();
    }
}
