package ch.rasc.sse.eventbus;

import ch.rasc.sse.eventbus.config.SseEventBusConfigurer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.springframework.context.event.EventListener;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/* loaded from: input_file:ch/rasc/sse/eventbus/SseEventBus.class */
public class SseEventBus {
    private final Map<String, Client> clients = new ConcurrentHashMap();
    private final Map<String, Set<String>> eventSubscribers = new ConcurrentHashMap();
    private final ScheduledExecutorService taskScheduler;
    private final int noOfSendResponseTries;
    private final Duration clientExpiration;
    private List<DataObjectConverter> dataObjectConverters;
    private final BlockingQueue<ClientEvent> errorQueue;
    private final BlockingQueue<ClientEvent> sendQueue;

    public SseEventBus(SseEventBusConfigurer sseEventBusConfigurer) {
        this.taskScheduler = sseEventBusConfigurer.taskScheduler();
        this.noOfSendResponseTries = sseEventBusConfigurer.noOfSendResponseTries();
        this.clientExpiration = sseEventBusConfigurer.clientExpiration();
        this.errorQueue = sseEventBusConfigurer.errorQueue();
        this.sendQueue = sseEventBusConfigurer.sendQueue();
        this.taskScheduler.submit(this::eventLoop);
        this.taskScheduler.scheduleWithFixedDelay(this::reScheduleFailedEvents, 0L, sseEventBusConfigurer.schedulerDelay().toMillis(), TimeUnit.MILLISECONDS);
        this.taskScheduler.scheduleAtFixedRate(this::cleanUpClients, 0L, this.clientExpiration.toMillis(), TimeUnit.MILLISECONDS);
    }

    @PreDestroy
    public void cleanUp() {
        this.taskScheduler.shutdownNow();
    }

    public SseEmitter createSseEmitter(String str) {
        return createSseEmitter(str, (Long) 180000L, new String[0]);
    }

    public SseEmitter createSseEmitter(String str, String... strArr) {
        return createSseEmitter(str, 180000L, false, strArr);
    }

    public SseEmitter createSseEmitter(String str, boolean z, String... strArr) {
        return createSseEmitter(str, 180000L, z, strArr);
    }

    public SseEmitter createSseEmitter(String str, Long l, String... strArr) {
        return createSseEmitter(str, l, false, strArr);
    }

    public SseEmitter createSseEmitter(String str, Long l, boolean z, String... strArr) {
        SseEmitter sseEmitter = new SseEmitter(l);
        sseEmitter.getClass();
        sseEmitter.onTimeout(sseEmitter::complete);
        registerClient(str, sseEmitter);
        if (strArr != null && strArr.length > 0) {
            if (z) {
                unsubscribeFromAllEvents(str, strArr);
            }
            for (String str2 : strArr) {
                subscribe(str, str2);
            }
        }
        return sseEmitter;
    }

    public void registerClient(String str, SseEmitter sseEmitter) {
        Client client = this.clients.get(str);
        if (client == null) {
            this.clients.put(str, new Client(str, sseEmitter));
        } else {
            client.updateEmitter(sseEmitter);
        }
    }

    public void unregisterClient(String str) {
        unsubscribeFromAllEvents(str, new String[0]);
        this.clients.remove(str);
    }

    public void subscribe(String str) {
        subscribe(str, SseEvent.DEFAULT_EVENT);
    }

    public void subscribe(String str, String str2) {
        this.eventSubscribers.computeIfAbsent(str2, str3 -> {
            return new HashSet();
        }).add(str);
    }

    public void subscribeOnly(String str, String str2) {
        this.eventSubscribers.computeIfAbsent(str2, str3 -> {
            return new HashSet();
        }).add(str);
        unsubscribeFromAllEvents(str, str2);
    }

    public void unsubscribe(String str, String str2) {
        Set<String> set = this.eventSubscribers.get(str2);
        if (set != null) {
            set.remove(str);
            if (set.isEmpty()) {
                this.eventSubscribers.remove(str2);
            }
        }
    }

