package cz.o2.proxima.core.storage.watermark;

import cz.o2.proxima.core.storage.ThroughputLimiter;
import cz.o2.proxima.core.time.Watermarks;
import cz.o2.proxima.core.util.Classpath;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.core.util.Pair;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/core/storage/watermark/GlobalWatermarkThroughputLimiter.class */
public class GlobalWatermarkThroughputLimiter implements ThroughputLimiter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(GlobalWatermarkThroughputLimiter.class);
    private static final long serialVersionUID = 1;
    private static volatile GlobalWatermarkTracker singletonTracker;
    static final String TRACKER_CFG_PREFIX = "tracker.";
    static final String KW_CLASS = "class";
    public static final String MAX_AHEAD_TIME_MS_CFG = "max-watermark-ahead-ms";
    public static final String GLOBAL_WATERMARK_UPDATE_MS_CFG = "global-watermark-update-ms";
    public static final String DEFAULT_SLEEP_TIME_CFG = "default-sleep-time-ms";

    @VisibleForTesting
    private GlobalWatermarkTracker tracker;
    private long maxAheadTimeFromGlobalMs = Watermarks.MAX_WATERMARK;
    private long globalWatermarkUpdatePeriodMs = Duration.ofMinutes(serialVersionUID).toMillis();
    private boolean closed = false;
    private transient String processName = UUID.randomUUID().toString();
    private long lastGlobalWatermarkUpdate = Long.MIN_VALUE;
    private long sleepTimeMs = this.globalWatermarkUpdatePeriodMs;

    @Override // cz.o2.proxima.core.storage.ThroughputLimiter
    public void setup(Map<String, Object> map) {
        initializeTracker(map);
        this.tracker = singletonTracker;
        this.maxAheadTimeFromGlobalMs = getLong(map, MAX_AHEAD_TIME_MS_CFG, this.maxAheadTimeFromGlobalMs).longValue();
        this.globalWatermarkUpdatePeriodMs = getLong(map, GLOBAL_WATERMARK_UPDATE_MS_CFG, this.globalWatermarkUpdatePeriodMs).longValue();
        this.sleepTimeMs = getLong(map, DEFAULT_SLEEP_TIME_CFG, this.sleepTimeMs).longValue();
    }

    private void initializeTracker(Map<String, Object> map) {
        if (singletonTracker == null) {
            synchronized (GlobalWatermarkThroughputLimiter.class) {
                if (singletonTracker == null) {
                    singletonTracker = (GlobalWatermarkTracker) Optional.ofNullable(map.get("tracker.class")).map((v0) -> {
                        return v0.toString();
                    }).map(str -> {
                        return (GlobalWatermarkTracker) ExceptionUtils.uncheckedFactory(() -> {
                            return (GlobalWatermarkTracker) Classpath.newInstance(str, GlobalWatermarkTracker.class);
                        });
                    }).orElseThrow(() -> {
                        return new IllegalArgumentException(String.format("Missing %s%s specifying GlobalWatermarkTracker implementation", TRACKER_CFG_PREFIX, KW_CLASS));
                    });
                }
                singletonTracker.setup((Map) map.entrySet().stream().filter(entry -> {
                    return ((String) entry.getKey()).startsWith(TRACKER_CFG_PREFIX);
                }).map(entry2 -> {
                    return Pair.of(((String) entry2.getKey()).substring(TRACKER_CFG_PREFIX.length()), entry2.getValue());
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getFirst();
                }, (v0) -> {
                    return v0.getSecond();
                })));
            }
        }
    }

    private static Long getLong(Map<String, Object> map, String str, long j) {
        return (Long) Optional.ofNullable(map.get(str)).map((v0) -> {
            return v0.toString();
        }).map(Long::valueOf).orElse(Long.valueOf(j));
    }

    @Override // cz.o2.proxima.core.storage.ThroughputLimiter
    public synchronized Duration getPauseTime(ThroughputLimiter.Context context) {
        if (!this.closed) {
            updateGlobalWatermarkIfNeeded(context);
            long globalWatermark = this.tracker.getGlobalWatermark(this.processName, context.getMinWatermark());
            if (globalWatermark < Watermarks.MAX_WATERMARK && globalWatermark + this.maxAheadTimeFromGlobalMs < context.getMinWatermark()) {
                log.info("ThroughputLimiter {} pausing processing for {} ms on global watermark {} and context.minWatermark {}", new Object[]{this, Long.valueOf(this.sleepTimeMs), Long.valueOf(globalWatermark), Long.valueOf(context.getMinWatermark())});
                return Duration.ofMillis(this.sleepTimeMs);
            }
        }
        return Duration.ZERO;
    }

    @Override // cz.o2.proxima.core.storage.ThroughputLimiter, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        this.tracker.finished(this.processName);
        this.closed = true;
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("tracker", this.tracker).add("maxAheadTimeFromGlobalMs", this.maxAheadTimeFromGlobalMs).add("globalWatermarkUpdatePeriodMs", this.globalWatermarkUpdatePeriodMs).add("processName", this.processName).add("sleepTimeMs", this.sleepTimeMs).toString();
    }

    private void updateGlobalWatermarkIfNeeded(ThroughputLimiter.Context context) {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.globalWatermarkUpdatePeriodMs > this.lastGlobalWatermarkUpdate) {
            CompletableFuture<Void> update = this.tracker.update(this.processName, context.getMinWatermark());
            if (this.lastGlobalWatermarkUpdate == Long.MIN_VALUE) {
                Objects.requireNonNull(update);
                ExceptionUtils.ignoringInterrupted(update::get);
            }
            this.lastGlobalWatermarkUpdate = currentTimeMillis;
        }
    }

    @VisibleForTesting
    void forceUpdateGlobalWatermark() {
        this.lastGlobalWatermarkUpdate = Long.MIN_VALUE;
    }

    protected Object readResolve() {
        this.processName = UUID.randomUUID().toString();
        if (singletonTracker == null) {
            synchronized (GlobalWatermarkThroughputLimiter.class) {
                if (singletonTracker == null) {
                    singletonTracker = this.tracker;
                }
            }
        } else {
            this.tracker = singletonTracker;
        }
        return this;
    }

    @Generated
    public GlobalWatermarkTracker getTracker() {
        return this.tracker;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 102230:
                if (implMethodName.equals("get")) {
                    z = true;
                    break;
                }
                break;
            case 1946253523:
                if (implMethodName.equals("lambda$initializeTracker$16fd6b97$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/core/storage/watermark/GlobalWatermarkThroughputLimiter") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lcz/o2/proxima/core/storage/watermark/GlobalWatermarkTracker;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return (GlobalWatermarkTracker) Classpath.newInstance(str, GlobalWatermarkTracker.class);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("java/util/concurrent/CompletableFuture") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    CompletableFuture completableFuture = (CompletableFuture) serializedLambda.getCapturedArg(0);
                    return completableFuture::get;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
