package cloud.prefab.sse;

import cloud.prefab.sse.events.CommentEvent;
import cloud.prefab.sse.events.DataEvent;
import cloud.prefab.sse.events.Event;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cloud/prefab/sse/SSEHandlerTest.class */
/**
 * Tests for {@link SSEHandler}.
 *
 * <p>Each test wires the handler between a {@link SubmissionPublisher} of raw text lines and an
 * {@link EndSubscriber} that records whatever the handler emits. Delivery is asynchronous, so every
 * assertion is polled with Awaitility until it holds or the timeout elapses.
 */
class SSEHandlerTest {
    /** Upper bound, in seconds, on how long a test polls for its expected outcome. */
    private static final long AWAIT_TIMEOUT_SECONDS = 3L;

    final SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>();
    final SSEHandler sseHandler = new SSEHandler();
    final EndSubscriber endSubscriber = new EndSubscriber();

    /**
     * Terminal subscriber that records every signal it receives so tests can assert on them.
     *
     * <p>Events are collected in arrival order; completion and error signals are captured in
     * atomics because they are written by the delivering thread and read by the test thread.
     */
    private static class EndSubscriber implements Flow.Subscriber<Event> {
        private static final Logger LOG = LoggerFactory.getLogger(EndSubscriber.class);
        private Flow.Subscription subscription;
        private final CopyOnWriteArrayList<Event> events = new CopyOnWriteArrayList<>();
        private final AtomicBoolean isComplete = new AtomicBoolean(false);
        private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();

        /** Stores the subscription and requests the first item. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(1L);
        }

        /** Records the event, logs the running count, and requests the next item. */
        @Override
        public void onNext(Event event) {
            events.add(event);
            LOG.info("Received event {}, Now have {} events", event, events.size());
            subscription.request(1L);
        }

        /** Captures the failure so a test can compare it with the one it injected. */
        @Override
        public void onError(Throwable th) {
            throwableAtomicReference.set(th);
        }

        /** Marks the stream as completed. */
        @Override
        public void onComplete() {
            isComplete.set(true);
        }
    }

    /** Connects publisher -> handler -> recording subscriber before each test. */
    @BeforeEach
    void setup() {
        submissionPublisher.subscribe(sseHandler);
        sseHandler.subscribe(endSubscriber);
    }

    /**
     * Polls until the recording subscriber holds exactly one event equal to {@code expected}, or
     * fails once the timeout elapses.
     *
     * @param expected the only event the subscriber should have received
     */
    private void awaitSingleEvent(Event expected) {
        Awaitility
            .await()
            .atMost(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .untilAsserted(() ->
                Assertions.assertThat(endSubscriber.events).hasSize(1).containsOnly(expected)
            );
    }

    @Test
    void itHandlesComments() {
        submissionPublisher.submit(":foobar\n");
        awaitSingleEvent(new CommentEvent("foobar"));
    }

    @Test
    void itPublishesEventWithDefaultNameForData() throws InterruptedException {
        submissionPublisher.submit("data: hello\n");
        submissionPublisher.submit("\n");
        // No "event:" line was sent, so the expected event name is "message".
        awaitSingleEvent(new DataEvent("message", "hello\n", ""));
    }

    @Test
    void itPublishesEventWithGivenNameForData() throws InterruptedException {
        submissionPublisher.submit("event: coolEvent");
        submissionPublisher.submit("data: hello\n");
        submissionPublisher.submit("\n");
        awaitSingleEvent(new DataEvent("coolEvent", "hello\n", ""));
    }

    @Test
    void itPublishesEventWithGivenNameForDataAndLastEventId() throws InterruptedException {
        submissionPublisher.submit("event: coolEvent");
        submissionPublisher.submit("id: 101A");
        submissionPublisher.submit("data: hello\n");
        submissionPublisher.submit("\n");
        awaitSingleEvent(new DataEvent("coolEvent", "hello\n", "101A"));
    }

    @Test
    void itPublishesEventWithGivenNameForMultiLineDataAndLastEventId() throws InterruptedException {
        submissionPublisher.submit("event: coolEvent");
        submissionPublisher.submit("id: 101A");
        submissionPublisher.submit("data: hello\n");
        submissionPublisher.submit("data: world\n");
        submissionPublisher.submit("data: !\n");
        submissionPublisher.submit("\n");
        // Consecutive data lines are expected to be concatenated, each keeping its newline.
        awaitSingleEvent(new DataEvent("coolEvent", "hello\nworld\n!\n", "101A"));
    }

    @Test
    void itPublishesEventWithGivenNameForDataAndIgnoresNullEventId() throws InterruptedException {
        submissionPublisher.submit("event: coolEvent");
        // The id value is a single NUL character (code point U+0000). The assertion below expects
        // such an id to be dropped, leaving the event's last-event id empty.
        submissionPublisher.submit("id: \u0000");
        submissionPublisher.submit("data: hello\n");
        submissionPublisher.submit("\n");
        awaitSingleEvent(new DataEvent("coolEvent", "hello\n", ""));
    }

    @Test
    void itPropagatesClose() throws InterruptedException {
        submissionPublisher.close();
        Awaitility
            .await()
            .atMost(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .untilAsserted(() -> Assertions.assertThat(endSubscriber.isComplete.get()).isTrue());
    }

    @Test
    void itPropagatesError() throws InterruptedException {
        RuntimeException runtimeException = new RuntimeException("closing exceptionally!");
        submissionPublisher.closeExceptionally(runtimeException);
        Awaitility
            .await()
            .atMost(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .untilAsserted(() ->
                Assertions
                    .assertThat(endSubscriber.throwableAtomicReference.get())
                    .isEqualTo(runtimeException)
            );
    }
}
