package com.linkedin.d2.discovery.stores.zk;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.callback.MultiCallback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.util.FileSystemDirectory;
import com.linkedin.d2.discovery.PropertySerializer;
import com.linkedin.d2.discovery.event.PropertyEventBus;
import com.linkedin.d2.discovery.event.PropertyEventBusImpl;
import com.linkedin.d2.discovery.event.PropertyEventBusRequestsThrottler;
import com.linkedin.d2.discovery.event.PropertyEventPublisher;
import com.linkedin.d2.discovery.event.PropertyEventSubscriber;
import com.linkedin.d2.discovery.stores.file.FileStore;
import com.linkedin.d2.discovery.stores.zk.builder.ZooKeeperStoreBuilder;
import com.linkedin.r2.transport.http.client.TimeoutCallback;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/LastSeenZKStore.class */
public class LastSeenZKStore<T> implements PropertyEventPublisher<T> {
    private static final Logger LOG = LoggerFactory.getLogger(LastSeenZKStore.class);
    private final FileStore<T> _fsStore;
    private final ZooKeeperConnectionAwareStore<T, ? extends ZooKeeperStore<T>> _zkAwareStore;
    private final LastSeenZKStore<T>.ZkBusUpdater _zkBusUpdaterSubscriber = new ZkBusUpdater();
    private final ScheduledExecutorService _executorService;
    private final int _warmUpTimeoutSeconds;
    private final String _fsPath;
    private PropertyEventBus<T> _clientBus;
    private PropertyEventBus<T> _zkToFsBus;
    private final int _concurrentRequests;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/LastSeenZKStore$ZkBusUpdater.class */
    public class ZkBusUpdater implements PropertyEventSubscriber<T> {
        ZkBusUpdater() {
        }

        void updateFsStore(String str, T t) {
            if (t != null) {
                LastSeenZKStore.this._fsStore.put(str, t);
            } else {
                LastSeenZKStore.this._fsStore.remove(str);
            }
        }

        @Override // com.linkedin.d2.discovery.event.PropertyEventSubscriber
        public void onInitialize(String str, T t) {
            updateFsStore(str, t);
            LastSeenZKStore.this._clientBus.publishInitialize(str, t);
        }

        @Override // com.linkedin.d2.discovery.event.PropertyEventSubscriber
        public void onAdd(String str, T t) {
            updateFsStore(str, t);
            LastSeenZKStore.this._clientBus.publishAdd(str, t);
        }

        @Override // com.linkedin.d2.discovery.event.PropertyEventSubscriber
        public void onRemove(String str) {
            LastSeenZKStore.this._fsStore.remove(str);
            LastSeenZKStore.this._clientBus.publishRemove(str);
        }
    }

    public LastSeenZKStore(String str, PropertySerializer<T> propertySerializer, ZooKeeperStoreBuilder<? extends ZooKeeperStore<T>> zooKeeperStoreBuilder, ZKPersistentConnection zKPersistentConnection, ScheduledExecutorService scheduledExecutorService, int i, int i2) {
        this._fsPath = str;
        this._executorService = scheduledExecutorService;
        this._warmUpTimeoutSeconds = i;
        this._concurrentRequests = i2;
        this._fsStore = new FileStore<>(str, propertySerializer);
        this._zkToFsBus = new PropertyEventBusImpl(scheduledExecutorService);
        this._zkAwareStore = new ZooKeeperConnectionAwareStore<>(zooKeeperStoreBuilder, zKPersistentConnection);
        this._zkToFsBus.setPublisher(this._zkAwareStore);
    }

    @Override // com.linkedin.d2.discovery.event.PropertyEventPublisher
    public void setBus(PropertyEventBus<T> propertyEventBus) {
        if (!(propertyEventBus instanceof PropertyEventBusImpl)) {
            LOG.warn("The bus used in LastSeenZKStore should be a PropertyEventBusImpl and not a " + propertyEventBus.getClass().getName());
        }
        this._clientBus = propertyEventBus;
    }

    @Override // com.linkedin.d2.discovery.event.PropertyEventPublisher
    public void startPublishing(String str) {
        this._executorService.submit(() -> {
            T t = this._fsStore.get(str);
            if (t != null) {
                this._clientBus.publishInitialize(str, t);
            } else {
                this._zkToFsBus.register(Collections.singleton(str), this._zkBusUpdaterSubscriber);
            }
        });
    }

    @Override // com.linkedin.d2.discovery.event.PropertyEventPublisher
    public void stopPublishing(String str) {
        this._zkToFsBus.unregister(Collections.singleton(str), this._zkBusUpdaterSubscriber);
        this._executorService.submit(() -> {
            this._fsStore.remove(str);
        });
    }

    @Override // com.linkedin.d2.discovery.event.PropertyEventPublisher
    public void start(final Callback<None> callback) {
        Callback<None> callback2 = new Callback<None>() { // from class: com.linkedin.d2.discovery.stores.zk.LastSeenZKStore.1
            public void onError(Throwable th) {
                callback.onError(th);
            }

            public void onSuccess(None none) {
                LastSeenZKStore.this.warmUp(callback);
            }
        };
        this._zkAwareStore.start(Callbacks.empty());
        this._fsStore.start(callback2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void warmUp(final Callback<None> callback) {
        new PropertyEventBusRequestsThrottler(this._zkToFsBus, this._zkBusUpdaterSubscriber, FileSystemDirectory.getFileListWithoutExtension(this._fsPath), this._concurrentRequests, true).sendRequests(new TimeoutCallback<>(this._executorService, this._warmUpTimeoutSeconds, TimeUnit.SECONDS, new Callback<None>() { // from class: com.linkedin.d2.discovery.stores.zk.LastSeenZKStore.2
            public void onError(Throwable th) {
                LastSeenZKStore.LOG.info("EventBus Throttler didn't send all requests in time, continuing startup. The WarmUp will continue in background");
                callback.onSuccess(None.none());
            }

            public void onSuccess(None none) {
                LastSeenZKStore.LOG.info("EventBus Throttler sent all requests");
                callback.onSuccess(None.none());
            }
        }, "This message will never be used, even in case of timeout, no exception should be passed up"));
    }

    @Override // com.linkedin.d2.discovery.event.PropertyEventPublisher
    public void shutdown(Callback<None> callback) {
        Callback<None> multiCallback = new MultiCallback<>(callback, 2);
        this._fsStore.shutdown(multiCallback);
        this._zkAwareStore.shutdown(multiCallback);
    }
}
