package net.yudichev.jiotty.connector.mqtt;

import com.google.common.base.Preconditions;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.inject.Inject;
import net.yudichev.jiotty.common.async.AsyncOperationFailureHandler;
import net.yudichev.jiotty.common.async.AsyncOperationRetryImpl;
import net.yudichev.jiotty.common.async.ExecutorFactory;
import net.yudichev.jiotty.common.async.Scheduler;
import net.yudichev.jiotty.common.async.SchedulingExecutor;
import net.yudichev.jiotty.common.inject.BaseLifecycleComponent;
import net.yudichev.jiotty.common.lang.Closeable;
import net.yudichev.jiotty.common.lang.CompositeException;
import net.yudichev.jiotty.common.lang.HumanReadableExceptionMessage;
import net.yudichev.jiotty.common.lang.MoreThrowables;
import net.yudichev.jiotty.common.lang.Runnables;
import net.yudichev.jiotty.common.lang.backoff.BackOff;
import net.yudichev.jiotty.common.lang.backoff.ExponentialBackOff;
import net.yudichev.jiotty.common.lang.backoff.NanoClock;
import net.yudichev.jiotty.common.lang.backoff.SynchronizedBackOff;
import net.yudichev.jiotty.common.lang.throttling.ThresholdThrottlingConsumerFactory;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/yudichev/jiotty/connector/mqtt/MqttImpl.class */
public class MqttImpl extends BaseLifecycleComponent implements Mqtt {
    private static final Logger logger = LoggerFactory.getLogger(MqttImpl.class);
    private final ThresholdThrottlingConsumerFactory<Throwable> throttledLoggerFactory;
    private final MqttConnectOptions mqttConnectOptions;
    private final Map<String, MqttMessage> lastReceivedMessageByTopic;
    private final Map<String, Set<BiConsumer<String, MqttMessage>>> subscriptionsByFilter;
    private final IMqttAsyncClient client;
    private final ExecutorFactory executorFactory;
    private final String name;
    private final double connectBackoffRandmisationFactor;
    private final NanoClock nanoClock;
    private SchedulingExecutor executor;

    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:net/yudichev/jiotty/connector/mqtt/MqttImpl$Dependency.class */
    @interface Dependency {
    }

    /* loaded from: input_file:net/yudichev/jiotty/connector/mqtt/MqttImpl$MessageToStringDataCallback.class */
    private static class MessageToStringDataCallback implements BiConsumer<String, MqttMessage> {
        private final BiConsumer<String, String> delegate;

        MessageToStringDataCallback(BiConsumer<String, String> biConsumer) {
            this.delegate = (BiConsumer) Preconditions.checkNotNull(biConsumer);
        }

        @Override // java.util.function.BiConsumer
        public void accept(String str, MqttMessage mqttMessage) {
            this.delegate.accept(str, new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
        }
    }

    /* loaded from: input_file:net/yudichev/jiotty/connector/mqtt/MqttImpl$ResubscribeOnReconnectCallback.class */
    private class ResubscribeOnReconnectCallback implements MqttCallbackExtended {
        private final Consumer<Throwable> throttledErrorLogger;
        private final BackOff backOff = new ExponentialBackOff.Builder().setInitialIntervalMillis(10).setMaxIntervalMillis(10000).setMultiplier(2.0d).build();
        private Closeable subRetryTimerHandle = Closeable.noop();

        private ResubscribeOnReconnectCallback() {
            this.throttledErrorLogger = MqttImpl.this.throttledLoggerFactory.create(5, Duration.ofMinutes(1L), th -> {
                MqttImpl.logger.error("{} lost connection to {} too often (suppressing this error for 1 minute)", new Object[]{MqttImpl.this.client.getClientId(), MqttImpl.this.client.getServerURI(), th});
            });
        }

        public void connectComplete(boolean z, String str) {
            MqttImpl.logger.info("{} completed connection to {}, reconnected={}", new Object[]{MqttImpl.this.client.getClientId(), str, Boolean.valueOf(z)});
            if (z) {
                restoreSubscriptions();
            }
        }

