package cz.o2.proxima.direct.kafka;

import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigResolveOptions;
import com.typesafe.config.ConfigValueFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.util.Optionals;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaLogReaderIT.class */
public class KafkaLogReaderIT {
    private static final String CONFIG = "entities {\n  entity {\n    attributes {\n      foo: { scheme: bytes }\n    }\n  }\n}\nattributeFamilies {\n  scalar-primary {\n    entity: entity\n    attributes: [\"foo\"]\n    storage: \"kafka://\"${broker}\"/foo\"\n    type: primary\n    access: commit-log\n    watermark {\n      idle-policy-factory: cz.o2.proxima.direct.time.NotProgressingWatermarkIdlePolicy.Factory\n    }\n    assignment-timeout-ms: 100\n  }\n}\n";

    @Rule
    public final EmbeddedKafkaRule rule = new EmbeddedKafkaRule(1);

    @Test(timeout = 30000)
    public void testWriteRead() throws InterruptedException {
        EmbeddedKafkaBroker embeddedKafka = this.rule.getEmbeddedKafka();
        embeddedKafka.addTopics(new NewTopic[]{new NewTopic("foo", 3, (short) 1)});
        Repository ofTest = Repository.ofTest(ConfigFactory.parseString(CONFIG).resolveWith(ConfigFactory.empty().withValue("broker", ConfigValueFactory.fromAnyRef((String) Arrays.stream(embeddedKafka.getBrokerAddresses()).map(brokerAddress -> {
            return brokerAddress.getHost() + ":" + brokerAddress.getPort();
        }).collect(Collectors.joining(",")))), ConfigResolveOptions.noSystem()), new Repository.Validate[0]);
        EntityDescriptor entity = ofTest.getEntity("entity");
        AttributeDescriptor attribute = entity.getAttribute("foo");
        CommitLogReader commitLogReader = (CommitLogReader) Optionals.get(ofTest.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]).getCommitLogReader(new AttributeDescriptor[]{attribute}));
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        commitLogReader.observe("test-reader", Position.OLDEST, new LogObserver() { // from class: cz.o2.proxima.direct.kafka.KafkaLogReaderIT.1
            public void onCompleted() {
                countDownLatch.countDown();
            }

            public boolean onError(Throwable th) {
                return false;
            }

            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                System.out.println("WATERMARK " + onNextContext.getWatermark());
                atomicInteger.incrementAndGet();
                return Long.MAX_VALUE != onNextContext.getWatermark();
            }
        }).waitUntilReady();
        OnlineAttributeWriter onlineAttributeWriter = (OnlineAttributeWriter) Optionals.get(ofTest.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]).getWriter(attribute));
        for (int i = 0; i < 100; i++) {
            onlineAttributeWriter.write(StreamElement.upsert(entity, attribute, UUID.randomUUID().toString(), Integer.toString(i), attribute.getName(), Long.MAX_VALUE, "value".getBytes(StandardCharsets.UTF_8)), CommitCallback.noop());
        }
        countDownLatch.await();
        Assert.assertTrue(atomicInteger.get() > 3);
    }
}
