package org.radarbase.output.worker;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import kotlin.Metadata;
import kotlin.io.CloseableKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.ranges.RangesKt;
import kotlin.sequences.SequencesKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.radarbase.output.FileStoreFactory;
import org.radarbase.output.accounting.Accountant;
import org.radarbase.output.accounting.OffsetRangeSet;
import org.radarbase.output.accounting.RemoteLockManager;
import org.radarbase.output.config.RestructureConfig;
import org.radarbase.output.config.TopicConfig;
import org.radarbase.output.config.WorkerConfig;
import org.radarbase.output.source.SourceStorage;
import org.radarbase.output.source.TopicFile;
import org.radarbase.output.source.TopicFileList;
import org.radarbase.output.util.TimeUtil;
import org.radarbase.output.worker.RadarKafkaRestructure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: RadarKafkaRestructure.kt */
@Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��j\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\"\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\b\u0004\u0018�� )2\u00020\u0001:\u0002)*B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\b\u0010\u0018\u001a\u00020\u0019H\u0016J\u0010\u0010\u001a\u001a\u00020\u001b2\u0006\u0010\u001c\u001a\u00020\u001dH\u0002J\u000e\u0010\u001e\u001a\u00020\u00192\u0006\u0010\u001f\u001a\u00020\u0007J(\u0010 \u001a\u00020\u001b2\u0006\u0010!\u001a\u00020\u00072\u0006\u0010\u001c\u001a\u00020\u001d2\u0006\u0010\"\u001a\u00020#2\u0006\u0010$\u001a\u00020%H\u0002J\u0016\u0010&\u001a\b\u0012\u0004\u0012\u00020\u001d0'2\u0006\u0010(\u001a\u00020\u001dH\u0002R\u0014\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\f\u001a\u00020\rX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u0010\u001a\u00020\u0011¢\u0006\b\n��\u001a\u0004\b\u0012\u0010\u0013R\u0011\u0010\u0014\u001a\u00020\u0011¢\u0006\b\n��\u001a\u0004\b\u0015\u0010\u0013R\u000e\u0010\u0016\u001a\u00020\u0017X\u0082\u0004¢\u0006\u0002\n��¨\u0006+"}, d2 = {"Lorg/radarbase/output/worker/RadarKafkaRestructure;", "Ljava/io/Closeable;", "fileStoreFactory", "Lorg/radarbase/output/FileStoreFactory;", "(Lorg/radarbase/output/FileStoreFactory;)V", "excludeTopics", "", "", "isClosed", "Ljava/util/concurrent/atomic/AtomicBoolean;", 
"lockManager", "Lorg/radarbase/output/accounting/RemoteLockManager;", "maxFilesPerTopic", "", "minimumFileAge", "Ljava/time/Duration;", "processedFileCount", "Ljava/util/concurrent/atomic/LongAdder;", "getProcessedFileCount", "()Ljava/util/concurrent/atomic/LongAdder;", "processedRecordsCount", "getProcessedRecordsCount", "sourceStorage", "Lorg/radarbase/output/source/SourceStorage;", "close", "", "mapTopic", "Lorg/radarbase/output/worker/RadarKafkaRestructure$ProcessingStatistics;", "topicPath", "Ljava/nio/file/Path;", "process", "directoryName", "startWorker", "topic", "accountant", "Lorg/radarbase/output/accounting/Accountant;", "seenFiles", "Lorg/radarbase/output/accounting/OffsetRangeSet;", "topicPaths", "", "root", "Companion", "ProcessingStatistics", "radar-output-restructure"})
/* loaded from: input_file:org/radarbase/output/worker/RadarKafkaRestructure.class */
public final class RadarKafkaRestructure implements Closeable {
    // Source storage obtained from the file store factory; its walker lists topics and topic files.
    private final SourceStorage sourceStorage;
    // Lock manager obtained from the file store factory; mapTopic runs each topic's work through tryRunLocked.
    private final RemoteLockManager lockManager;
    // Set to true by close(); checked before a topic is mapped and also handed to every RestructureWorker.
    private final AtomicBoolean isClosed;
    // Names of topics whose TopicConfig has the exclude flag set; passed to the walker when listing topics.
    private final Set<String> excludeTopics;
    // Upper bound on the files taken per topic in one run; Integer.MAX_VALUE when the worker config leaves it unset.
    private final int maxFilesPerTopic;
    // Minimum age a file must have before it is selected; built from the worker config's seconds value, floored at 0.
    private final Duration minimumFileAge;

    // Running total of the file counts reported for each mapped topic (incremented in process).
    @NotNull
    private final LongAdder processedFileCount;

