package cz.o2.proxima.direct.bulk;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.bulk.AbstractBulkFileSystemAttributeWriter;
import cz.o2.proxima.direct.core.BulkAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterables;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Sets;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriterTest.class */
public class AbstractBulkFileSystemAttributeWriterTest implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(AbstractBulkFileSystemAttributeWriterTest.class);

    @Parameterized.Parameter
    public Params params;
    AbstractBulkFileSystemAttributeWriter writer;
    Set<String> flushedPaths;
    Map<Long, List<StreamElement>> written;

    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    final URI uri = URI.create("abstract-bulk:///");
    final Repository repo = Repository.ofTest(ConfigFactory.load().resolve(), new Repository.Validate[0]);
    final DirectDataOperator direct = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
    AtomicReference<Throwable> onFlush = new AtomicReference<>();
    final AttributeDescriptor<?> wildcard = AttributeDescriptor.newBuilder(this.repo).setEntity("dummy").setSchemeUri(new URI("bytes:///")).setName("wildcard.*").build();
    final AttributeDescriptor<?> attr = AttributeDescriptor.newBuilder(this.repo).setEntity("dummy").setSchemeUri(new URI("bytes:///")).setName("attr").build();
    final EntityDescriptor entity = EntityDescriptor.newBuilder().setName("dummy").addAttribute(this.attr).addAttribute(this.wildcard).build();

    /* loaded from: input_file:cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriterTest$InstantiableJsonFormat.class */
    public static class InstantiableJsonFormat extends JsonFormat {
        private static final long serialVersionUID = 1;

        public InstantiableJsonFormat() {
            super(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriterTest$Params.class */
    public static final class Params {
        private final long rollPeriod;
        private final String format;
        private final boolean gzip;
        private final long allowedLateness;

        FileSystem getFs(TemporaryFolder temporaryFolder, String str) throws IOException {
            return FileSystem.local(new File(temporaryFolder.newFolder(), str), getNamingConvention());
        }

        FileFormat getFileFormat() {
            return FileFormatUtils.getFileFormatFromName(this.format, this.gzip);
        }

        NamingConvention getNamingConvention() {
            return NamingConvention.defaultConvention(Duration.ofMillis(this.rollPeriod), "prefix", getFileFormat().fileSuffix());
        }

        public Params(long j, String str, boolean z, long j2) {
            this.rollPeriod = j;
            this.format = str;
            this.gzip = z;
            this.allowedLateness = j2;
        }

        public long getRollPeriod() {
            return this.rollPeriod;
        }

        public String getFormat() {
            return this.format;
        }

        public boolean isGzip() {
            return this.gzip;
        }

        public long getAllowedLateness() {
            return this.allowedLateness;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Params)) {
                return false;
            }
            Params params = (Params) obj;
            if (getRollPeriod() != params.getRollPeriod()) {
                return false;
            }
            String format = getFormat();
            String format2 = params.getFormat();
            if (format == null) {
                if (format2 != null) {
                    return false;
                }
            } else if (!format.equals(format2)) {
                return false;
            }
            return isGzip() == params.isGzip() && getAllowedLateness() == params.getAllowedLateness();
        }

        public int hashCode() {
            long rollPeriod = getRollPeriod();
            int i = (1 * 59) + ((int) ((rollPeriod >>> 32) ^ rollPeriod));
            String format = getFormat();
            int hashCode = (((i * 59) + (format == null ? 43 : format.hashCode())) * 59) + (isGzip() ? 79 : 97);
            long allowedLateness = getAllowedLateness();
            return (hashCode * 59) + ((int) ((allowedLateness >>> 32) ^ allowedLateness));
        }

        public String toString() {
            return "AbstractBulkFileSystemAttributeWriterTest.Params(rollPeriod=" + getRollPeriod() + ", format=" + getFormat() + ", gzip=" + isGzip() + ", allowedLateness=" + getAllowedLateness() + ")";
        }
    }

    @Parameterized.Parameters
    public static Collection<Params> parameters() {
        return Arrays.asList(new Params(1000L, "binary", false, 0L), new Params(1000L, "binary", false, 500L), new Params(1000L, "binary", true, 0L), new Params(1000L, "binary", true, 500L), new Params(1000L, "json", false, 0L), new Params(1000L, "json", false, 500L), new Params(1000L, "json", true, 0L), new Params(1000L, "json", true, 500L), new Params(2000L, "binary", false, 0L), new Params(2000L, "binary", false, 500L), new Params(2000L, "binary", true, 0L), new Params(2000L, "binary", true, 500L), new Params(2000L, "json", false, 0L), new Params(2000L, "json", false, 500L), new Params(2000L, "json", true, 0L), new Params(2000L, "json", true, 500L), new Params(1000L, InstantiableJsonFormat.class.getCanonicalName(), false, 0L), new Params(1000L, InstantiableJsonFormat.class.getCanonicalName(), false, 500L), new Params(2000L, InstantiableJsonFormat.class.getCanonicalName(), true, 0L), new Params(2000L, InstantiableJsonFormat.class.getCanonicalName(), true, 500L));
    }

    @Before
    public void setUp() throws IOException {
        this.flushedPaths = Collections.synchronizedSet(new TreeSet());
        this.onFlush.set(null);
        this.written = Collections.synchronizedMap(new HashMap());
        this.writer = initWriter();
    }

    @After
    public void tearDown() {
        this.writer.close();
    }

    AbstractBulkFileSystemAttributeWriter initWriter() throws IOException {
        return new AbstractBulkFileSystemAttributeWriter(this.entity, this.uri, this.params.getFs(this.tempFolder, "/path/"), this.params.getNamingConvention(), this.params.getFileFormat(), this.direct.getContext(), this.params.getRollPeriod(), this.params.getAllowedLateness()) { // from class: cz.o2.proxima.direct.bulk.AbstractBulkFileSystemAttributeWriterTest.1
            /* renamed from: asFactory, reason: merged with bridge method [inline-methods] */
            public BulkAttributeWriter.Factory<?> m1asFactory() {
                return repository -> {
                    return (AbstractBulkFileSystemAttributeWriter) ExceptionUtils.uncheckedFactory(() -> {
                        return AbstractBulkFileSystemAttributeWriterTest.this.initWriter();
                    });
                };
            }

            protected void flush(AbstractBulkFileSystemAttributeWriter.Bulk bulk) {
                Throwable andSet = AbstractBulkFileSystemAttributeWriterTest.this.onFlush.getAndSet(null);
                if (andSet != null) {
                    throw new RuntimeException(andSet);
                }
                AbstractBulkFileSystemAttributeWriterTest.this.flushedPaths.add(bulk.getPath().toString());
                try {
                    ArrayList newArrayList = Lists.newArrayList(AbstractBulkFileSystemAttributeWriterTest.this.params.getFileFormat().openReader(bulk.getPath(), AbstractBulkFileSystemAttributeWriterTest.this.entity));
                    AbstractBulkFileSystemAttributeWriterTest.log.debug("Putting elements {} to stamp {}", newArrayList, Long.valueOf(bulk.getMaxTs()));
                    AbstractBulkFileSystemAttributeWriterTest.this.written.put(Long.valueOf(bulk.getMaxTs()), newArrayList);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -942836497:
                        if (implMethodName.equals("lambda$asFactory$bbb3fae2$1")) {
                            z = false;
                            break;
                        }
                        break;
                    case -745077365:
                        if (implMethodName.equals("lambda$asFactory$aa33392a$1")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/BulkAttributeWriter$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriterTest$1") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/repository/Repository;)Lcz/o2/proxima/direct/core/BulkAttributeWriter;")) {
                            AnonymousClass1 anonymousClass1 = (AnonymousClass1) serializedLambda.getCapturedArg(0);
                            return repository -> {
                                return (AbstractBulkFileSystemAttributeWriter) ExceptionUtils.uncheckedFactory(() -> {
                                    return AbstractBulkFileSystemAttributeWriterTest.this.initWriter();
                                });
                            };
                        }
                        break;
                    case true:
                        if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriterTest$1") && serializedLambda.getImplMethodSignature().equals("()Lcz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter;")) {
                            AnonymousClass1 anonymousClass12 = (AnonymousClass1) serializedLambda.getCapturedArg(0);
                            return () -> {
                                return AbstractBulkFileSystemAttributeWriterTest.this.initWriter();
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    @Test(timeout = 10000)
    public synchronized void testWrite() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamElement upsert = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", "attr", 1500000000000L, new byte[]{1, 2});
        StreamElement upsert2 = StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L + 200, new byte[]{3});
        write(upsert, (z, th) -> {
            Assert.fail("This should not have been committed!");
        });
        write(upsert2, (z2, th2) -> {
            Assert.assertTrue("Exception " + th2, z2);
            Assert.assertNull(th2);
            atomicInteger.incrementAndGet();
            countDownLatch.countDown();
        });
        this.writer.updateWatermark(Long.MAX_VALUE);
        countDownLatch.await();
        Assert.assertEquals(1L, atomicInteger.get());
        Assert.assertEquals(1L, this.written.size());
        validate(this.written.get(Long.valueOf(1500000000000L + this.params.getRollPeriod())), upsert, upsert2);
        Assert.assertEquals(1L, this.flushedPaths.size());
        Assert.assertTrue(this.params.getNamingConvention().isInRange((String) Iterables.getOnlyElement(this.flushedPaths), 1500000000000L, 1500000000000L + 1));
        Assert.assertTrue(((String) Iterables.getOnlyElement(this.flushedPaths)).contains("/path/2017/07/"));
    }

    @Test(timeout = 10000)
    public synchronized void testWriteAutoFlush() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamElement upsert = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", "attr", 1500000000000L, new byte[]{1, 2});
        StreamElement upsert2 = StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L + (2 * this.params.getRollPeriod()), new byte[]{3});
        write(upsert, (z, th) -> {
            Assert.assertTrue("Exception " + th, z);
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        write(upsert2, (z2, th2) -> {
            Assert.fail("Should not be committed");
        });
        countDownLatch.await();
        Assert.assertEquals(1L, this.written.size());
        validate(this.written.get(Long.valueOf(1500000000000L + this.params.getRollPeriod())), upsert);
        Assert.assertEquals(1L, this.flushedPaths.size());
        Assert.assertTrue(this.params.getNamingConvention().isInRange(((String) Iterables.getOnlyElement(this.flushedPaths)).toString(), 1500000000000L, 1500000000000L + 1));
        Assert.assertTrue(((String) Iterables.getOnlyElement(this.flushedPaths)).toString().contains("/path/2017/07/"));
    }

    @Test(timeout = 10000)
    public synchronized void testWriteOutOfOrder() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        long j = 1500000000000L;
        StreamElement[] streamElementArr = {StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", "attr", 1500000000000L + 200, new byte[]{1}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L, new byte[]{1, 2}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", (1500000000000L + this.params.getRollPeriod()) - 1, new byte[]{1, 2, 3}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L + (this.params.getRollPeriod() / 2), new byte[]{1, 2, 3, 4}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L + (2 * this.params.getRollPeriod()), new byte[]{1, 2, 3, 4, 5})};
        Arrays.stream(streamElementArr).forEach(streamElement -> {
            write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception " + th, z);
                Assert.assertNull(th);
                Assert.assertEquals(j + (this.params.getRollPeriod() / 2), streamElement.getStamp());
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        Assert.assertEquals(1L, this.written.size());
        validate(this.written.get(Long.valueOf(1500000000000L + this.params.getRollPeriod())), streamElementArr[0], streamElementArr[1], streamElementArr[2], streamElementArr[3]);
        Assert.assertEquals(1L, this.flushedPaths.size());
        Assert.assertTrue(this.params.getNamingConvention().isInRange(((String) Iterables.getOnlyElement(this.flushedPaths)).toString(), 1500000000000L, 1500000000000L + 1));
        Assert.assertTrue(((String) Iterables.getOnlyElement(this.flushedPaths)).toString().contains("/path/2017/07/"));
    }

    @Test(timeout = 10000)
    public synchronized void testFlushingOnOutOfOrder() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        long j = 1500000000000L;
        StreamElement[] streamElementArr = {StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", "attr", 1500000000000L + 200, new byte[]{1}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L, new byte[]{1, 2}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L + this.params.getRollPeriod(), new byte[]{1, 2, 3}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L + (this.params.getRollPeriod() / 2), new byte[]{1, 2, 3, 4}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L + (3 * this.params.getRollPeriod()), new byte[]{1, 2, 3, 4, 5})};
        ArrayList arrayList = new ArrayList(Arrays.asList(1500000000000L, 1500000000000L, Long.valueOf(1500000000000L + (this.params.getRollPeriod() / 2)), Long.valueOf(1500000000000L + (this.params.getRollPeriod() / 2)), Long.valueOf((1500000000000L + (3 * this.params.getRollPeriod())) - (this.params.getRollPeriod() / 2))));
        Arrays.stream(streamElementArr).forEach(streamElement -> {
            write(streamElement, ((Long) arrayList.remove(0)).longValue(), (z, th) -> {
                Assert.assertTrue("Exception " + th, z);
                Assert.assertNull(th);
                Assert.assertEquals(j + (this.params.getRollPeriod() / 2), streamElement.getStamp());
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        Assert.assertEquals(2L, this.written.size());
        validate(this.written.get(Long.valueOf(1500000000000L + this.params.getRollPeriod())), streamElementArr[0], streamElementArr[1], streamElementArr[3]);
        validate(this.written.get(Long.valueOf(1500000000000L + (2 * this.params.getRollPeriod()))), streamElementArr[2]);
        Assert.assertEquals("Expected two paths, got " + this.flushedPaths, 2L, this.flushedPaths.size());
        Assert.assertTrue("Invalid range for " + ((String) Iterables.get(this.flushedPaths, 0)).toString(), this.params.getNamingConvention().isInRange(((String) Iterables.get(this.flushedPaths, 0)).toString(), 1500000000000L, 1500000000000L + 1));
        Assert.assertTrue(((String) Iterables.get(this.flushedPaths, 0)).contains("/path/2017/07/"));
        Assert.assertTrue(this.params.getNamingConvention().isInRange(((String) Iterables.get(this.flushedPaths, 1)).toString(), 1500000000000L + this.params.getRollPeriod(), 1500000000000L + this.params.getRollPeriod() + 1));
        Assert.assertTrue(((String) Iterables.get(this.flushedPaths, 1)).contains("/path/2017/07/"));
    }

    @Test(timeout = 10000)
    public synchronized void testFailWrite() throws InterruptedException {
        this.onFlush.set(new RuntimeException("Fail"));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Arrays.stream(new StreamElement[]{StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", "attr", 1500000000000L + 200, new byte[]{1}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key", "wildcard.1", 1500000000000L, new byte[]{1, 2})}).forEach(streamElement -> {
            write(streamElement, (z, th) -> {
                Assert.assertFalse(z);
                Assert.assertNotNull(th);
                countDownLatch.countDown();
            });
        });
        this.writer.updateWatermark(Long.MAX_VALUE);
        countDownLatch.await();
        Assert.assertTrue(this.written.isEmpty());
    }

    @Test(timeout = 10000)
    public synchronized void testWriteNoWatermark() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", "attr", 1500000000000L, new byte[]{1, 2}), (z, th) -> {
            countDownLatch.countDown();
        });
        this.writer.updateWatermark(Long.MIN_VALUE);
        Assert.assertFalse(countDownLatch.await(500L, TimeUnit.MILLISECONDS));
    }

    @Test(timeout = 10000)
    public synchronized void testWriteLate() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        StreamElement[] streamElementArr = {StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", "attr", 1500000000000L + 200, new byte[]{1}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key2", "wildcard.1", 1500000000000L, new byte[]{1, 2}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key3", "wildcard.1", 1500000000000L + this.params.getRollPeriod() + 1, new byte[]{1, 2, 3}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key4", "wildcard.1", 1500000000000L + (this.params.getRollPeriod() / 2) + 1, new byte[]{1, 2, 3, 4}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key5", "wildcard.1", 1500000000000L + (3 * this.params.getRollPeriod()), new byte[]{1, 2, 3, 4, 5})};
        ArrayList arrayList = new ArrayList(Arrays.asList(1500000000000L, 1500000000000L, Long.valueOf(1500000000000L + (2 * this.params.getRollPeriod()) + this.params.getAllowedLateness() + 1), Long.valueOf(1500000000000L + (2 * this.params.getRollPeriod()) + this.params.getAllowedLateness() + 1), Long.valueOf(1500000000000L + (3 * this.params.getRollPeriod()) + this.params.getAllowedLateness())));
        Set synchronizedSet = Collections.synchronizedSet(new HashSet());
        Arrays.stream(streamElementArr).forEach(streamElement -> {
            write(streamElement, ((Long) arrayList.remove(0)).longValue(), (z, th) -> {
                Assert.assertTrue("Exception " + th, z);
                Assert.assertNull(th);
                synchronizedSet.add(Long.valueOf(streamElement.getStamp()));
                countDownLatch.countDown();
            });
        });
        long rollPeriod = 1500000000000L + (4 * this.params.getRollPeriod()) + 1 + this.params.getAllowedLateness();
        do {
            this.writer.bulk().updateWatermark(rollPeriod);
        } while (!countDownLatch.await(50L, TimeUnit.MILLISECONDS));
        Assert.assertEquals("Written: " + this.written.keySet(), 3L, this.written.size());
        validate(this.written.get(Long.valueOf(1500000000000L + this.params.getRollPeriod())), streamElementArr[0], streamElementArr[1]);
        validate(this.written.get(Long.valueOf(1500000000000L + this.params.getRollPeriod() + 1)), streamElementArr[2], streamElementArr[3]);
        validate(this.written.get(Long.valueOf(1500000000000L + (4 * this.params.getRollPeriod()))), streamElementArr[4]);
        Assert.assertEquals("Expected three paths, got " + this.flushedPaths, 3L, this.flushedPaths.size());
        Assert.assertEquals(2L, synchronizedSet.size());
        Assert.assertEquals(Sets.newHashSet(new Long[]{1500000000000L, Long.valueOf(1500000000000L + (3 * this.params.getRollPeriod()))}), synchronizedSet);
    }

    @Test(timeout = 10000)
    public synchronized void testWriteLateWithLargeDelay() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamElement[] streamElementArr = {StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", "attr", 0L, new byte[]{1}), StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), "key3", "wildcard.1", 1500000000000L, new byte[]{1, 2, 3}), StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", "attr", 1500000000000L + (2 * this.params.getRollPeriod()), new byte[]{1})};
        ArrayList arrayList = new ArrayList(Arrays.asList(1500000000000L, Long.valueOf(1500000000000L + (2 * this.params.getRollPeriod()) + this.params.getAllowedLateness()), Long.valueOf(1500000000000L + (2 * this.params.getRollPeriod()) + this.params.getAllowedLateness() + 1)));
        Set synchronizedSet = Collections.synchronizedSet(new HashSet());
        Arrays.stream(streamElementArr).forEach(streamElement -> {
            write(streamElement, ((Long) arrayList.remove(0)).longValue(), (z, th) -> {
                Assert.assertTrue("Exception " + th, z);
                Assert.assertNull(th);
                synchronizedSet.add(Long.valueOf(streamElement.getStamp()));
                countDownLatch.countDown();
            });
        });
        this.writer.bulk().updateWatermark(1500000000000L + (4 * this.params.getRollPeriod()) + 1 + this.params.getAllowedLateness());
        countDownLatch.await();
        Assert.assertEquals("Written: " + this.written.keySet(), 3L, this.written.size());
        validate(this.written.get(Long.valueOf(1500000000000L + (3 * this.params.getRollPeriod()))), streamElementArr[2]);
        validate(this.written.get(1500000000000L), streamElementArr[1]);
        validate(this.written.get(0L), streamElementArr[0]);
        Assert.assertEquals("Expected three paths, got " + this.flushedPaths, 3L, this.flushedPaths.size());
        Assert.assertEquals(1L, synchronizedSet.size());
        Assert.assertEquals(Sets.newHashSet(new Long[]{Long.valueOf(1500000000000L + (2 * this.params.getRollPeriod()))}), synchronizedSet);
    }

    @Test
    public void testUpdateWatermarkPrioToWrite() {
        this.writer.updateWatermark(System.currentTimeMillis());
        Assert.assertTrue(true);
    }

    private void validate(List<StreamElement> list, StreamElement... streamElementArr) throws IOException {
        Assert.assertNotNull(list);
        Iterator<StreamElement> it = list.iterator();
        Assert.assertEquals("Expected " + Arrays.toString(streamElementArr) + " got " + list, streamElementArr.length, list.size());
        for (StreamElement streamElement : streamElementArr) {
            Assert.assertEquals(it.next(), streamElement);
        }
    }

    private void write(StreamElement streamElement, CommitCallback commitCallback) {
        write(streamElement, streamElement.getStamp(), commitCallback);
    }

    private void write(StreamElement streamElement, long j, CommitCallback commitCallback) {
        this.writer.write(streamElement, j, commitCallback);
    }
}
