package ai.vespa.reindexing;

import ai.vespa.reindexing.Reindexer;
import ai.vespa.reindexing.Reindexing;
import ai.vespa.reindexing.ReindexingCurator;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.ZookeepersConfig;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.annotation.Inject;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.jdisc.Metric;
import com.yahoo.net.HostName;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:ai/vespa/reindexing/ReindexingMaintainer.class */
/**
 * Component which periodically runs the configured {@link Reindexer}s.
 *
 * <p>One {@link Reindexer} is created per cluster entry in the {@link ReindexingConfig}. When the
 * config is enabled, {@link #maintain()} is scheduled at a fixed rate on a daemon-thread executor,
 * with the start delay and period computed by
 * {@link #scheduleStaggered(BiConsumer, Duration, Instant, String, String)} from this host's
 * position in the ZooKeeper server list.
 */
public class ReindexingMaintainer extends AbstractComponent {

    // NOTE(review): the logger is named after Reindexing, not this class. Left unchanged because
    // renaming it would change where these messages are routed.
    private static final Logger log = Logger.getLogger(Reindexing.class.getName());

    private final Curator curator;
    private final List<Reindexer> reindexers;
    private final ScheduledExecutorService executor;

    /**
     * Injection constructor; delegates to the package-private constructor with the system UTC clock.
     *
     * @param vespaZooKeeperServer not used by this class; presumably declared only so the ZooKeeper
     *                             server component is constructed first (TODO confirm)
     */
    @Inject
    public ReindexingMaintainer(VespaZooKeeperServer vespaZooKeeperServer, Metric metric, DocumentAccess documentAccess, ZookeepersConfig zookeepersConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig) {
        this(Clock.systemUTC(), metric, documentAccess, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig);
    }

