package cz.o2.proxima.source;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.ConfigRepository;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.OnlineAttributeWriter;
import cz.o2.proxima.storage.StreamElement;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.Time;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.BoundedDataSource;
import cz.seznam.euphoria.core.client.io.ListDataSink;
import cz.seznam.euphoria.core.client.operator.AssignEventTime;
import cz.seznam.euphoria.core.client.operator.CountByKey;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.executor.local.LocalExecutor;
import cz.seznam.euphoria.testing.DatasetAssert;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

/* loaded from: input_file:cz/o2/proxima/source/BoundedSourceTest.class */
abstract class BoundedSourceTest {
    final Repository repo = ConfigRepository.of(ConfigFactory.load("test-reference.conf").resolve());
    final EntityDescriptor entity = getEntity(this.repo);
    final AttributeDescriptor<byte[]> attr = getAttr(this.entity);
    final AttributeDescriptor<byte[]> wildcard = getWildcard(this.entity);

    abstract EntityDescriptor getEntity(Repository repository);

    abstract AttributeDescriptor<byte[]> getAttr(EntityDescriptor entityDescriptor);

    abstract AttributeDescriptor<byte[]> getWildcard(EntityDescriptor entityDescriptor);

    private StreamElement update(String str, AttributeDescriptor<byte[]> attributeDescriptor) {
        return update(str, attributeDescriptor, System.currentTimeMillis());
    }

    private StreamElement update(String str, AttributeDescriptor<byte[]> attributeDescriptor, long j) {
        return update(str, attributeDescriptor, attributeDescriptor.getName(), j);
    }

    private StreamElement update(String str, AttributeDescriptor<byte[]> attributeDescriptor, String str2, long j) {
        return StreamElement.update(this.entity, attributeDescriptor, UUID.randomUUID().toString(), str, str2, j, new byte[]{1, 2, 3});
    }

    public void testSimpleConsume(OnlineAttributeWriter onlineAttributeWriter, BoundedDataSource<StreamElement> boundedDataSource) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        onlineAttributeWriter.write(update("key", this.attr, currentTimeMillis), (z, th) -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Flow create = Flow.create();
        Dataset createInput = create.createInput(boundedDataSource);
        ListDataSink listDataSink = ListDataSink.get();
        CountByKey.of(createInput).keyBy((v0) -> {
            return v0.getKey();
        }).windowBy(Time.of(Duration.ofHours(1L))).output().persist(listDataSink);
        new LocalExecutor().submit(create).join();
        DatasetAssert.unorderedEquals(listDataSink.getOutputs(), new Pair[]{Pair.of("key", 1L)});
    }

    public void testSimpleConsumeWildcard(OnlineAttributeWriter onlineAttributeWriter, BoundedDataSource<StreamElement> boundedDataSource) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        onlineAttributeWriter.write(update("key", this.wildcard, "wildcard.1", currentTimeMillis), (z, th) -> {
            countDownLatch.countDown();
        });
        onlineAttributeWriter.write(update("key", this.wildcard, "wildcard.2", currentTimeMillis - 3600000), (z2, th2) -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Flow create = Flow.create();
        Dataset createInput = create.createInput(boundedDataSource);
        ListDataSink listDataSink = ListDataSink.get();
        CountByKey.of(AssignEventTime.of(createInput).using((v0) -> {
            return v0.getStamp();
        }).output()).keyBy((v0) -> {
            return v0.getKey();
        }).windowBy(Time.of(Duration.ofHours(1L))).output().persist(listDataSink);
        new LocalExecutor().submit(create).join();
        DatasetAssert.unorderedEquals(listDataSink.getOutputs(), new Pair[]{Pair.of("key", 1L), Pair.of("key", 1L)});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OnlineAttributeWriter getWriter(AttributeDescriptor<byte[]> attributeDescriptor) {
        return (OnlineAttributeWriter) this.repo.getWriter(attributeDescriptor).orElseThrow(() -> {
            return new IllegalArgumentException("Attribute " + attributeDescriptor + " has no writer");
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = false;
                    break;
                }
                break;
            case 1965582861:
                if (implMethodName.equals("getStamp")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/StreamElement") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/StreamElement") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/ExtractEventTime") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/StreamElement") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getStamp();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
