package cloud.prefab.client.internal;

import cloud.prefab.domain.Prefab;
import cloud.prefab.sse.SSEHandler;
import cloud.prefab.sse.events.DataEvent;
import cloud.prefab.sse.events.Event;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Base64;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cloud/prefab/client/internal/SseConfigStreamingSubscriber.class */
public class SseConfigStreamingSubscriber {
    private static final Logger LOG = LoggerFactory.getLogger(SseConfigStreamingSubscriber.class);
    private final PrefabHttpClient prefabHttpClient;
    private final Supplier<Long> highwaterMarkSupplier;
    private final Consumer<Prefab.Configs> configsConsumer;
    private final ScheduledExecutorService scheduledExecutorService;

    /* loaded from: input_file:cloud/prefab/client/internal/SseConfigStreamingSubscriber$FlowSubscriber.class */
    static class FlowSubscriber implements Flow.Subscriber<Event> {
        private final Consumer<Prefab.Configs> configConsumer;
        private final Consumer<Boolean> restartHandler;
        private Flow.Subscription subscription;
        private final AtomicBoolean hasReceivedData = new AtomicBoolean(false);

        FlowSubscriber(Consumer<Prefab.Configs> consumer, Consumer<Boolean> consumer2) {
            this.configConsumer = consumer;
            this.restartHandler = consumer2;
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onNext(Event event) {
            if (event instanceof DataEvent) {
                DataEvent dataEvent = (DataEvent) event;
                try {
                    this.configConsumer.accept(Prefab.Configs.parseFrom(Base64.getDecoder().decode(dataEvent.getData().trim())));
                    this.hasReceivedData.setOpaque(true);
                } catch (InvalidProtocolBufferException e) {
                    SseConfigStreamingSubscriber.LOG.warn("Error parsing configs from event name {} - error message {}", dataEvent.getEventName(), e.getMessage());
                }
            }
            this.subscription.request(1L);
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onError(Throwable th) {
            SseConfigStreamingSubscriber.LOG.info("Unexpected error encountered", th);
            this.restartHandler.accept(Boolean.valueOf(getHasReceivedData()));
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onComplete() {
            SseConfigStreamingSubscriber.LOG.info("Unexpected stream completion");
            this.restartHandler.accept(Boolean.valueOf(getHasReceivedData()));
        }

        public boolean getHasReceivedData() {
            return this.hasReceivedData.get();
        }
    }

    public SseConfigStreamingSubscriber(PrefabHttpClient prefabHttpClient, Supplier<Long> supplier, Consumer<Prefab.Configs> consumer, ScheduledExecutorService scheduledExecutorService) {
        this.prefabHttpClient = prefabHttpClient;
        this.highwaterMarkSupplier = supplier;
        this.configsConsumer = consumer;
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public void start() {
        restart(0);
    }

    private void restart(int i) {
        Runnable runnable = () -> {
            try {
                SSEHandler sSEHandler = new SSEHandler();
                sSEHandler.subscribe(new FlowSubscriber(this.configsConsumer, bool -> {
                    restart(bool.booleanValue() ? 1 : i + 1);
                }));
                this.prefabHttpClient.requestConfigSSE(this.highwaterMarkSupplier.get().longValue(), sSEHandler);
            } catch (Exception e) {
                if (e.getMessage().contains("GOAWAY")) {
                    LOG.info("Got GOAWAY on SSE config stream, will restart connection.");
                } else {
                    LOG.warn("Unexpected exception starting SSE config stream, will retry", e);
                }
            }
        };
        if (i == 0) {
            runnable.run();
            return;
        }
        long exponentialMillisToNextTry = RetryDelayCalculator.exponentialMillisToNextTry(i, TimeUnit.SECONDS.toMillis(1L), TimeUnit.SECONDS.toMillis(30L));
        LOG.info("Restarting SSE config connection in {} ms", Long.valueOf(exponentialMillisToNextTry));
        this.scheduledExecutorService.schedule(runnable, exponentialMillisToNextTry, TimeUnit.MILLISECONDS);
    }
}