    public void unsubscribeFromAllEvents(String str, String... strArr) {
        HashSet hashSet = null;
        if (strArr != null && strArr.length > 0) {
            hashSet = new HashSet();
            for (String str2 : strArr) {
                hashSet.add(str2);
            }
        }
        HashSet hashSet2 = new HashSet();
        for (Map.Entry<String, Set<String>> entry : this.eventSubscribers.entrySet()) {
            if (hashSet == null || !hashSet.contains(entry.getKey())) {
                Set<String> value = entry.getValue();
                value.remove(str);
                if (value.isEmpty()) {
                    hashSet2.add(entry.getKey());
                }
            }
        }
        Map<String, Set<String>> map = this.eventSubscribers;
        map.getClass();
        hashSet2.forEach((v1) -> {
            r1.remove(v1);
        });
    }

    @EventListener
    public void handleEvent(SseEvent sseEvent) {
        try {
            String convertObject = sseEvent.data() instanceof String ? null : convertObject(sseEvent);
            if (sseEvent.clientIds().isEmpty()) {
                for (Client client : this.clients.values()) {
                    if (!sseEvent.excludeClientIds().contains(client.getId()) && isUserSubscribed(client.getId(), sseEvent)) {
                        this.sendQueue.put(new ClientEvent(client, sseEvent, convertObject));
                    }
                }
            } else {
                for (String str : sseEvent.clientIds()) {
                    if (isUserSubscribed(str, sseEvent)) {
                        this.sendQueue.put(new ClientEvent(this.clients.get(str), sseEvent, convertObject));
                    }
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void reScheduleFailedEvents() {
        ArrayList<ClientEvent> arrayList = new ArrayList();
        this.errorQueue.drainTo(arrayList);
        for (ClientEvent clientEvent : arrayList) {
            if (isUserSubscribed(clientEvent.getClient().getId(), clientEvent.getSseEvent())) {
                try {
                    this.sendQueue.put(clientEvent);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    private boolean isUserSubscribed(String str, SseEvent sseEvent) {
        Set<String> set = this.eventSubscribers.get(sseEvent.event());
        if (set != null) {
            return set.contains(str);
        }
        return false;
    }

    private void eventLoop() {
        while (true) {
            try {
                ClientEvent take = this.sendQueue.take();
                if (take.getErrorCounter() < this.noOfSendResponseTries) {
                    Client client = take.getClient();
                    if (sendEventToClient(take)) {
                        client.updateLastTransfer();
                    } else {
                        take.incErrorCounter();
                        this.errorQueue.put(take);
                    }
                } else {
                    unregisterClient(take.getClient().getId());
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static boolean sendEventToClient(ClientEvent clientEvent) {
        Client client = clientEvent.getClient();
        try {
            client.sseEmitter().send(clientEvent.createSseEventBuilder());
            return true;
        } catch (Exception e) {
            client.sseEmitter().completeWithError(e);
            return false;
        }
    }

    private String convertObject(SseEvent sseEvent) {
        if (this.dataObjectConverters == null) {
            return null;
        }
        for (DataObjectConverter dataObjectConverter : this.dataObjectConverters) {
            if (dataObjectConverter.supports(sseEvent)) {
                return dataObjectConverter.convert(sseEvent);
            }
        }
        return null;
    }

    private void cleanUpClients() {
        if (this.clients.isEmpty()) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis() - this.clientExpiration.toMillis();
        HashSet hashSet = new HashSet();
        for (Map.Entry<String, Client> entry : this.clients.entrySet()) {
            if (entry.getValue().lastTransfer() < currentTimeMillis) {
                hashSet.add(entry.getKey());
            }
        }
        hashSet.forEach(this::unregisterClient);
    }

    public List<DataObjectConverter> getDataObjectConverters() {
        return this.dataObjectConverters;
    }

    public void setDataObjectConverters(List<DataObjectConverter> list) {
        this.dataObjectConverters = list;
    }
}
