package cz.o2.proxima.core.storage.watermark;

import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.ThroughputLimiter;
import cz.o2.proxima.core.util.TestUtils;
import cz.o2.proxima.internal.com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import lombok.Generated;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/core/storage/watermark/GlobalWatermarkThroughputLimiterTest.class */
public class GlobalWatermarkThroughputLimiterTest {
    private final GlobalWatermarkThroughputLimiter limiter = new GlobalWatermarkThroughputLimiter();

    /* loaded from: input_file:cz/o2/proxima/core/storage/watermark/GlobalWatermarkThroughputLimiterTest$TestTracker.class */
    public static class TestTracker implements GlobalWatermarkTracker {
        private final Map<String, Long> updates = new HashMap();
        private Instant globalWatermark = Instant.ofEpochMilli(Long.MIN_VALUE);
        private int testConf = -1;

        public String getName() {
            return getClass().getSimpleName();
        }

        public void setup(Map<String, Object> map) {
            this.testConf = ((Integer) Optional.ofNullable(map.get("test-tracker-conf")).map((v0) -> {
                return v0.toString();
            }).map(Integer::parseInt).orElse(-1)).intValue();
        }

        public void initWatermarks(Map<String, Long> map) {
            map.forEach((v1, v2) -> {
                update(v1, v2);
            });
        }

        public synchronized CompletableFuture<Void> update(String str, long j) {
            this.updates.put(str, Long.valueOf(j));
            return CompletableFuture.completedFuture(null);
        }

        public void finished(String str) {
            update(str, Long.MAX_VALUE);
        }

        public long getGlobalWatermark(@Nullable String str, long j) {
            return this.globalWatermark.toEpochMilli();
        }

        @Generated
        public Map<String, Long> getUpdates() {
            return this.updates;
        }

        @Generated
        public void setGlobalWatermark(Instant instant) {
            this.globalWatermark = instant;
        }

        @Generated
        public int getTestConf() {
            return this.testConf;
        }
    }

    @Before
    public void setUp() {
        this.limiter.setup(cfg(TestTracker.class));
    }

    private Map<String, Object> cfg(Class<TestTracker> cls) {
        return new ImmutableMap.Builder().put("tracker.class", cls.getName()).put("max-watermark-ahead-ms", 3600000).put("global-watermark-update-ms", 100).put("default-sleep-time-ms", 10).build();
    }

    @Test
    public void testLimiter() {
        TestTracker testTracker = (TestTracker) this.limiter.getTracker();
        Assert.assertEquals(Duration.ZERO, this.limiter.getPauseTime(context(Long.MIN_VALUE)));
        Instant now = Instant.now();
        long epochMilli = now.toEpochMilli();
        testTracker.setGlobalWatermark(now);
        this.limiter.forceUpdateGlobalWatermark();
        Assert.assertEquals(Duration.ofMillis(10L), this.limiter.getPauseTime(context(epochMilli + 3600001)));
        Assert.assertEquals(Duration.ZERO, this.limiter.getPauseTime(context(epochMilli + 3600000)));
    }

    @Test
    public void testLimiterWithMaxWatermark() {
        ((TestTracker) this.limiter.getTracker()).setGlobalWatermark(Instant.ofEpochMilli(Long.MAX_VALUE));
        Assert.assertEquals(Duration.ZERO, this.limiter.getPauseTime(context(Long.MAX_VALUE)));
    }

    @Test
    public void testLimiterSerializable() throws IOException, ClassNotFoundException {
        Assert.assertNotSame(this.limiter, TestUtils.deserializeObject(TestUtils.serializeObject(this.limiter)));
    }

    private ThroughputLimiter.Context context(final long j) {
        return new ThroughputLimiter.Context() { // from class: cz.o2.proxima.core.storage.watermark.GlobalWatermarkThroughputLimiterTest.1
            public Collection<Partition> getConsumedPartitions() {
                return Collections.singleton(Partition.of(0));
            }

            public long getMinWatermark() {
                return j;
            }
        };
    }
}