        private void restoreSubscriptions() {
            MqttImpl.this.executor.submit(() -> {
                MqttImpl.logger.info("Restoring subscriptions: {}", MqttImpl.this.subscriptionsByFilter);
                try {
                    MqttImpl.this.subscriptionsByFilter.forEach((str, set) -> {
                        set.forEach(biConsumer -> {
                            MqttImpl.this.doSubscribe(str, biConsumer);
                        });
                    });
                    this.backOff.reset();
                } catch (RuntimeException e) {
                    long nextBackOffMillis = this.backOff.nextBackOffMillis();
                    MqttImpl.logger.info("Re-subscription failed, will re-try in {}ms", Long.valueOf(nextBackOffMillis), e);
                    this.subRetryTimerHandle = MqttImpl.this.executor.schedule(Duration.ofMillis(nextBackOffMillis), this::restoreSubscriptions);
                }
            });
        }

        public void connectionLost(Throwable th) {
            MqttImpl.logger.info("{} lost connection to {}", new Object[]{MqttImpl.this.client.getClientId(), MqttImpl.this.client.getServerURI(), th});
            MqttImpl.this.executor.submit(() -> {
                this.subRetryTimerHandle.close();
                this.throttledErrorLogger.accept(th);
            });
        }

        public void messageArrived(String str, MqttMessage mqttMessage) {
            MqttImpl.this.executor.submit(() -> {
                MqttImpl.logger.debug("messageArrived: {}->{}", str, mqttMessage);
                MqttImpl.this.lastReceivedMessageByTopic.put(str, mqttMessage);
            });
        }

        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            MqttImpl.logger.debug("Message delivered: {}", Integer.valueOf(iMqttDeliveryToken.getMessageId()));
        }
    }

    @Inject
    MqttImpl(IMqttAsyncClient iMqttAsyncClient, ExecutorFactory executorFactory, @Dependency ThresholdThrottlingConsumerFactory<Throwable> thresholdThrottlingConsumerFactory, @Dependency MqttConnectOptions mqttConnectOptions) {
        this(iMqttAsyncClient, executorFactory, thresholdThrottlingConsumerFactory, mqttConnectOptions, System::nanoTime, 0.5d);
    }

    MqttImpl(IMqttAsyncClient iMqttAsyncClient, ExecutorFactory executorFactory, ThresholdThrottlingConsumerFactory<Throwable> thresholdThrottlingConsumerFactory, MqttConnectOptions mqttConnectOptions, NanoClock nanoClock, double d) {
        this.lastReceivedMessageByTopic = new HashMap();
        this.subscriptionsByFilter = new HashMap();
        this.executorFactory = (ExecutorFactory) Preconditions.checkNotNull(executorFactory);
        this.throttledLoggerFactory = (ThresholdThrottlingConsumerFactory) Preconditions.checkNotNull(thresholdThrottlingConsumerFactory);
        this.mqttConnectOptions = (MqttConnectOptions) Preconditions.checkNotNull(mqttConnectOptions);
        this.client = iMqttAsyncClient;
        this.name = super.name() + " " + iMqttAsyncClient.getClientId() + " " + iMqttAsyncClient.getServerURI();
        this.nanoClock = (NanoClock) Preconditions.checkNotNull(nanoClock);
        this.connectBackoffRandmisationFactor = d;
    }

    public String name() {
        return this.name;
    }

