package com.spotify.helios.servicescommon;

import com.google.cloud.ByteArray;
import com.google.cloud.pubsub.Message;
import com.google.cloud.pubsub.PubSub;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import io.dropwizard.lifecycle.Managed;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/helios/servicescommon/GooglePubSubSender.class */
/**
 * {@link EventSender} that publishes events to Google Cloud Pub/Sub.
 *
 * <p>Each call to {@link #send(String, byte[])} first consults the configured
 * {@link HealthChecker}. While the checker reports unhealthy, the message is not published and a
 * warning is logged instead.
 */
public class GooglePubSubSender implements EventSender {
    private static final Logger log = LoggerFactory.getLogger(GooglePubSubSender.class);

    private final PubSub pubsub;
    private final String topicPrefix;
    private final HealthChecker healthchecker;

    private GooglePubSubSender(PubSub client, String prefix, HealthChecker checker) {
        this.pubsub = client;
        this.topicPrefix = prefix;
        this.healthchecker = checker;
    }

    /**
     * Builds a sender.
     *
     * @param pubSub        client used to publish messages
     * @param str           prefix prepended to the topic name given to {@link #send(String, byte[])}
     * @param healthChecker consulted before every publish; also started and stopped together
     *                      with this sender
     * @return a new sender wired to the given collaborators
     */
    public static GooglePubSubSender create(PubSub pubSub, String str, HealthChecker healthChecker) {
        return new GooglePubSubSender(pubSub, str, healthChecker);
    }

    /**
     * Starts the underlying health checker.
     *
     * @throws Exception whatever the health checker's {@code start()} throws
     */
    public void start() throws Exception {
        healthchecker.start();
    }

    /**
     * Stops the underlying health checker.
     *
     * @throws Exception whatever the health checker's {@code stop()} throws
     */
    public void stop() throws Exception {
        healthchecker.stop();
    }

    /**
     * Publishes {@code bArr} to the topic named by the configured prefix followed by {@code str}.
     *
     * <p>Nothing is published while the health checker reports unhealthy. Failures are logged and
     * never propagated to the caller.
     *
     * @param str  topic name, without the prefix
     * @param bArr message payload
     */
    @Override
    public void send(String str, byte[] bArr) {
        final String topic = topicPrefix + str;
        if (healthchecker.isHealthy()) {
            publish(topic, bArr);
        } else {
            log.warn("will not publish message to pubsub topic={} as the pubsub client appears to be unhealthy", topic);
        }
    }

    /** Submits the payload asynchronously and attaches a callback that logs the outcome. */
    private void publish(String topic, byte[] bytes) {
        // Payload conversion stays inside the try so that a failure there is logged like any
        // other publish failure instead of escaping to the caller.
        try {
            ByteArray payload = ByteArray.copyFrom(bytes);
            Futures.addCallback(
                    JdkFutureAdapters.listenInPoolThread(pubsub.publishAsync(topic, Message.of(payload))),
                    new PublishOutcomeLogger(topic));
        } catch (Exception e) {
            log.warn("Failed to publish Google PubSub message, topic: {}", topic, e);
        }
    }

    /** Callback that logs the result of one asynchronous publish. */
    private static final class PublishOutcomeLogger implements FutureCallback<String> {
        private final String topic;

        PublishOutcomeLogger(String topic) {
            this.topic = topic;
        }

        public void onSuccess(@Nullable String ack) {
            log.debug("Sent an event to Google PubSub, topic: {}, ack: {}", topic, ack);
        }

        public void onFailure(Throwable cause) {
            log.warn("Unable to send an event to Google PubSub, topic: {}", topic, cause);
        }
    }

    /** Lifecycle-managed component that reports whether publishing should currently be attempted. */
    public interface HealthChecker extends Managed {
        /**
         * @return {@code true} if messages may be published right now
         */
        boolean isHealthy();
    }

    /**
     * {@link HealthChecker} that periodically calls {@code getTopic} on the Pub/Sub client.
     *
     * <p>A check passes when the call returns without throwing a {@link RuntimeException}; the
     * returned value itself is ignored. The checker reports unhealthy until a check has passed.
     */
    public static class DefaultHealthChecker implements HealthChecker {
        private final PubSub pubsub;
        private final String topic;
        private final ScheduledExecutorService executor;
        private final Duration healthcheckInterval;
        private final AtomicBoolean healthy = new AtomicBoolean(false);

        /**
         * @param pubSub                   client used for the probe call
         * @param str                      name of the topic passed to {@code getTopic}
         * @param scheduledExecutorService executor the periodic check runs on; it is shut down by
         *                                 {@link #stop()}
         * @param duration                 delay between the end of one check and the start of the next
         */
        public DefaultHealthChecker(PubSub pubSub, String str, ScheduledExecutorService scheduledExecutorService, Duration duration) {
            this.pubsub = pubSub;
            this.topic = str;
            this.executor = scheduledExecutorService;
            this.healthcheckInterval = duration;
        }

        /** Schedules the recurring health check; the first run is scheduled with no initial delay. */
        public void start() {
            final long intervalMillis = healthcheckInterval.toMillis();
            executor.scheduleWithFixedDelay(this::checkHealth, 0L, intervalMillis, TimeUnit.MILLISECONDS);
        }

        /** Shuts down the executor that was supplied at construction. */
        public void stop() throws Exception {
            executor.shutdown();
        }

        @Override
        public boolean isHealthy() {
            return healthy.get();
        }

        /** Runs one probe against Pub/Sub and records the outcome as the current health state. */
        @VisibleForTesting
        void checkHealth() {
            boolean reachable;
            try {
                // Only the ability to complete the call matters; the result is deliberately unused.
                pubsub.getTopic(topic);
                log.info("successfully checked if pubsub topic {} exists - this instance is healthy", topic);
                reachable = true;
            } catch (RuntimeException e) {
                log.warn("caught exception checking if pubsub topic {} exists. Publishing to pubsub will be disabled until connectivity is restored (next check is in {})", topic, healthcheckInterval, e);
                reachable = false;
            }
            healthy.set(reachable);
        }
    }
}
