package group.liquido.databuffer.core;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.lang.Pair;
import group.liquido.databuffer.core.common.BufferFlushListenerWrapper;
import group.liquido.databuffer.core.common.DelegateDataBuffer;
import group.liquido.databuffer.core.epoll.AbstractEventPoller;
import group.liquido.databuffer.core.epoll.PollableEvent;
import group.liquido.databuffer.core.epoll.PollableEventListener;
import group.liquido.databuffer.core.event.DataBufferLayerCloseEvent;
import group.liquido.databuffer.core.event.DataBufferLayerEvent;
import group.liquido.databuffer.core.event.DataBufferLayerOpenEvent;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.util.Assert;

/* loaded from: input_file:group/liquido/databuffer/core/BufferEventPoller.class */
/**
 * Event poller that connects a {@link BufferStore} to {@link BufferFlushListener}s.
 *
 * <p>For every registered buffer key a {@link BufferPollingEvent} / {@link DelegateBufferEventListener}
 * pair is handed to the superclass via {@code registerEvent}. When the polling event reports ready
 * (buffer full, or a partially filled buffer has waited too long) the buffered items are fetched,
 * a flush event is published through the Spring {@link ApplicationEventPublisher} and the registered
 * listener is invoked with the data.
 *
 * <p>The flush-event factory and the maximum wait time are only known after a
 * {@link DataBufferLayerOpenEvent} has been received; listeners cannot be registered before that.
 */
