package io.hekate.metrics.influxdb;

import io.hekate.core.internal.util.ArgAssert;
import io.hekate.core.internal.util.AsyncUtils;
import io.hekate.core.internal.util.ConfigCheck;
import io.hekate.core.internal.util.HekateThreadFactory;
import io.hekate.core.internal.util.Waiting;
import io.hekate.metrics.Metric;
import io.hekate.metrics.MetricFilter;
import io.hekate.util.format.ToString;
import io.hekate.util.format.ToStringIgnore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hekate/metrics/influxdb/InfluxDbMetricsPublisher.class */
class InfluxDbMetricsPublisher {
    private static final Logger log;
    private static final boolean DEBUG;
    private static final QueueEntry STOP_ENTRY;
    private static final Pattern UNSAFE_CHARACTERS;
    private final String url;
    private final String database;
    private final String user;

    @ToStringIgnore
    private final String password;
    private final int maxQueueSize;
    private final long timeout;
    private final MetricFilter filter;

    @ToStringIgnore
    private final BlockingQueue<QueueEntry> queue;

    @ToStringIgnore
    private final Object mux = new Object();

    @ToStringIgnore
    private ExecutorService worker;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/metrics/influxdb/InfluxDbMetricsPublisher$QueueEntry.class */
    public static class QueueEntry {
        private final long timestamp;
        private final Collection<Metric> metrics;
        private final CompletableFuture<Void> future;

        public QueueEntry(long j, Collection<Metric> collection, CompletableFuture<Void> completableFuture) {
            this.timestamp = j;
            this.metrics = collection;
            this.future = completableFuture;
        }

        public long timestamp() {
            return this.timestamp;
        }

        public Collection<Metric> metrics() {
            return this.metrics;
        }

        public CompletableFuture<Void> future() {
            return this.future;
        }
    }

    public InfluxDbMetricsPublisher(InfluxDbMetricsConfig influxDbMetricsConfig) {
        ArgAssert.notNull(influxDbMetricsConfig, "Configuration");
        ConfigCheck configCheck = ConfigCheck.get(InfluxDbMetricsConfig.class);
        configCheck.notEmpty(influxDbMetricsConfig.getUrl(), "URL");
        configCheck.notEmpty(influxDbMetricsConfig.getDatabase(), "database");
        configCheck.positive(influxDbMetricsConfig.getMaxQueueSize(), "maximum queue size");
        configCheck.positive(influxDbMetricsConfig.getTimeout(), "timeout");
        this.url = influxDbMetricsConfig.getUrl().trim();
        this.database = influxDbMetricsConfig.getDatabase().trim();
        this.user = influxDbMetricsConfig.getUser() != null ? influxDbMetricsConfig.getUser().trim() : null;
        this.password = influxDbMetricsConfig.getPassword() != null ? influxDbMetricsConfig.getPassword().trim() : null;
        this.maxQueueSize = influxDbMetricsConfig.getMaxQueueSize();
        this.timeout = influxDbMetricsConfig.getTimeout();
        this.filter = influxDbMetricsConfig.getFilter();
        this.queue = new ArrayBlockingQueue(this.maxQueueSize);
    }