    protected void doStart() {
        this.executor = this.executorFactory.createSingleThreadedSchedulingExecutor("Handler-" + this.client.getServerURI());
        AsyncOperationRetryImpl asyncOperationRetryImpl = new AsyncOperationRetryImpl(AsyncOperationFailureHandler.forBackoff(new SynchronizedBackOff(new ExponentialBackOff.Builder().setNanoClock(this.nanoClock).setInitialIntervalMillis(1000).setMaxIntervalMillis(30000).setMaxElapsedTimeMillis(Integer.MAX_VALUE).setRandomizationFactor(this.connectBackoffRandmisationFactor).build()), logger));
        this.executor.execute(() -> {
            this.client.setCallback(new ResubscribeOnReconnectCallback());
            try {
                waitForConnectFutureAndThen(asyncOperationRetryImpl.withBackOffAndRetry("MQTT Connect to " + this.client.getServerURI(), () -> {
                    logger.debug("MQTT Connecting to {}", this.client.getServerURI());
                    final CompletableFuture completableFuture = new CompletableFuture();
                    try {
                        this.client.connect(this.mqttConnectOptions, (Object) null, new IMqttActionListener() { // from class: net.yudichev.jiotty.connector.mqtt.MqttImpl.1
                            public void onSuccess(IMqttToken iMqttToken) {
                                completableFuture.complete(null);
                            }

                            public void onFailure(IMqttToken iMqttToken, Throwable th) {
                                completableFuture.completeExceptionally(th);
                            }
                        });
                    } catch (MqttException e) {
                        completableFuture.completeExceptionally(e);
                    }
                    return completableFuture;
                }, (l, runnable) -> {
                    scheduleReconnect(this.executor, l, runnable);
                }), () -> {
                    logger.info("Connected to {}", this.client.getServerURI());
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Failed to connect to {}", this.client.getServerURI(), e);
            } catch (ExecutionException e2) {
                logger.warn("Failed to connect to {}", this.client.getServerURI(), e2);
            }
        });
    }

    void scheduleReconnect(Scheduler scheduler, Long l, Runnable runnable) {
        if (l.longValue() > 0) {
            MoreThrowables.asUnchecked(() -> {
                Thread.sleep(l.longValue());
            });
        }
        runnable.run();
    }

    void waitForConnectFutureAndThen(CompletableFuture<Void> completableFuture, Runnable runnable) throws InterruptedException, ExecutionException {
        completableFuture.get();
        runnable.run();
    }

    @Override // net.yudichev.jiotty.connector.mqtt.Mqtt
    public Closeable subscribe(String str, BiConsumer<String, String> biConsumer) {
        checkStarted();
        BiConsumer exceptionLogging = exceptionLogging(new MessageToStringDataCallback(biConsumer));
        this.executor.execute(() -> {
            deliverImage(str, exceptionLogging);
            this.subscriptionsByFilter.computeIfAbsent(str, str2 -> {
                doSubscribe(str2, (str2, mqttMessage) -> {
                    CompositeException.runForAll(this.subscriptionsByFilter.get(str2), biConsumer2 -> {
                        biConsumer2.accept(str2, mqttMessage);
                    });
                });
                return new HashSet();
            }).add(exceptionLogging);
        });
        return Closeable.idempotent(() -> {
            this.executor.submit(Runnables.guarded(logger, "unsubscribe", () -> {
                this.subscriptionsByFilter.computeIfPresent(str, (str2, set) -> {
                    set.remove(exceptionLogging);
                    if (!set.isEmpty()) {
                        return set;
                    }
                    if (!this.client.isConnected()) {
                        return null;
                    }
                    MoreThrowables.asUnchecked(() -> {
                        this.client.unsubscribe(str);
                    });
                    return null;
                });
            }));
        });
    }

    @Override // net.yudichev.jiotty.connector.mqtt.Mqtt
    public CompletableFuture<Void> publish(String str, String str2) {
        checkStarted();
        return CompletableFuture.supplyAsync(() -> {
            logger.debug("OUT topic: {}, msg: {}", str, str2);
            MoreThrowables.asUnchecked(() -> {
                this.client.publish(str, str2.getBytes(StandardCharsets.UTF_8), 1, false);
            });
            return null;
        }, this.executor);
    }

    protected void doStop() {
        Closeable.closeIfNotNull(this.executor);
        try {
            try {
                this.client.disconnect();
                Closeable.closeSafelyIfNotNull(logger, this.client);
            } catch (MqttException e) {
                logger.info("Failed to disconnect client: {}", HumanReadableExceptionMessage.humanReadableMessage(e));
                Closeable.closeSafelyIfNotNull(logger, this.client);
            }
        } catch (Throwable th) {
            Closeable.closeSafelyIfNotNull(logger, this.client);
            throw th;
        }
    }

    private static <T, U> BiConsumer<T, U> exceptionLogging(BiConsumer<T, U> biConsumer) {
        return (obj, obj2) -> {
            try {
                biConsumer.accept(obj, obj2);
            } catch (RuntimeException e) {
                logger.error("Error handling message", e);
            }
        };
    }

    private void doSubscribe(String str, BiConsumer<String, MqttMessage> biConsumer) {
        MoreThrowables.asUnchecked(() -> {
            this.client.subscribe(str, 2, (str2, mqttMessage) -> {
                logger.debug("IN topic: {}, msg: {}", str2, mqttMessage);
                Runnables.guarded(logger, "Notify client on MQTT message", () -> {
                    biConsumer.accept(str2, mqttMessage);
                }).run();
            });
        });
    }

    private void deliverImage(String str, BiConsumer<String, MqttMessage> biConsumer) {
        this.lastReceivedMessageByTopic.forEach((str2, mqttMessage) -> {
            if (MqttTopic.isMatched(str, str2)) {
                logger.debug("Delivering last known message {} -> {}", str2, mqttMessage);
                Runnables.guarded(logger, "deliver last known message", () -> {
                    biConsumer.accept(str2, mqttMessage);
                }).run();
            }
        });
    }
}