public class BufferEventPoller extends AbstractEventPoller implements BufferFlushListenerRegistry, ApplicationEventPublisherAware, ApplicationListener<DataBufferLayerEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferEventPoller.class);

    /** Store that is queried for item counts and drained when a buffer is flushed. */
    private final BufferStore bufferStore;
    /** Registered listeners (wrapped together with their buffer type), keyed by buffer key. */
    private final Map<String, BufferFlushListenerWrapper> registeredListenerMap;
    /** Polling event created for each registered buffer key. */
    private final Map<String, PollableEvent> pollableEventMap;
    /** Optional caller-supplied executor; {@code null} means the superclass executor is used. */
    private final ExecutorService executorService;
    /** Set from the data buffer layer on {@link DataBufferLayerOpenEvent}; {@code null} until then. */
    private BufferFlushEventFactory bufferFlushEventFactory;
    /** Injected through {@link #setApplicationEventPublisher(ApplicationEventPublisher)}. */
    private ApplicationEventPublisher eventPublisher;
    /** Maximum time in milliseconds a partially filled buffer may wait; set on the open event. */
    private long maxWaitForFlushing;

    /**
     * Listener that turns a ready {@link PollableEvent} into a buffer flush event and publishes it.
     * The most recently built {@link DataBuffer} is kept so subclasses can reuse it.
     */
    static class BufferEventPublisher<T> implements PollableEventListener {
        private final String bufferKey;
        private final BufferFlushEventFactory bufferFlushEventFactory;
        private final ApplicationEventPublisher eventPublisher;
        /** Data buffer built by the latest {@link #onEventReady(PollableEvent)} call. */
        private DataBuffer dataBuffer;

        BufferEventPublisher(String str, BufferFlushEventFactory bufferFlushEventFactory, ApplicationEventPublisher applicationEventPublisher) {
            this.bufferKey = str;
            this.bufferFlushEventFactory = bufferFlushEventFactory;
            this.eventPublisher = applicationEventPublisher;
        }

        /**
         * Wraps the event source (expected to be a {@link Collection}) into a {@link DataBuffer},
         * remembers it and publishes the corresponding flush event.
         *
         * @param pollableEvent the event that became ready
         */
        @Override
        public void onEventReady(PollableEvent pollableEvent) {
            DataBuffer flushed = DelegateDataBuffer.ofKeyCollection(this.bufferKey, (Collection) pollableEvent.getEventSource());
            this.dataBuffer = flushed;
            this.eventPublisher.publishEvent(this.bufferFlushEventFactory.createBufferFlushEvent(flushed));
        }

        /**
         * @return the data buffer built by the latest {@link #onEventReady(PollableEvent)} call,
         *         or {@code null} if none has been built yet
         */
        protected DataBuffer getDataBuffer() {
            return this.dataBuffer;
        }
    }

    /**
     * Polling event for one buffer key. It is ready when the store holds at least
     * {@code bufferSize} items for the key, or when it holds fewer (but more than zero) items and
     * the stop watch has run for at least {@code maxWaitForFlushing} milliseconds.
     */
    public static class BufferPollingEvent extends PollableEvent {
        private final BufferStore bufferStore;
        /** Item count at which the buffer is considered full; read once from the store. */
        private final int bufferSize;
        private final String bufferKey;
        private final Class<?> fetchType;
        private final long maxWaitForFlushing;
        /** Number of items that were pending whenever this event reported ready; used for logging only. */
        private final AtomicInteger bufferCounter = new AtomicInteger(0);
        private StopWatch stopWatch;

        BufferPollingEvent(BufferStore bufferStore, long j, String str, Class<?> cls) {
            this.bufferStore = bufferStore;
            this.bufferSize = bufferStore.getBufferSize();
            this.bufferKey = str;
            this.fetchType = cls;
            this.maxWaitForFlushing = j;
            resumeStopWatch();
        }

        /**
         * Checks whether the buffer for this key should be flushed now.
         *
         * <p>The pending item count is added to the consumed counter only when this method
         * returns {@code true}; counting it on every unsuccessful poll would count the same
         * pending items again and again.
         *
         * @return {@code true} if the buffer is full or a partial buffer has waited long enough
         */
        @Override
        public boolean isReady() {
            int pending = this.bufferStore.countBufferItem(this.bufferKey);
            boolean ready = pending >= this.bufferSize;
            if (pending > 0 && !ready) {
                // Stop and restart so the elapsed time can be read while the watch keeps running
                // (presumably the total only covers completed tasks - see Hutool StopWatch docs).
                this.stopWatch.stop();
                long waitedMillis = this.stopWatch.getTotalTimeMillis();
                this.stopWatch.start(this.bufferKey);
                if (waitedMillis >= this.maxWaitForFlushing) {
                    LOGGER.info("BufferPollingEvent isReady the waiting time {} has exceeded the maximum waiting time {}, current count is {} event will be ready now", waitedMillis, this.maxWaitForFlushing, pending);
                    resumeStopWatch();
                    ready = true;
                }
            }
            // NOTE(review): the watch is only reset on a timeout flush, not on a size-triggered
            // flush or while the buffer is empty - confirm this is the intended wait semantics.
            if (ready) {
                this.bufferCounter.addAndGet(pending);
            }
            return ready;
        }

        /** Replaces the stop watch with a fresh, already running one. */
        private void resumeStopWatch() {
            this.stopWatch = StopWatch.create(this.bufferKey);
            this.stopWatch.start(this.bufferKey);
        }

        /**
         * Deliberately does not delegate to the superclass: only logs, so that the event is
         * consumed again the next time it becomes ready. Use {@link #destroy()} for a real commit.
         */
        @Override
        public void commit() {
            LOGGER.info("BufferPollingEvent commit fake event submission, event will be reconsume in next ready state, already consumed buffer count {}, ", this.bufferCounter.get());
        }

        /** Performs the superclass commit that {@link #commit()} skips, then logs the counter. */
        public void destroy() {
            super.commit();
            LOGGER.info("BufferPollingEvent destroy event has been committed, already consumed buffer count {}", this.bufferCounter.get());
        }

        /**
         * @return whatever {@code BufferStore.fetchBuffers} returns for this key and fetch type
         */
        @Override
        public Object getEventSource() {
            return this.bufferStore.fetchBuffers(this.bufferKey, this.fetchType);
        }
    }

    /**
     * Publisher that, after publishing the flush event, also hands the data to a
     * {@link BufferFlushListener} and then commits the polling event.
     */
    public static class DelegateBufferEventListener<T> extends BufferEventPublisher<T> {
        private final BufferFlushListener bufferFlushListener;

        DelegateBufferEventListener(String str, BufferFlushEventFactory bufferFlushEventFactory, ApplicationEventPublisher applicationEventPublisher, BufferFlushListener bufferFlushListener) {
            super(str, bufferFlushEventFactory, applicationEventPublisher);
            this.bufferFlushListener = bufferFlushListener;
        }

        /**
         * Publishes the flush event, invokes the delegate listener with the same data buffer and
         * finally calls {@link PollableEvent#commit()} on the event.
         *
         * @param pollableEvent the event that became ready
         */
        @Override
        public void onEventReady(PollableEvent pollableEvent) {
            super.onEventReady(pollableEvent);
            this.bufferFlushListener.onBufferFlush(getDataBuffer());
            pollableEvent.commit();
        }
    }

    /**
     * Creates a poller without a custom executor.
     *
     * @param bufferStore store to poll; must not be {@code null}
     */
    public BufferEventPoller(BufferStore bufferStore) {
        this(bufferStore, null);
    }

    /**
     * Creates a poller.
     *
     * @param bufferStore     store to poll; must not be {@code null}
     * @param executorService executor returned by {@link #getExecutorService()}, or {@code null}
     *                        to fall back to the superclass executor
     */
    public BufferEventPoller(BufferStore bufferStore, ExecutorService executorService) {
        Assert.notNull(bufferStore, "BufferEventPoller bufferStore must not null");
        this.registeredListenerMap = new ConcurrentHashMap<>();
        this.pollableEventMap = new ConcurrentHashMap<>();
        this.bufferStore = bufferStore;
        this.executorService = executorService;
    }

    /**
     * Stores the publisher used for all flush events.
     *
     * @param applicationEventPublisher publisher to use
     */
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    /**
     * Reacts to data buffer layer lifecycle events: an open event supplies the flush-event
     * factory and the maximum wait time, a close event flushes everything that is still buffered.
     *
     * @param dataBufferLayerEvent the received event
     */
    public void onApplicationEvent(DataBufferLayerEvent dataBufferLayerEvent) {
        if (dataBufferLayerEvent instanceof DataBufferLayerOpenEvent) {
            LOGGER.info("BufferEventPoller on data buffer layer open event...");
            DataBufferLayer dataBufferLayer = ((DataBufferLayerOpenEvent) dataBufferLayerEvent).getDataBufferLayer();
            this.bufferFlushEventFactory = dataBufferLayer.getBufferFlushEventFactory();
            this.maxWaitForFlushing = dataBufferLayer.getMaxWaitForFlushing();
            return;
        }
        if (dataBufferLayerEvent instanceof DataBufferLayerCloseEvent) {
            LOGGER.info("BufferEventPoller on data buffer layer close event...");
            flushRemainsBuffers();
        }
    }

    /** Builds the polling event for a buffer key using the currently configured maximum wait. */
    private <T> PollableEvent createPollableEvent(String str, Class<T> cls) {
        return new BufferPollingEvent(this.bufferStore, this.maxWaitForFlushing, str, cls);
    }

    /** Builds the listener that publishes the flush event and then calls {@code bufferFlushListener}. */
    private <T> PollableEventListener createPollableEventListener(String str, Class<T> cls, BufferFlushListener bufferFlushListener) {
        return new DelegateBufferEventListener<T>(str, this.bufferFlushEventFactory, this.eventPublisher, bufferFlushListener);
    }

    /**
     * Commits every polling event, flushes whatever the store still holds for each registered
     * buffer key and finally clears the store's buffer buckets. Does nothing when no listener is
     * registered.
     */
    public void flushRemainsBuffers() {
        if (CollectionUtil.isEmpty(this.registeredListenerMap)) {
            return;
        }
        for (PollableEvent event : this.pollableEventMap.values()) {
            ((BufferPollingEvent) event).destroy();
        }
        // Iterate over entries so the wrapper cannot vanish between reading the key and the lookup.
        for (Map.Entry<String, BufferFlushListenerWrapper> registration : this.registeredListenerMap.entrySet()) {
            doFlushRemainBuffers(registration.getKey(), registration.getValue());
        }
        this.bufferStore.clearBufferBuckets();
    }

    /**
     * Fetches everything stored under {@code str} and flushes each fetched collection to the
     * given listener wrapper.
     *
     * @param str                        buffer key
     * @param bufferFlushListenerWrapper registration for that key; supplies the buffer type and
     *                                   receives the data
     */
    private void doFlushRemainBuffers(String str, BufferFlushListenerWrapper bufferFlushListenerWrapper) {
        List<?> buckets = this.bufferStore.fetchAll(str, bufferFlushListenerWrapper.getBufferType());
        if (buckets == null || buckets.isEmpty()) {
            return;
        }
        for (Object bucket : buckets) {
            publishAndFlush(str, (Collection<?>) bucket, bufferFlushListenerWrapper);
        }
    }

    /** Publishes a flush event for {@code collection} and then passes the same data to the listener. */
    private <T> void publishAndFlush(String str, Collection<T> collection, BufferFlushListener bufferFlushListener) {
        DataBuffer dataBuffer = DelegateDataBuffer.ofKeyCollection(str, collection);
        this.eventPublisher.publishEvent(this.bufferFlushEventFactory.createBufferFlushEvent(dataBuffer));
        bufferFlushListener.onBufferFlush(dataBuffer);
    }

    /**
     * Registers a flush listener for a buffer key. A second registration for a key that is
     * already registered is ignored with a warning.
     *
     * @param str                 buffer key
     * @param bufferFlushListener listener to call when the buffer is flushed
     * @param cls                 type used when fetching items from the store
     * @throws IllegalStateException if no {@link DataBufferLayerOpenEvent} has been received yet
     */
    @Override
    public <T> void registerListener(String str, BufferFlushListener bufferFlushListener, Class<T> cls) {
        if (null == this.bufferFlushEventFactory) {
            throw new IllegalStateException("BufferEventPoller registerListener bufferFlushEventFactory is not ready yet, make sure you have already opened a DataBufferLayer");
        }
        if (containsListener(str)) {
            LOGGER.warn("BufferEventPoller registerListener listener with buffer key {} is already registered", str);
            return;
        }
        PollableEvent pollableEvent = createPollableEvent(str, cls);
        registerEvent(pollableEvent, createPollableEventListener(str, cls, bufferFlushListener));
        this.registeredListenerMap.put(str, new BufferFlushListenerWrapper(str, cls, bufferFlushListener));
        this.pollableEventMap.put(str, pollableEvent);
    }

    /**
     * Removes the listener and polling event of a buffer key. If the event is ready at that
     * moment, the stored data is flushed to the listener being removed; the event is then
     * committed regardless of whether that flush succeeded.
     *
     * @param str buffer key
     */
    @Override
    public void unregisterListener(String str) {
        // Keep the removed wrapper: the final flush needs it after it has left the map.
        BufferFlushListenerWrapper removedListener = this.registeredListenerMap.remove(str);
        PollableEvent removedEvent = this.pollableEventMap.remove(str);
        if (removedEvent == null) {
            return;
        }
        BufferPollingEvent pollingEvent = (BufferPollingEvent) removedEvent;
        try {
            if (pollingEvent.isReady() && removedListener != null) {
                doFlushRemainBuffers(str, removedListener);
            }
        } finally {
            pollingEvent.destroy();
        }
    }

    /**
     * @param str buffer key
     * @return {@code true} if a listener is registered for the key
     */
    @Override
    public boolean containsListener(String str) {
        return this.registeredListenerMap.containsKey(str);
    }

    /**
     * @return a read-only view of the currently registered buffer keys; use
     *         {@link #registerListener} / {@link #unregisterListener} to change registrations
     */
    @Override
    public Set<String> getRegisteredBufferKeys() {
        return Collections.unmodifiableSet(this.registeredListenerMap.keySet());
    }

    /**
     * @param str buffer key
     * @return the listener wrapper registered for the key, or {@code null} if there is none
     */
    @Override
    public BufferFlushListener getRegisteredListener(String str) {
        return this.registeredListenerMap.get(str);
    }

    /**
     * Returns the executor passed to the constructor, or the superclass executor when none was
     * given. The superclass is consulted only when it is actually needed.
     *
     * @return the executor to use; never {@code null}
     * @throws NullPointerException if neither a custom nor a superclass executor is available
     */
    @Override
    public ExecutorService getExecutorService() {
        return Objects.requireNonNullElseGet(this.executorService, super::getExecutorService);
    }

    /**
     * Flushes all remaining buffers before the poller shuts down.
     *
     * @param list the event/listener pairs supplied by the superclass; not used here
     */
    @Override
    protected void beforeShutdown(List<Pair<PollableEvent, PollableEventListener>> list) {
        flushRemainsBuffers();
    }
}
