package io.streamthoughts.azkarra.http.sse;

import io.streamthoughts.azkarra.api.AzkarraStreamsService;
import io.streamthoughts.azkarra.api.errors.NotFoundException;
import io.streamthoughts.azkarra.api.events.reactive.EventStreamPublisher;
import io.streamthoughts.azkarra.http.handler.WithApplication;
import io.streamthoughts.azkarra.serialization.json.Json;
import io.undertow.server.handlers.sse.ServerSentEventConnection;
import io.undertow.server.handlers.sse.ServerSentEventConnectionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/http/sse/EventStreamConnectionCallback.class */
public class EventStreamConnectionCallback implements ServerSentEventConnectionCallback {
    private static final Logger LOG = LoggerFactory.getLogger(EventStreamConnectionCallback.class);
    private final AzkarraStreamsService service;
    private final Json json;

    public EventStreamConnectionCallback(AzkarraStreamsService azkarraStreamsService, Json json) {
        this.service = azkarraStreamsService;
        this.json = json;
    }

    public void connected(ServerSentEventConnection serverSentEventConnection, String str) {
        String parameter = serverSentEventConnection.getParameter(WithApplication.QUERY_PARAM_ID);
        String parameter2 = serverSentEventConnection.getParameter("event");
        LOG.info("ServerSentEventConnection established. Subscribe to event-stream for application='{}', type='{}'", parameter, parameter2);
        try {
            EventStreamPublisher eventStreamPublisherForType = this.service.getStreamsById(parameter).eventStreamPublisherForType(parameter2);
            if (eventStreamPublisherForType == null) {
                serverSentEventConnection.shutdown();
            } else {
                eventStreamPublisherForType.subscribe(new ServerSentEventSubscriber(serverSentEventConnection, eventStreamPublisherForType.type(), parameter, this.json));
            }
        } catch (NotFoundException e) {
            LOG.error(e.getMessage());
            serverSentEventConnection.shutdown();
        }
    }
}