    // Running total of the record counts reported for each mapped topic (incremented in process).
    @NotNull
    private final LongAdder processedRecordsCount;
    // Factory passed to the constructor; also handed to each Accountant and RestructureWorker.
    private final FileStoreFactory fileStoreFactory;

    // Kotlin companion object instance (compiler generated holder).
    @NotNull
    public static final Companion Companion = new Companion(null);
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(RadarKafkaRestructure.class);

    /* compiled from: RadarKafkaRestructure.kt */
    @Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0006"}, d2 = {"Lorg/radarbase/output/worker/RadarKafkaRestructure$Companion;", "", "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "radar-output-restructure"})
    /* loaded from: input_file:org/radarbase/output/worker/RadarKafkaRestructure$Companion.class */
    /**
     * Kotlin companion object holder. It declares no members of its own in this
     * decompiled form; the single instance is stored in the enclosing class's
     * {@code Companion} field.
     */
    public static final class Companion {
        /** Not instantiable directly; only reachable through the synthetic constructor below. */
        private Companion() {
        }

        /**
         * Compiler-generated constructor. The marker argument is not read; it only
         * distinguishes this overload from the private no-arg constructor.
         */
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: RadarKafkaRestructure.kt */
    @Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��&\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010\t\n\u0002\b\t\n\u0002\u0010\u000b\n\u0002\b\u0002\n\u0002\u0010\b\n��\n\u0002\u0010\u000e\n��\b\u0082\b\u0018��2\u00020\u0001B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0003¢\u0006\u0002\u0010\u0005J\t\u0010\t\u001a\u00020\u0003HÆ\u0003J\t\u0010\n\u001a\u00020\u0003HÆ\u0003J\u001d\u0010\u000b\u001a\u00020��2\b\b\u0002\u0010\u0002\u001a\u00020\u00032\b\b\u0002\u0010\u0004\u001a\u00020\u0003HÆ\u0001J\u0013\u0010\f\u001a\u00020\r2\b\u0010\u000e\u001a\u0004\u0018\u00010\u0001HÖ\u0003J\t\u0010\u000f\u001a\u00020\u0010HÖ\u0001J\t\u0010\u0011\u001a\u00020\u0012HÖ\u0001R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\u0006\u0010\u0007R\u0011\u0010\u0004\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\b\u0010\u0007¨\u0006\u0013"}, d2 = {"Lorg/radarbase/output/worker/RadarKafkaRestructure$ProcessingStatistics;", "", "fileCount", "", "recordCount", "(JJ)V", "getFileCount", "()J", "getRecordCount", "component1", "component2", "copy", "equals", "", "other", "hashCode", "", "toString", "", "radar-output-restructure"})
    /* loaded from: input_file:org/radarbase/output/worker/RadarKafkaRestructure$ProcessingStatistics.class */
    /**
     * Immutable pair of counters describing the outcome of processing one topic:
     * how many files and how many records were handled. Mirrors a Kotlin data
     * class, hence the {@code componentN}, {@code copy} and {@code copy$default}
     * members.
     */
    public static final class ProcessingStatistics {
        private final long fileCount;
        private final long recordCount;

        /**
         * @param j  number of files
         * @param j2 number of records
         */
        public ProcessingStatistics(long j, long j2) {
            this.fileCount = j;
            this.recordCount = j2;
        }

        /** @return the number of files */
        public final long getFileCount() {
            return fileCount;
        }

        /** @return the number of records */
        public final long getRecordCount() {
            return recordCount;
        }

        /** Destructuring accessor for the first property (file count). */
        public final long component1() {
            return fileCount;
        }

        /** Destructuring accessor for the second property (record count). */
        public final long component2() {
            return recordCount;
        }

        /**
         * Creates a new instance with the given counters.
         *
         * @param j  number of files for the copy
         * @param j2 number of records for the copy
         * @return a fresh instance holding exactly the given values
         */
        @NotNull
        public final ProcessingStatistics copy(long j, long j2) {
            return new ProcessingStatistics(j, j2);
        }

        /**
         * Default-argument bridge for {@link #copy(long, long)}: bit 0 of {@code i}
         * means "take the file count from the receiver", bit 1 means "take the
         * record count from the receiver"; otherwise the supplied value is used.
         */
        public static /* synthetic */ ProcessingStatistics copy$default(ProcessingStatistics processingStatistics, long j, long j2, int i, Object obj) {
            final long files = (i & 1) != 0 ? processingStatistics.fileCount : j;
            final long records = (i & 2) != 0 ? processingStatistics.recordCount : j2;
            return processingStatistics.copy(files, records);
        }

        @NotNull
        public String toString() {
            return new StringBuilder("ProcessingStatistics(fileCount=")
                    .append(fileCount)
                    .append(", recordCount=")
                    .append(recordCount)
                    .append(")")
                    .toString();
        }

        public int hashCode() {
            final int fileHash = Long.hashCode(fileCount);
            final int recordHash = Long.hashCode(recordCount);
            return 31 * fileHash + recordHash;
        }

        public boolean equals(@Nullable Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof ProcessingStatistics) {
                final ProcessingStatistics that = (ProcessingStatistics) obj;
                return fileCount == that.fileCount && recordCount == that.recordCount;
            }
            return false;
        }
    }

    /**
     * Returns the adder that {@code process} increments with the file count
     * reported for each mapped topic. The live adder is returned, not a snapshot.
     */
    @NotNull
    public final LongAdder getProcessedFileCount() {
        return this.processedFileCount;
    }

    /**
     * Returns the adder that {@code process} increments with the record count
     * reported for each mapped topic. The live adder is returned, not a snapshot.
     */
    @NotNull
    public final LongAdder getProcessedRecordsCount() {
        return this.processedRecordsCount;
    }

    /**
     * Scans the given directory for topics and maps every topic found.
     *
     * <p>The topic list comes back from {@code topicPaths} in shuffled order and is
     * traversed with a parallel stream. The statistics returned for each topic are
     * added to the processed-file and processed-record adders. A failure while
     * mapping one topic is logged as a warning and does not stop the others.
     *
     * @param str name of the directory to scan, converted with {@link Paths#get}
     * @throws IOException          declared for callers; nothing in the visible body throws it directly
     * @throws InterruptedException declared for callers; nothing in the visible body throws it directly
     */
    public final void process(@NotNull String str) throws IOException, InterruptedException {
        Intrinsics.checkNotNullParameter(str, "directoryName");
        final Path absolutePath = Paths.get(str);
        logger.info("Scanning topics...");
        Intrinsics.checkNotNullExpressionValue(absolutePath, "absolutePath");

        final List<Path> topics = topicPaths(absolutePath);
        logger.info("{} topics found", Integer.valueOf(topics.size()));

        topics.parallelStream().forEach(topicPath -> {
            try {
                Intrinsics.checkNotNullExpressionValue(topicPath, "p");
                final ProcessingStatistics statistics = mapTopic(topicPath);
                final long files = statistics.component1();
                final long records = statistics.component2();
                getProcessedFileCount().add(files);
                getProcessedRecordsCount().add(records);
            } catch (Exception e) {
                // Best effort: one failing topic must not abort the remaining ones.
                logger.warn("Failed to map topic", e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps the files of a single topic while running under the lock manager.
     *
     * <p>The topic name is the last element of {@code path}. The work is handed to
     * {@code lockManager.tryRunLocked}; inside it an {@link Accountant} is created
     * for the topic, the worker is started with the accountant's offsets, and the
     * accountant is closed afterwards.
     *
     * <p>The accountant is closed exactly once. If the worker failed, that failure
     * is rethrown and any exception raised by {@code close()} is attached to it as
     * suppressed (the semantics of Kotlin's {@code use}) instead of replacing it.
     *
     * @param path path of the topic directory
     * @return the statistics produced by the worker, or zero statistics when this
     *         instance is already closed, when {@code tryRunLocked} yields
     *         {@code null} (presumably: lock not acquired — confirm against
     *         RemoteLockManager), or when an {@link IOException} was caught
     */
    public final ProcessingStatistics mapTopic(final Path path) {
        if (this.isClosed.get()) {
            return new ProcessingStatistics(0L, 0L);
        }
        final String topic = path.getFileName().toString();

        // Explicitly typed so the lambda's target type does not depend on
        // inference through tryRunLocked's signature.
        final Function0<ProcessingStatistics> lockedWork = () -> {
            final Accountant accountant = new Accountant(this.fileStoreFactory, topic);
            Throwable failure = null;
            try {
                return startWorker(topic, path, accountant, accountant.getOffsets());
            } catch (Throwable t) {
                // Remember the primary failure so close() problems become suppressed.
                failure = t;
                throw t;
            } finally {
                CloseableKt.closeFinally(accountant, failure);
            }
        };

        ProcessingStatistics statistics;
        try {
            statistics = (ProcessingStatistics) this.lockManager.tryRunLocked(topic, lockedWork);
        } catch (IOException e) {
            logger.error("Failed to map files of topic {}", topic, e);
            statistics = null;
        }
        return statistics != null ? statistics : new ProcessingStatistics(0L, 0L);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Selects the files of one topic that still need processing and runs a
     * {@link RestructureWorker} over them.
     *
     * <p>Files are taken from the source storage walker, keeping only those whose
     * offset range is not yet contained in {@code offsetRangeSet} and for which
     * {@code TimeUtil.durationSince(lastModified)} is at least
     * {@code minimumFileAge}; at most {@code maxFilesPerTopic} files are taken.
     * Any exception while listing or processing is logged and the statistics
     * gathered so far are still returned.
     *
     * <p>The worker is closed exactly once. If an error escapes the body, it is
     * rethrown and any exception raised by {@code close()} is attached to it as
     * suppressed instead of replacing it.
     *
     * @param str            topic name
     * @param path           path of the topic directory
     * @param accountant     accountant handed to the worker
     * @param offsetRangeSet offset ranges that have already been processed
     * @return the file and record counts reported by the worker
     */
    public final ProcessingStatistics startWorker(final String str, final Path path, Accountant accountant, final OffsetRangeSet offsetRangeSet) {
        // A file is eligible when its range is new and it is old enough.
        final Function1<TopicFile, Boolean> isEligible = topicFile ->
                !offsetRangeSet.contains(topicFile.getRange())
                        && TimeUtil.INSTANCE.durationSince(topicFile.getLastModified()).compareTo(this.minimumFileAge) >= 0;

        final RestructureWorker worker = new RestructureWorker(this.sourceStorage, accountant, this.fileStoreFactory, this.isClosed);
        Throwable failure = null;
        try {
            try {
                // The sequence is lazy; toList() forces the walk inside this try.
                final TopicFileList topicFileList = new TopicFileList(
                        str,
                        SequencesKt.toList(
                                SequencesKt.take(
                                        SequencesKt.filter(this.sourceStorage.getWalker().walkRecords(str, path), isEligible),
                                        this.maxFilesPerTopic)));
                if (topicFileList.getNumberOfFiles() > 0) {
                    worker.processPaths(topicFileList);
                }
            } catch (Exception e) {
                logger.error("Failed to map files of topic {}", str, e);
            }
            return new ProcessingStatistics(worker.getProcessedFileCount(), worker.getProcessedRecordsCount());
        } catch (Throwable t) {
            // Remember the primary failure so close() problems become suppressed.
            failure = t;
            throw t;
        } finally {
            CloseableKt.closeFinally(worker, failure);
        }
    }

    /**
     * Marks this instance as closed by setting the {@code isClosed} flag.
     * {@code mapTopic} checks the flag before doing any work, and the same flag
     * object is passed to every {@code RestructureWorker}.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.isClosed.set(true);
    }

    /**
     * Lists the topic paths under the given root, skipping the excluded topics,
     * and returns them in random order.
     *
     * @param path root directory to walk
     * @return a mutable, shuffled list of topic paths
     */
    private final List<Path> topicPaths(Path path) {
        final List<Path> shuffledTopics =
                SequencesKt.toMutableList(sourceStorage.getWalker().walkTopics(path, excludeTopics));
        Collections.shuffle(shuffledTopics);
        return shuffledTopics;
    }

    /**
     * Creates a restructure run on top of the given factory.
     *
     * <p>Source storage, lock manager and configuration are read from the factory.
     * From the configuration this derives:
     * <ul>
     *   <li>the set of excluded topics: every topic whose config has the exclude flag set;</li>
     *   <li>the per-topic file limit: the configured value, or {@link Integer#MAX_VALUE} when unset;</li>
     *   <li>the minimum file age: the configured number of seconds, with negative values treated as 0.</li>
     * </ul>
     *
     * @param fileStoreFactory factory providing storage, locking and configuration; must not be null
     */
    public RadarKafkaRestructure(@NotNull FileStoreFactory fileStoreFactory) {
        Intrinsics.checkNotNullParameter(fileStoreFactory, "fileStoreFactory");
        this.fileStoreFactory = fileStoreFactory;
        this.sourceStorage = fileStoreFactory.getSourceStorage();
        this.lockManager = fileStoreFactory.getRemoteLockManager();
        this.isClosed = new AtomicBoolean(false);

        RestructureConfig config = fileStoreFactory.getConfig();

        Map<String, TopicConfig> topics = config.getTopics();
        Set<String> excluded = new HashSet<>();
        for (Map.Entry<String, TopicConfig> entry : topics.entrySet()) {
            String topic = entry.getKey();
            if (entry.getValue().getExclude() && topic != null) {
                excluded.add(topic);
            }
        }
        this.excludeTopics = excluded;

        WorkerConfig worker = config.getWorker();
        Integer configuredMaxFiles = worker.getMaxFilesPerTopic();
        this.maxFilesPerTopic = configuredMaxFiles != null ? configuredMaxFiles.intValue() : Integer.MAX_VALUE;
        // Duration.ofSeconds never returns null, so no extra null check is needed.
        this.minimumFileAge = Duration.ofSeconds(RangesKt.coerceAtLeast(worker.getMinimumFileAge(), 0L));

        this.processedFileCount = new LongAdder();
        this.processedRecordsCount = new LongAdder();
    }
}