    /**
     * Creates the curator, one reindexer per configured cluster, and the executor; then, if
     * reindexing is enabled in config, schedules periodic maintenance with a one-minute base interval.
     *
     * @param clock clock handed to each reindexer and used to compute the initial schedule delay
     * @throws IllegalStateException if a cluster in the reindexing config is not in the cluster list
     */
    ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess documentAccess, ZookeepersConfig zookeepersConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig) {
        this.curator = Curator.create(zookeepersConfig.zookeeperserverlist());
        ReindexingCurator reindexingCurator = new ReindexingCurator(this.curator, documentAccess.getDocumentTypeManager());
        this.reindexers = reindexingConfig.clusters().entrySet().stream()
                .map(cluster -> new Reindexer(
                        parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, documentAccess.getDocumentTypeManager()),
                        parseReady(cluster.getValue(), documentAccess.getDocumentTypeManager()),
                        reindexingCurator,
                        documentAccess,
                        metric,
                        clock))
                .collect(Collectors.toUnmodifiableList());
        // Core pool size equals the number of configured clusters.
        this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-"));
        if (reindexingConfig.enabled()) {
            scheduleStaggered(
                    (delayMillis, intervalMillis) ->
                            this.executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
                    Duration.ofMinutes(1L),
                    clock.instant(),
                    HostName.getLocalhost(),
                    zookeepersConfig.zookeeperserverlist());
        }
    }

    /**
     * Submits one reindexing task per reindexer to the executor. Failure to obtain the reindexing
     * lock is logged at FINE; any other exception is logged at WARNING and does not propagate.
     */
    private void maintain() {
        for (Reindexer reindexer : this.reindexers) {
            this.executor.submit(() -> {
                try {
                    reindexer.reindex();
                } catch (ReindexingCurator.ReindexingLockException e) {
                    log.log(Level.FINE, "Failed to acquire reindexing lock");
                } catch (Exception e) {
                    log.log(Level.WARNING, "Exception when reindexing", e);
                }
            });
        }
    }

    /**
     * Shuts down all reindexers and the executor, and closes the curator.
     *
     * <p>The executor gets 5 seconds to finish before the curator is closed, and another 5 seconds
     * after. If it still has not terminated (or the wait was interrupted), it is shut down forcibly
     * and the number of never-started tasks is logged.
     */
    public void deconstruct() {
        try {
            for (Reindexer reindexer : this.reindexers) {
                reindexer.shutdown();
            }
            this.executor.shutdown();
            // First grace period: let in-flight tasks finish on their own.
            this.executor.awaitTermination(5L, TimeUnit.SECONDS);
            // Closed between the two grace periods; presumably this unblocks any reindexing that is
            // still running against the curator (TODO confirm).
            this.curator.close();
            // isTerminated(), not isShutdown(): the latter is always true once shutdown() has been
            // called, which made this second wait and the warning below unreachable.
            if (!this.executor.isTerminated() && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                log.log(Level.WARNING, "Failed to shut down reindexing within timeout");
            }
        } catch (InterruptedException e) {
            log.log(Level.WARNING, "Interrupted while waiting for reindexing to shut down");
            Thread.currentThread().interrupt();
        }
        if (!this.executor.isTerminated()) {
            log.log(Level.WARNING, "Number of tasks remaining at hard shutdown: " + this.executor.shutdownNow().size());
        }
    }

    /**
     * Builds one {@link Reindexing.Trigger} per document type configured for the given cluster.
     *
     * @param clusters            reindexing config for a single cluster
     * @param documentTypeManager used to resolve document type names to document types
     * @return an unmodifiable list of triggers, each with the type's ready instant and speed
     */
    static List<Reindexing.Trigger> parseReady(ReindexingConfig.Clusters clusters, DocumentTypeManager documentTypeManager) {
        return clusters.documentTypes().entrySet().stream()
                .map(typeStatus -> new Reindexing.Trigger(
                        documentTypeManager.getDocumentType(typeStatus.getKey()),
                        Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()),
                        typeStatus.getValue().speed()))
                .collect(Collectors.toUnmodifiableList());
    }

    /**
     * Computes an initial delay and a period, both in milliseconds, and passes them to the scheduler.
     *
     * <p>If {@code hostname} is not among the cluster hosts, the delay is 0 and the period is
     * {@code interval}. Otherwise the period is {@code interval} times the number of hosts, and the
     * delay is chosen so that runs fall on epoch times congruent to
     * {@code indexOf(hostname) * interval} modulo that period; hosts at different list positions
     * thus get different offsets within the period.
     *
     * @param scheduler        receives {@code (delayMillis, periodMillis)}
     * @param interval         base interval between runs
     * @param now              the current time, used to compute the initial delay
     * @param hostname         name of this host
     * @param clusterHostnames comma-separated list of hosts; anything after the first {@code ':'}
     *                         in an entry is ignored
     */
    static void scheduleStaggered(BiConsumer<Long, Long> scheduler, Duration interval, Instant now, String hostname, String clusterHostnames) {
        long delayMillis = 0;
        long intervalMillis = interval.toMillis();
        List<String> hostnames = Stream.of(clusterHostnames.split(","))
                .map(hostPort -> hostPort.split(":")[0])
                .collect(Collectors.toList());
        if (hostnames.contains(hostname)) {
            long offset = hostnames.indexOf(hostname) * intervalMillis;
            intervalMillis *= hostnames.size();
            // floorMod keeps the delay in [0, intervalMillis) even though the difference is negative.
            delayMillis = Math.floorMod(offset - now.toEpochMilli(), intervalMillis);
        }
        scheduler.accept(delayMillis, intervalMillis);
    }

    /**
     * Builds the {@link Reindexer.Cluster} for the named cluster, mapping each of its document types
     * to the bucket space given by the bucket spaces config.
     *
     * @param name                          name of the cluster to look up
     * @param clusterListConfig             config listing the known storage clusters
     * @param allClustersBucketSpacesConfig config giving the bucket space of each document type
     * @param documentTypeManager           used to resolve document type names to document types
     * @throws IllegalStateException if no storage cluster in {@code clusterListConfig} has this name
     */
    static Reindexer.Cluster parseCluster(String name, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, DocumentTypeManager documentTypeManager) {
        return clusterListConfig.storage().stream()
                .filter(storage -> storage.name().equals(name))
                .map(storage -> new Reindexer.Cluster(
                        name,
                        allClustersBucketSpacesConfig.cluster(name).documentType().entrySet().stream()
                                .collect(Collectors.toMap(
                                        entry -> documentTypeManager.getDocumentType(entry.getKey()),
                                        entry -> entry.getValue().bucketSpace()))))
                .findAny()
                .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters"));
    }
}
