package cz.o2.proxima.direct.core.transaction;

import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.commitlog.ObserverUtils;
import cz.o2.proxima.direct.core.commitlog.Offset;
import cz.o2.proxima.typesafe.config.ConfigFactory;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/core/transaction/ThreadPooledObserverTest.class */
public class ThreadPooledObserverTest {
    private final Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf"), new Repository.Validate[0]);
    private final EntityDescriptor gateway = this.repo.getEntity("gateway");
    private final EntityAwareAttributeDescriptor.Wildcard<byte[]> device = EntityAwareAttributeDescriptor.Wildcard.of(this.gateway, this.gateway.getAttribute("device.*"));

    @Test(timeout = 200000)
    public void testParallelObserve() throws InterruptedException {
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        Set synchronizedSet = Collections.synchronizedSet(new TreeSet());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        int i = 20;
        int i2 = 2000000;
        ThreadPooledObserver threadPooledObserver = new ThreadPooledObserver(newCachedThreadPool, (streamElement, onNextContext) -> {
            synchronizedList.add(Integer.valueOf(streamElement.getKey().substring(3)));
            onNextContext.confirm();
            return true;
        }, 20);
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(20);
        for (int i3 = 0; i3 < 20; i3++) {
            int i4 = i3;
            newCachedThreadPool.submit(() -> {
                CountDownLatch countDownLatch2 = new CountDownLatch(i2 / i);
                int i5 = i4;
                while (true) {
                    int i6 = i5;
                    if (i6 >= i2) {
                        Objects.requireNonNull(countDownLatch2);
                        ExceptionUtils.ignoringInterrupted(countDownLatch2::await);
                        countDownLatch.countDown();
                        return;
                    }
                    threadPooledObserver.onNext(this.device.upsert("key" + i6, String.valueOf(i6), currentTimeMillis + i6, new byte[0]), ObserverUtils.asOnNextContext((z, th) -> {
                        Assert.assertTrue(z);
                        Assert.assertNull(th);
                        synchronizedSet.add(Integer.valueOf(i6));
                        countDownLatch2.countDown();
                    }, (Offset) null));
                    i5 = i6 + i;
                }
            });
        }
        countDownLatch.await();
        Assert.assertEquals(2000000, synchronizedList.stream().distinct().count());
        Assert.assertEquals(2000000, synchronizedSet.size());
    }

    @Test(timeout = 20000)
    public void testParallelObserveWithRepartition() throws InterruptedException {
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        Set synchronizedSet = Collections.synchronizedSet(new TreeSet());
        ThreadPooledObserver threadPooledObserver = new ThreadPooledObserver(Executors.newCachedThreadPool(), (streamElement, onNextContext) -> {
            synchronizedList.add(Integer.valueOf(streamElement.getKey().substring(3)));
            onNextContext.confirm();
            return true;
        }, 20);
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(50000);
        for (int i = 0; i < 50000; i++) {
            int i2 = i;
            threadPooledObserver.onNext(this.device.upsert("key" + i, String.valueOf(i), currentTimeMillis + i, new byte[0]), ObserverUtils.asOnNextContext((z, th) -> {
                Assert.assertTrue(z);
                Assert.assertNull(th);
                synchronizedSet.add(Integer.valueOf(i2));
                countDownLatch.countDown();
            }, (Offset) null));
            if (i2 % 83 == 0) {
                threadPooledObserver.onRepartition(Collections::emptyList);
            }
        }
        countDownLatch.await();
        threadPooledObserver.onCompleted();
        Assert.assertEquals(50000, synchronizedList.stream().distinct().count());
        Assert.assertEquals(50000, synchronizedSet.size());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2027064618:
                if (implMethodName.equals("lambda$testParallelObserve$c6d7b7b$1")) {
                    z = 3;
                    break;
                }
                break;
            case 93223254:
                if (implMethodName.equals("await")) {
                    z = true;
                    break;
                }
                break;
            case 455151003:
                if (implMethodName.equals("lambda$testParallelObserveWithRepartition$c6d7b7b$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1693523740:
                if (implMethodName.equals("lambda$testParallelObserveWithRepartition$2c3abb90$1")) {
                    z = 4;
                    break;
                }
                break;
            case 2054250817:
                if (implMethodName.equals("lambda$testParallelObserve$2c3abb90$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/commitlog/CommitLogObserver$OffsetCommitter") && serializedLambda.getFunctionalInterfaceMethodName().equals("commit") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(ZLjava/lang/Throwable;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/transaction/ThreadPooledObserverTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;ILjava/util/concurrent/CountDownLatch;ZLjava/lang/Throwable;)V")) {
                    Set set = (Set) serializedLambda.getCapturedArg(0);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(2);
                    return (z2, th) -> {
                        Assert.assertTrue(z2);
                        Assert.assertNull(th);
                        set.add(Integer.valueOf(intValue));
                        countDownLatch.countDown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("java/util/concurrent/CountDownLatch") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    CountDownLatch countDownLatch2 = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return countDownLatch2::await;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/commitlog/CommitLogObserver") && serializedLambda.getFunctionalInterfaceMethodName().equals("onNext") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lcz/o2/proxima/core/storage/StreamElement;Lcz/o2/proxima/direct/core/LogObserver$OnNextContext;)Z") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/transaction/ThreadPooledObserverTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lcz/o2/proxima/core/storage/StreamElement;Lcz/o2/proxima/direct/core/commitlog/CommitLogObserver$OnNextContext;)Z")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    return (streamElement, onNextContext) -> {
                        list.add(Integer.valueOf(streamElement.getKey().substring(3)));
                        onNextContext.confirm();
                        return true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/commitlog/CommitLogObserver") && serializedLambda.getFunctionalInterfaceMethodName().equals("onNext") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lcz/o2/proxima/core/storage/StreamElement;Lcz/o2/proxima/direct/core/LogObserver$OnNextContext;)Z") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/transaction/ThreadPooledObserverTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lcz/o2/proxima/core/storage/StreamElement;Lcz/o2/proxima/direct/core/commitlog/CommitLogObserver$OnNextContext;)Z")) {
                    List list2 = (List) serializedLambda.getCapturedArg(0);
                    return (streamElement2, onNextContext2) -> {
                        list2.add(Integer.valueOf(streamElement2.getKey().substring(3)));
                        onNextContext2.confirm();
                        return true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/commitlog/CommitLogObserver$OffsetCommitter") && serializedLambda.getFunctionalInterfaceMethodName().equals("commit") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(ZLjava/lang/Throwable;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/transaction/ThreadPooledObserverTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;ILjava/util/concurrent/CountDownLatch;ZLjava/lang/Throwable;)V")) {
                    Set set2 = (Set) serializedLambda.getCapturedArg(0);
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    CountDownLatch countDownLatch3 = (CountDownLatch) serializedLambda.getCapturedArg(2);
                    return (z3, th2) -> {
                        Assert.assertTrue(z3);
                        Assert.assertNull(th2);
                        set2.add(Integer.valueOf(intValue2));
                        countDownLatch3.countDown();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
