package com.spotify.helios.servicescommon.coordination;

import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Charsets;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.spotify.helios.agent.BoundedRandomExponentialBackoff;
import com.spotify.helios.agent.RetryIntervalPolicy;
import com.spotify.helios.agent.RetryScheduler;
import com.spotify.helios.common.Json;
import com.spotify.helios.servicescommon.DefaultReactor;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.Reactor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/helios/servicescommon/coordination/PersistentPathChildrenCache.class */
/**
 * Caches the children of a ZooKeeper node together with their JSON-decoded data.
 *
 * <p>The current view is held in a {@link PersistentAtomicReference} created from the file path
 * given to the constructor. A {@link Reactor} drives {@link #update()}, which either re-reads all
 * children (when the cache is marked as not synced) or re-fetches only the children whose data
 * watcher fired. Registered {@link Listener}s are notified when the snapshot changes and when the
 * Curator connection state changes.
 *
 * <p>Snapshot keys are the full node paths ({@code path + "/" + child}), not the bare child names.
 *
 * @param <T> the type each child's data is deserialized into
 */
public class PersistentPathChildrenCache<T> extends AbstractIdleService {
    private static final Logger log = LoggerFactory.getLogger(PersistentPathChildrenCache.class);

    /** Interval passed to the reactor (presumably its periodic wake-up timeout; confirm in DefaultReactor). */
    private static final long REFRESH_INTERVAL_MILLIS = 30000;

    private final PersistentAtomicReference<Map<String, T>> snapshot;
    private final CuratorFramework curator;
    private final String path;
    private final JavaType valueType;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
    private final CuratorWatcher childrenWatcher = new ChildrenWatcher();
    private final CuratorWatcher dataWatcher = new DataWatcher();
    // Kept in a field so that shutDown() can deregister it from the Curator client again.
    private final ConnectionStateListener connectionListener = new ConnectionListener();
    /** Names of children whose data changed and must be re-fetched by the next update. */
    private final Set<String> changes = Sets.newConcurrentHashSet();
    private final Reactor reactor;
    /** False whenever the next update has to re-read the complete child list. */
    private volatile boolean synced;

