package io.streamthoughts.azkarra.http.sse;

import io.streamthoughts.azkarra.api.model.KV;
import io.streamthoughts.azkarra.serialization.json.Json;
import io.undertow.server.handlers.sse.ServerSentEventConnection;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Flow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/http/sse/ServerSentEventSubscriber.class */
public class ServerSentEventSubscriber<K, V> implements Flow.Subscriber<KV<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(ServerSentEventSubscriber.class);
    private final String eventType;
    private final ServerSentEventConnection connection;
    private final String applicationId;
    private Flow.Subscription subscription;
    private final Json json;

    /* loaded from: input_file:io/streamthoughts/azkarra/http/sse/ServerSentEventSubscriber$StreamedEvent.class */
    public static class StreamedEvent {
        private final KV<?, ?> record;

        public static StreamedEvent record(KV<?, ?> kv) {
            return new StreamedEvent(kv);
        }

        public StreamedEvent(KV<?, ?> kv) {
            this.record = kv;
        }

        public KV getRecord() {
            return this.record;
        }
    }

    public ServerSentEventSubscriber(ServerSentEventConnection serverSentEventConnection, String str, String str2, Json json) {
        this.connection = serverSentEventConnection;
        this.eventType = str;
        this.applicationId = str2;
        this.json = json;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription);
        if (this.subscription != null) {
            subscription.cancel();
        } else {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(KV<K, V> kv) {
        Objects.requireNonNull(kv);
        if (this.connection.isOpen()) {
            this.connection.send(this.json.serialize(StreamedEvent.record(kv)), this.eventType, (String) null, new ServerSentEventConnection.EventCallback() { // from class: io.streamthoughts.azkarra.http.sse.ServerSentEventSubscriber.1
                public void done(ServerSentEventConnection serverSentEventConnection, String str, String str2, String str3) {
                    ServerSentEventSubscriber.LOG.debug("Event sent for application='{}', type='{}'", ServerSentEventSubscriber.this.applicationId, ServerSentEventSubscriber.this.eventType);
                }

                public void failed(ServerSentEventConnection serverSentEventConnection, String str, String str2, String str3, IOException iOException) {
                    ServerSentEventSubscriber.LOG.debug("Failed to send event for application='{}', type='{}'", ServerSentEventSubscriber.this.applicationId, ServerSentEventSubscriber.this.eventType);
                }
            });
        } else {
            this.subscription.cancel();
            LOG.info("ServerSentEventConnection was closed. Canceling subscription for application='{}', type='{}'.", this.applicationId, this.eventType);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        Objects.requireNonNull(th);
        LOG.error("Unexpected error while consuming from . Closing ServerSentEventConnection", th);
        closeConnection();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        LOG.info("Closing ServerSentEventConnection for application='{}', type='{}'", this.applicationId, this.eventType);
        closeConnection();
    }

    private void closeConnection() {
        this.connection.shutdown();
    }
}