    public void start(String str, String str2, int i) {
        if (!$assertionsDisabled && str2 == null) {
            throw new AssertionError("Node host address us null.");
        }
        log.info("Starting InfluxDB metrics publisher [{}]", ToString.formatProperties(this));
        String str3 = str2 + ':' + i;
        String str4 = (str == null || str.isEmpty()) ? str3 : str;
        InfluxDB connect = InfluxDBFactory.connect(this.url, this.user, this.password, new OkHttpClient.Builder().connectTimeout(this.timeout, TimeUnit.MILLISECONDS).readTimeout(this.timeout, TimeUnit.MILLISECONDS).writeTimeout(this.timeout, TimeUnit.MILLISECONDS));
        synchronized (this.mux) {
            this.worker = Executors.newSingleThreadExecutor(new HekateThreadFactory("InfluxDbMetrics"));
            String str5 = str4;
            this.worker.execute(() -> {
                QueueEntry queueEntry = null;
                Throwable th = null;
                try {
                    boolean z = false;
                    while (true) {
                        try {
                            queueEntry = this.queue.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                            if (queueEntry == STOP_ENTRY) {
                                break;
                            }
                            boolean doPublish = doPublish(connect, str5, str3, queueEntry, z);
                            if (doPublish && z && log.isWarnEnabled()) {
                                log.warn("Resumed InfluxDB metrics publishing after failure.");
                            }
                            z = !doPublish;
                        } catch (Error | RuntimeException e) {
                            th = e;
                            log.error("Got an unexpected runtime error while publishing metrics to InfluxDB.", e);
                            synchronized (this.mux) {
                                if (this.worker != null) {
                                    this.worker.shutdown();
                                    this.worker = null;
                                }
                                if (queueEntry != null) {
                                    try {
                                        try {
                                            if (queueEntry.future() != null) {
                                                if (th == null) {
                                                    queueEntry.future().cancel(false);
                                                } else {
                                                    queueEntry.future().completeExceptionally(th);
                                                }
                                            }
                                        } catch (Throwable th2) {
                                            log.error("Got an unexpected error while notifying InfluxDB publisher future.", th2);
                                            log.info("Stopped InfluxDB metrics publisher.");
                                            connect.close();
                                            return;
                                        }
                                    } catch (Throwable th3) {
                                        log.info("Stopped InfluxDB metrics publisher.");
                                        connect.close();
                                        throw th3;
                                    }
                                }
                                log.info("Stopped InfluxDB metrics publisher.");
                                connect.close();
                                return;
                            }
                        } catch (InterruptedException e2) {
                            try {
                                if (queueEntry != null) {
                                    try {
                                        if (queueEntry.future() != null) {
                                            if (0 == 0) {
                                                queueEntry.future().cancel(false);
                                            } else {
                                                queueEntry.future().completeExceptionally(null);
                                            }
                                        }
                                    } catch (Throwable th4) {
                                        log.error("Got an unexpected error while notifying InfluxDB publisher future.", th4);
                                        log.info("Stopped InfluxDB metrics publisher.");
                                        connect.close();
                                        return;
                                    }
                                }
                                log.info("Stopped InfluxDB metrics publisher.");
                                connect.close();
                                return;
                            } catch (Throwable th5) {
                                log.info("Stopped InfluxDB metrics publisher.");
                                connect.close();
                                throw th5;
                            }
                        }
                    }
                    try {
                        if (queueEntry != null) {
                            try {
                                if (queueEntry.future() != null) {
                                    if (0 == 0) {
                                        queueEntry.future().cancel(false);
                                    } else {
                                        queueEntry.future().completeExceptionally(null);
                                    }
                                }
                            } catch (Throwable th6) {
                                log.error("Got an unexpected error while notifying InfluxDB publisher future.", th6);
                                log.info("Stopped InfluxDB metrics publisher.");
                                connect.close();
                                return;
                            }
                        }
                        log.info("Stopped InfluxDB metrics publisher.");
                        connect.close();
                    } catch (Throwable th7) {
                        log.info("Stopped InfluxDB metrics publisher.");
                        connect.close();
                        throw th7;
                    }
                } catch (Throwable th8) {
                    try {
                        if (queueEntry != null) {
                            try {
                                if (queueEntry.future() != null) {
                                    if (th == null) {
                                        queueEntry.future().cancel(false);
                                    } else {
                                        queueEntry.future().completeExceptionally(th);
                                    }
                                }
                            } catch (Throwable th9) {
                                log.error("Got an unexpected error while notifying InfluxDB publisher future.", th9);
                                log.info("Stopped InfluxDB metrics publisher.");
                                connect.close();
                                throw th8;
                            }
                        }
                        log.info("Stopped InfluxDB metrics publisher.");
                        connect.close();
                        throw th8;
                    } catch (Throwable th10) {
                        log.info("Stopped InfluxDB metrics publisher.");
                        connect.close();
                        throw th10;
                    }
                }
            });
        }
    }

    public void stop() {
        stopAsync().awaitUninterruptedly();
    }

    public boolean isStopped() {
        boolean z;
        synchronized (this.mux) {
            z = this.worker == null;
        }
        return z;
    }