    /** Watches the child list of {@code path}; any event forces a full re-sync. */
    private class ChildrenWatcher implements CuratorWatcher {
        private ChildrenWatcher() {
        }

        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
            log.debug("children event: {}", watchedEvent);
            synced = false;
            reactor.signal();
        }
    }

    /** Forces a full re-sync on reconnect and forwards every connection state to the listeners. */
    private class ConnectionListener implements ConnectionStateListener {
        private ConnectionListener() {
        }

        @Override
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            log.debug("connection state change: {}", connectionState);
            if (connectionState == ConnectionState.RECONNECTED) {
                synced = false;
                reactor.signal();
            }
            fireConnectionStateChanged(connectionState);
        }
    }

    /** Watches a single child's data; records the child name so that only that node is re-read. */
    private class DataWatcher implements CuratorWatcher {
        private DataWatcher() {
        }

        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
            log.debug("data event: {}", watchedEvent);
            // Only data changes are handled here; other event types are ignored by this watcher.
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                changes.add(ZKPaths.getNodeFromPath(watchedEvent.getPath()));
                reactor.signal();
            }
        }
    }

    /** Callback interface for cache consumers. */
    public interface Listener {
        /**
         * Called after the snapshot was replaced with a different one.
         *
         * @param persistentPathChildrenCache the cache whose nodes changed
         */
        void nodesChanged(PersistentPathChildrenCache<?> persistentPathChildrenCache);

        /**
         * Called for every Curator connection state change seen by the cache.
         *
         * @param connectionState the new connection state
         */
        void connectionStateChanged(ConnectionState connectionState);
    }

    /** Reactor callback that runs {@link #update()} and retries it with backoff on ZooKeeper errors. */
    private class Update implements Reactor.Callback {
        final RetryIntervalPolicy retryIntervalPolicy;

        private Update() {
            this.retryIntervalPolicy = BoundedRandomExponentialBackoff.newBuilder()
                    .setMinInterval(1L, TimeUnit.SECONDS)
                    .setMaxInterval(30L, TimeUnit.SECONDS)
                    .build();
        }

        /**
         * Runs updates until one succeeds or the service is no longer alive.
         *
         * @param z flag supplied by the reactor; not used by this callback
         * @throws InterruptedException if interrupted while sleeping between retries
         */
        @Override
        public void run(boolean z) throws InterruptedException {
            // A fresh scheduler per invocation, so the backoff starts over on every reactor run.
            final RetryScheduler retryScheduler = this.retryIntervalPolicy.newScheduler();
            while (isAlive()) {
                try {
                    update();
                    return;
                } catch (KeeperException e) {
                    // The snapshot may be incomplete now, so make the retry re-read everything.
                    synced = false;
                    log.warn("update failed: {}", e.getMessage());
                    Thread.sleep(retryScheduler.nextMillis());
                }
            }
        }
    }

    /**
     * Creates the cache and registers a connection state listener on the given Curator client.
     *
     * @param curatorFramework the Curator client used for all ZooKeeper reads
     * @param str the ZooKeeper path whose children are cached
     * @param path the file path handed to {@link PersistentAtomicReference#create} for the snapshot
     * @param javaType the Jackson type each child's data is deserialized into
     * @throws IOException if creating the persistent snapshot reference fails
     * @throws InterruptedException if interrupted while creating the persistent snapshot reference
     */
    public PersistentPathChildrenCache(CuratorFramework curatorFramework, String str, Path path, JavaType javaType) throws IOException, InterruptedException {
        this.curator = curatorFramework;
        this.path = str;
        this.valueType = javaType;
        this.snapshot = PersistentAtomicReference.create(path, Json.typeFactory().constructMapType(HashMap.class, Json.type(String.class), javaType), Suppliers.ofInstance(Collections.emptyMap()));
        this.reactor = new DefaultReactor("zk-ppcc:" + str, new Update(), REFRESH_INTERVAL_MILLIS);
        curatorFramework.getConnectionStateListenable().addListener(this.connectionListener);
    }

    /**
     * Registers a listener for node and connection state changes.
     *
     * @param listener the listener to add
     */
    public void addListener(Listener listener) {
        this.listeners.add(listener);
    }

    /**
     * Removes a previously registered listener.
     *
     * @param listener the listener to remove
     */
    public void removeListener(Listener listener) {
        this.listeners.remove(listener);
    }

    /** Starts the reactor and signals it once so that an initial update is performed. */
    @Override
    protected void startUp() throws Exception {
        log.debug("starting cache");
        this.reactor.startAsync().awaitRunning();
        this.reactor.signal();
    }

    /** Deregisters the connection listener from the Curator client and stops the reactor. */
    @Override
    protected void shutDown() throws Exception {
        // Without this the Curator client would keep a reference to this cache (through the
        // inner listener) and keep signalling a stopped reactor for as long as the client lives.
        this.curator.getConnectionStateListenable().removeListener(this.connectionListener);
        this.reactor.stopAsync().awaitTerminated();
    }

    /**
     * Returns the current snapshot as held by the persistent reference.
     *
     * @return map from full node path to deserialized node value
     */
    public Map<String, T> getNodes() {
        return this.snapshot.get();
    }

    /**
     * Forwards a connection state change to all listeners. A listener that throws is logged and
     * does not prevent the remaining listeners from being called.
     *
     * <p>Used by the nested connection listener; the decompiler reports it was originally private.
     *
     * @param connectionState the new connection state
     */
    public void fireConnectionStateChanged(ConnectionState connectionState) {
        for (final Listener listener : this.listeners) {
            try {
                listener.connectionStateChanged(connectionState);
            } catch (Exception e) {
                log.error("Listener threw exception", e);
            }
        }
    }

    /**
     * Tells whether the service has not yet begun stopping.
     *
     * <p>Used by the nested update callback; the decompiler reports it was originally private.
     *
     * @return true while the service state is NEW, STARTING or RUNNING
     */
    public boolean isAlive() {
        // Name the live states explicitly instead of relying on the enum's declaration order.
        final Service.State current = state();
        return current == Service.State.NEW
                || current == Service.State.STARTING
                || current == Service.State.RUNNING;
    }

    /**
     * Brings the snapshot up to date: performs a full sync if the cache is not synced, then
     * re-fetches every child recorded in the pending change set. The snapshot is replaced and
     * listeners are notified only if the resulting map differs from the current one.
     *
     * <p>Used by the nested update callback; the decompiler reports it was originally private.
     *
     * @throws KeeperException if a ZooKeeper read fails
     * @throws InterruptedException declared for callers; not thrown directly by the visible code
     */
    public void update() throws KeeperException, InterruptedException {
        log.debug("updating: {}", this.path);
        final Map<String, T> currentSnapshot = this.snapshot.get();
        final Map<String, T> newSnapshot;
        if (this.synced) {
            newSnapshot = Maps.newHashMap(currentSnapshot);
        } else {
            // Set the flag before reading so that a watcher firing during the sync clears it
            // again and triggers another full sync instead of being overwritten afterwards.
            this.synced = true;
            newSnapshot = sync();
        }

        final Iterator<String> pending = this.changes.iterator();
        while (pending.hasNext()) {
            final String child = pending.next();
            // Remove before fetching: a change event arriving after this point re-adds the
            // child and is picked up by a later update rather than being lost.
            pending.remove();
            final String node = ZKPaths.makePath(this.path, child);
            log.debug("fetching change: {}", node);
            try {
                // Reading with usingWatcher registers the data watcher for this node again.
                final byte[] bytes = this.curator.getData().usingWatcher(this.dataWatcher).forPath(node);
                newSnapshot.put(node, Json.read(bytes, this.valueType));
            } catch (KeeperException e) {
                throw e;
            } catch (Exception e) {
                // NOTE(review): unlike sync(), a parse failure here aborts the whole update, and
                // the changes already drained in this pass are not applied - confirm intended.
                throw Throwables.propagate(e);
            }
        }

        if (!currentSnapshot.equals(newSnapshot)) {
            this.snapshot.setUnchecked(newSnapshot);
            fireNodesChanged();
        }
    }

    /**
     * Notifies all listeners that the snapshot changed. A listener that throws is logged and does
     * not prevent the remaining listeners from being called.
     */
    private void fireNodesChanged() {
        for (final Listener listener : this.listeners) {
            try {
                listener.nodesChanged(this);
            } catch (Exception e) {
                log.error("Listener threw exception", e);
            }
        }
    }

    /**
     * Reads the complete child list of {@code path} and the data of every child, registering the
     * children watcher and a data watcher per child along the way. Children whose data cannot be
     * parsed are logged and left out of the result.
     *
     * @return a new map from full node path to deserialized value
     * @throws KeeperException if a ZooKeeper read fails
     */
    private Map<String, T> sync() throws KeeperException {
        log.debug("syncing: {}", this.path);
        final Map<String, T> newSnapshot = Maps.newHashMap();
        try {
            final List<String> children = this.curator.getChildren().usingWatcher(this.childrenWatcher).forPath(this.path);
            log.debug("children: {}", children);
            for (final String child : children) {
                final String node = ZKPaths.makePath(this.path, child);
                final byte[] bytes = this.curator.getData().usingWatcher(this.dataWatcher).forPath(node);
                final String json = new String(bytes, Charsets.UTF_8);
                log.debug("child: {}={}", node, json);
                try {
                    newSnapshot.put(node, Json.read(bytes, this.valueType));
                } catch (IOException e) {
                    log.warn("failed to parse node: {}: {}", node, json, e);
                }
            }
            return newSnapshot;
        } catch (KeeperException e) {
            throw e;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }
}
