package cz.o2.proxima.direct.bulk;

import cz.o2.proxima.direct.core.AbstractBulkAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterables;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Streams;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter.class */
public abstract class AbstractBulkFileSystemAttributeWriter extends AbstractBulkAttributeWriter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractBulkFileSystemAttributeWriter.class);
    private static final long serialVersionUID = 2;
    private final FileSystem fs;
    private final NamingConvention namingConvention;
    private final FileFormat format;
    private final long rollPeriodMs;
    private final long allowedLatenessMs;
    private final Factory<Executor> executorFactory;
    private final Map<Long, Bulk> writers;
    private final Map<String, LateBulk> lateWriters;
    private final AtomicInteger inFlightFlushes;
    private long seqNo;

    @Nullable
    private transient Executor executor;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter$Bulk.class */
    public class Bulk {
        private final Path path;
        private final Writer writer;
        private final long startTs;
        private final long maxTs;

        @Nullable
        private CommitCallback commit = null;
        private long lastWriteWatermark = Long.MIN_VALUE;
        private long firstWriteSeqNo = -1;
        private long lastWriteSeqNo = 0;

        Bulk(Path path, long j, long j2) throws IOException {
            this.path = path;
            this.writer = AbstractBulkFileSystemAttributeWriter.this.format.openWriter(path, AbstractBulkFileSystemAttributeWriter.this.getEntityDescriptor());
            this.startTs = j;
            this.maxTs = j2;
        }

        synchronized void write(StreamElement streamElement, CommitCallback commitCallback, long j, long j2) throws IOException {
            this.commit = commitCallback;
            this.lastWriteWatermark = j;
            this.lastWriteSeqNo = j2;
            if (this.firstWriteSeqNo < 0) {
                this.firstWriteSeqNo = j2;
            }
            this.writer.write(streamElement);
        }

        public CommitCallback getCommit() {
            return (CommitCallback) Objects.requireNonNull(this.commit);
        }

        public String toString() {
            return "AbstractBulkFileSystemAttributeWriter.Bulk(path=" + getPath() + ", writer=" + getWriter() + ", startTs=" + getStartTs() + ", maxTs=" + getMaxTs() + ", commit=" + getCommit() + ", lastWriteWatermark=" + getLastWriteWatermark() + ", firstWriteSeqNo=" + getFirstWriteSeqNo() + ", lastWriteSeqNo=" + getLastWriteSeqNo() + ")";
        }

        public Path getPath() {
            return this.path;
        }

        public Writer getWriter() {
            return this.writer;
        }

        public long getStartTs() {
            return this.startTs;
        }

        public long getMaxTs() {
            return this.maxTs;
        }

        public long getLastWriteWatermark() {
            return this.lastWriteWatermark;
        }

        public long getFirstWriteSeqNo() {
            return this.firstWriteSeqNo;
        }

        public long getLastWriteSeqNo() {
            return this.lastWriteSeqNo;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter$LateBulk.class */
    public class LateBulk extends Bulk {
        private long startTs;
        private long maxSeenTs;

        LateBulk(Path path) throws IOException {
            super(path, Long.MIN_VALUE, Long.MAX_VALUE);
            this.startTs = Long.MAX_VALUE;
            this.maxSeenTs = Long.MIN_VALUE;
        }

        @Override // cz.o2.proxima.direct.bulk.AbstractBulkFileSystemAttributeWriter.Bulk
        synchronized void write(StreamElement streamElement, CommitCallback commitCallback, long j, long j2) throws IOException {
            super.write(streamElement, commitCallback, j, j2);
            this.maxSeenTs = Math.max(streamElement.getStamp(), this.maxSeenTs);
            this.startTs = Math.min(streamElement.getStamp(), this.startTs);
        }

        @Override // cz.o2.proxima.direct.bulk.AbstractBulkFileSystemAttributeWriter.Bulk
        public long getStartTs() {
            return this.startTs;
        }

        @Override // cz.o2.proxima.direct.bulk.AbstractBulkFileSystemAttributeWriter.Bulk
        public long getMaxTs() {
            return this.maxSeenTs;
        }

        @Override // cz.o2.proxima.direct.bulk.AbstractBulkFileSystemAttributeWriter.Bulk
        public String toString() {
            return "AbstractBulkFileSystemAttributeWriter.LateBulk(startTs=" + getStartTs() + ", maxSeenTs=" + this.maxSeenTs + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractBulkFileSystemAttributeWriter(EntityDescriptor entityDescriptor, URI uri, FileSystem fileSystem, NamingConvention namingConvention, FileFormat fileFormat, Context context, long j, long j2) {
        super(entityDescriptor, uri);
        this.writers = Collections.synchronizedMap(new HashMap());
        this.lateWriters = Collections.synchronizedMap(new HashMap());
        this.inFlightFlushes = new AtomicInteger();
        this.seqNo = 0L;
        this.executor = null;
        this.fs = fileSystem;
        this.namingConvention = namingConvention;
        this.format = fileFormat;
        this.rollPeriodMs = j;
        this.allowedLatenessMs = j2;
        Objects.requireNonNull(context);
        this.executorFactory = context::getExecutorService;
    }

    public void write(StreamElement streamElement, long j, CommitCallback commitCallback) {
        long stamp = streamElement.getStamp() - (streamElement.getStamp() % this.rollPeriodMs);
        long max = Math.max(stamp + this.rollPeriodMs, stamp);
        if (max + this.allowedLatenessMs >= j) {
            writeToBulk(streamElement, commitCallback, j, this.writers.computeIfAbsent(Long.valueOf(stamp), l -> {
                return (Bulk) ExceptionUtils.uncheckedFactory(() -> {
                    return new Bulk(this.fs.newPath(stamp), stamp, max);
                });
            }));
        } else {
            handleLateData(streamElement, j, commitCallback);
        }
        flushOnWatermark(j);
    }

    protected void handleLateData(StreamElement streamElement, long j, CommitCallback commitCallback) {
        LateBulk computeIfAbsent = this.lateWriters.computeIfAbsent((String) Iterables.getOnlyElement(this.namingConvention.prefixesOf(streamElement.getStamp(), streamElement.getStamp())), str -> {
            return newLateBulkFor(streamElement);
        });
        writeToBulk(streamElement, commitCallback, j, computeIfAbsent);
        log.debug("Written late element {} to {} on watermark {}", streamElement, computeIfAbsent.getWriter(), Long.valueOf(j));
    }

    private LateBulk newLateBulkFor(StreamElement streamElement) {
        return (LateBulk) ExceptionUtils.uncheckedFactory(() -> {
            log.debug("Created new late bulk for {}", streamElement);
            return new LateBulk(this.fs.newPath(Long.MAX_VALUE));
        });
    }

    private void writeToBulk(StreamElement streamElement, CommitCallback commitCallback, long j, Bulk bulk) {
        ExceptionUtils.unchecked(() -> {
            long j2 = this.seqNo;
            this.seqNo = j2 + 1;
            bulk.write(streamElement, commitCallback, j, j2);
        });
        log.debug("Written element {} to {} on watermark {}", streamElement, bulk.getWriter(), Long.valueOf(j));
    }

    public void updateWatermark(long j) {
        flushOnWatermark(j);
    }

    public void rollback() {
        close();
    }

    public void close() {
        synchronized (this.writers) {
            synchronized (this.lateWriters) {
                Streams.concat(new Stream[]{this.writers.values().stream(), this.lateWriters.values().stream()}).forEach(bulk -> {
                    Path path = bulk.getWriter().getPath();
                    try {
                        bulk.getWriter().close();
                        path.delete();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                this.lateWriters.clear();
            }
            this.writers.clear();
        }
    }

    protected abstract void flush(Bulk bulk);

    private void flushOnWatermark(long j) {
        Collection<Bulk> collectFlushable = collectFlushable(j);
        if (collectFlushable.isEmpty()) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Collected {} flushable bulks at watermark {} with allowedLatenessMs {}.", collectFlushable, Long.valueOf(j), Long.valueOf(this.allowedLatenessMs));
        }
        CommitCallback commit = collectFlushable.stream().max(Comparator.comparing((v0) -> {
            return v0.getLastWriteSeqNo();
        })).get().getCommit();
        try {
            AtomicInteger atomicInteger = new AtomicInteger(collectFlushable.size());
            collectFlushable.forEach(bulk -> {
                ExceptionUtils.unchecked(() -> {
                    bulk.getWriter().close();
                });
                executor().execute(() -> {
                    try {
                        flush(bulk);
                        log.info("Flushed path {}", bulk.getPath());
                        if (atomicInteger.decrementAndGet() == 0) {
                            commit.commit(true, (Throwable) null);
                        }
                        this.inFlightFlushes.decrementAndGet();
                    } catch (Exception e) {
                        this.inFlightFlushes.decrementAndGet();
                        log.error("Failed to flush path {}", bulk.getPath(), e);
                        atomicInteger.set(-1);
                        Path path = bulk.getPath();
                        Objects.requireNonNull(path);
                        ExceptionUtils.unchecked(path::delete);
                        commit.commit(false, e);
                    }
                });
                this.writers.remove(Long.valueOf(bulk.getStartTs()));
            });
        } catch (Exception e) {
            log.error("Failed to flush paths {}", collectFlushable, e);
            commit.commit(false, e);
        }
    }

    private Executor executor() {
        if (this.executor == null) {
            this.executor = (Executor) this.executorFactory.apply();
        }
        return this.executor;
    }

    private Collection<Bulk> collectFlushable(long j) {
        int size;
        if (this.inFlightFlushes.get() > 0) {
            return Collections.emptyList();
        }
        synchronized (this.writers) {
            Set<Bulk> set = (Set) this.writers.values().stream().filter(bulk -> {
                return bulk.getMaxTs() + this.allowedLatenessMs < j;
            }).collect(Collectors.toSet());
            long maxWriteSeqNo = getMaxWriteSeqNo(set);
            if (set.isEmpty()) {
                return set;
            }
            synchronized (this.lateWriters) {
                do {
                    size = set.size();
                    getAdditionalBulks(this.lateWriters.entrySet(), maxWriteSeqNo).forEach(entry -> {
                        set.add(this.lateWriters.remove(entry.getKey()));
                    });
                    getAdditionalBulks(this.writers.entrySet(), getMaxWriteSeqNo(set)).forEach(entry2 -> {
                        set.add((Bulk) entry2.getValue());
                    });
                    maxWriteSeqNo = getMaxWriteSeqNo(set);
                } while (size < set.size());
            }
            this.inFlightFlushes.addAndGet(set.size());
            return set;
        }
    }

    private long getMaxWriteSeqNo(Set<Bulk> set) {
        return set.stream().mapToLong((v0) -> {
            return v0.getLastWriteSeqNo();
        }).max().orElse(Long.MIN_VALUE);
    }

    private <K, B extends Bulk> List<Map.Entry<K, B>> getAdditionalBulks(Collection<Map.Entry<K, B>> collection, long j) {
        return (List) collection.stream().filter(entry -> {
            return ((Bulk) entry.getValue()).getFirstWriteSeqNo() < j;
        }).collect(Collectors.toList());
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public NamingConvention getNamingConvention() {
        return this.namingConvention;
    }

    public FileFormat getFormat() {
        return this.format;
    }

    public long getRollPeriodMs() {
        return this.rollPeriodMs;
    }

    public long getAllowedLatenessMs() {
        return this.allowedLatenessMs;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1828840556:
                if (implMethodName.equals("lambda$write$1add582d$1")) {
                    z = true;
                    break;
                }
                break;
            case -1358870757:
                if (implMethodName.equals("lambda$newLateBulkFor$fd412a5c$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1335458389:
                if (implMethodName.equals("delete")) {
                    z = 5;
                    break;
                }
                break;
            case 868027649:
                if (implMethodName.equals("lambda$flushOnWatermark$c1dd5951$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1854485548:
                if (implMethodName.equals("getExecutorService")) {
                    z = false;
                    break;
                }
                break;
            case 1982607952:
                if (implMethodName.equals("lambda$writeToBulk$34fb43a3$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/Context") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/concurrent/ExecutorService;")) {
                    Context context = (Context) serializedLambda.getCapturedArg(0);
                    return context::getExecutorService;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter") && serializedLambda.getImplMethodSignature().equals("(JJ)Lcz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter$Bulk;")) {
                    AbstractBulkFileSystemAttributeWriter abstractBulkFileSystemAttributeWriter = (AbstractBulkFileSystemAttributeWriter) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    long longValue2 = ((Long) serializedLambda.getCapturedArg(2)).longValue();
                    return () -> {
                        return new Bulk(this.fs.newPath(longValue), longValue, longValue2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/storage/StreamElement;)Lcz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter$LateBulk;")) {
                    AbstractBulkFileSystemAttributeWriter abstractBulkFileSystemAttributeWriter2 = (AbstractBulkFileSystemAttributeWriter) serializedLambda.getCapturedArg(0);
                    StreamElement streamElement = (StreamElement) serializedLambda.getCapturedArg(1);
                    return () -> {
                        log.debug("Created new late bulk for {}", streamElement);
                        return new LateBulk(this.fs.newPath(Long.MAX_VALUE));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter$Bulk;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/direct/core/CommitCallback;J)V")) {
                    AbstractBulkFileSystemAttributeWriter abstractBulkFileSystemAttributeWriter3 = (AbstractBulkFileSystemAttributeWriter) serializedLambda.getCapturedArg(0);
                    Bulk bulk = (Bulk) serializedLambda.getCapturedArg(1);
                    StreamElement streamElement2 = (StreamElement) serializedLambda.getCapturedArg(2);
                    CommitCallback commitCallback = (CommitCallback) serializedLambda.getCapturedArg(3);
                    long longValue3 = ((Long) serializedLambda.getCapturedArg(4)).longValue();
                    return () -> {
                        long j2 = this.seqNo;
                        this.seqNo = j2 + 1;
                        bulk.write(streamElement2, commitCallback, longValue3, j2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/bulk/AbstractBulkFileSystemAttributeWriter$Bulk;)V")) {
                    Bulk bulk2 = (Bulk) serializedLambda.getCapturedArg(0);
                    return () -> {
                        bulk2.getWriter().close();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/bulk/Path") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    Path path = (Path) serializedLambda.getCapturedArg(0);
                    return path::delete;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