    public CompletableFuture<Void> publish(Collection<Metric> collection) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        boolean z = false;
        if (collection != null && !collection.isEmpty()) {
            synchronized (this.mux) {
                if (this.worker != null) {
                    if (this.queue.offer(new QueueEntry(System.currentTimeMillis(), collection, completableFuture))) {
                        z = true;
                        if (DEBUG) {
                            log.debug("Scheduled asynchronous metrics publishing [metrics-size={}]", Integer.valueOf(collection.size()));
                        }
                    } else if (DEBUG) {
                        log.debug("Skipped asynchronous metrics publishing due to queue overflow [max-queue-size={}]", Integer.valueOf(this.maxQueueSize));
                    }
                } else if (DEBUG) {
                    log.debug("Skipped asynchronous metrics publishing since publisher is stopped.");
                }
            }
        }
        if (!z) {
            completableFuture.cancel(false);
        }
        return completableFuture;
    }

    static String toSafeName(String str) {
        return UNSAFE_CHARACTERS.matcher(str.trim()).replaceAll("_");
    }

    Waiting stopAsync() {
        Waiting waiting = Waiting.NO_WAIT;
        ArrayList arrayList = null;
        try {
            synchronized (this.mux) {
                if (this.worker != null) {
                    log.info("Stopping InfluxDB metrics publisher...");
                    do {
                        if (arrayList == null) {
                            arrayList = new ArrayList(this.queue.size());
                        }
                        this.queue.drainTo(arrayList);
                    } while (!this.queue.offer(STOP_ENTRY));
                    waiting = AsyncUtils.shutdown(this.worker);
                    this.worker = null;
                }
            }
            return waiting;
        } finally {
            if (arrayList != null) {
                arrayList.stream().map((v0) -> {
                    return v0.future();
                }).filter((v0) -> {
                    return Objects.nonNull(v0);
                }).forEach(completableFuture -> {
                    completableFuture.cancel(false);
                });
            }
        }
    }

    int queueSize() {
        return this.queue.size();
    }

    void doWrite(InfluxDB influxDB, BatchPoints batchPoints) {
        influxDB.write(batchPoints);
    }

    private boolean doPublish(InfluxDB influxDB, String str, String str2, QueueEntry queueEntry, boolean z) {
        if (!$assertionsDisabled && influxDB == null) {
            throw new AssertionError("Database is null.");
        }
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError("Node name is null.");
        }
        if (!$assertionsDisabled && str2 == null) {
            throw new AssertionError("Node address is null.");
        }
        if (!$assertionsDisabled && queueEntry == null) {
            throw new AssertionError("Metrics queue entry is null.");
        }
        if (DEBUG) {
            log.debug("Publishing metrics [timestamp={}, metrics-size={}]", Long.valueOf(queueEntry.timestamp()), Integer.valueOf(queueEntry.metrics().size()));
        }
        CompletableFuture<Void> future = queueEntry.future();
        try {
            BatchPoints build = BatchPoints.database(this.database).tag(InfluxDbMetricsPlugin.NODE_NAME_TAG, str).tag(InfluxDbMetricsPlugin.NODE_ADDRESS_TAG, str2).build();
            queueEntry.metrics().stream().filter(metric -> {
                return this.filter == null || this.filter.accept(metric);
            }).forEach(metric2 -> {
                build.point(Point.measurement(toSafeName(metric2.name())).time(queueEntry.timestamp(), TimeUnit.MILLISECONDS).addField(InfluxDbMetricsPlugin.METRIC_VALUE_FIELD, metric2.value()).build());
            });
            if (build.getPoints() != null && !build.getPoints().isEmpty()) {
                doWrite(influxDB, build);
            }
            if (future != null) {
                future.complete(null);
            }
            if (!DEBUG) {
                return true;
            }
            log.debug("Published metrics [timestamp={}, metrics-size={}]", Long.valueOf(queueEntry.timestamp()), Integer.valueOf(queueEntry.metrics().size()));
            return true;
        } catch (RuntimeException e) {
            if (z) {
                if (DEBUG) {
                    log.debug("Throttled error during metrics publishing.", e);
                }
            } else if (log.isWarnEnabled()) {
                log.warn("Got an error while publishing metrics to InfluxDB (will silently ignore subsequent errors).", e);
            }
            if (future == null) {
                return false;
            }
            future.completeExceptionally(e);
            return false;
        }
    }

    public String toString() {
        return ToString.format(this);
    }

    static {
        $assertionsDisabled = !InfluxDbMetricsPublisher.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(InfluxDbMetricsPlugin.class);
        DEBUG = log.isDebugEnabled();
        STOP_ENTRY = new QueueEntry(0L, Collections.emptyList(), null);
        UNSAFE_CHARACTERS = Pattern.compile("[^a-z0-9.]", 2);
    }
}
