package co.cask.cdap.common.zookeeper.store;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.AbstractPropertyStore;
import co.cask.cdap.common.conf.PropertyUpdater;
import co.cask.cdap.common.io.Codec;
import co.cask.cdap.common.zookeeper.ZKExtOperations;
import com.google.common.base.Suppliers;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Set;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/cdap-common-4.0.0.jar:co/cask/cdap/common/zookeeper/store/ZKPropertyStore.class */
public final class ZKPropertyStore<T> extends AbstractPropertyStore<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ZKPropertyStore.class);
    private static final int MAX_ZK_FAILURE_RETRIES = 10;
    private final ZKClient zkClient;
    private final Codec<T> codec;
    private final Set<String> watchedSet = Sets.newHashSet();

    public static <T> ZKPropertyStore<T> create(ZKClient zKClient, Codec<T> codec) {
        return new ZKPropertyStore<>(zKClient, codec);
    }

    public static <T> ZKPropertyStore<T> create(ZKClient zKClient, String str, Codec<T> codec) {
        return new ZKPropertyStore<>(ZKClients.namespace(zKClient, str), codec);
    }

    private ZKPropertyStore(ZKClient zKClient, Codec<T> codec) {
        this.zkClient = zKClient;
        this.codec = codec;
    }

    @Override // co.cask.cdap.common.conf.PropertyStore
    public ListenableFuture<T> update(String str, PropertyUpdater<T> propertyUpdater) {
        return ZKExtOperations.updateOrCreate(this.zkClient, getPath(str), propertyUpdater, this.codec);
    }

    @Override // co.cask.cdap.common.conf.PropertyStore
    public ListenableFuture<T> set(String str, T t) {
        return ZKExtOperations.setOrCreate(this.zkClient, getPath(str), Suppliers.ofInstance(t), this.codec, 10);
    }

    @Override // co.cask.cdap.common.conf.AbstractPropertyStore
    protected synchronized boolean listenerAdded(String str) {
        if (!this.watchedSet.add(str)) {
            return true;
        }
        existsAndWatch(str);
        return false;
    }

    private String getPath(String str) {
        return "/" + str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getDataAndWatch(final String str) {
        Futures.addCallback(this.zkClient.getData(getPath(str), new Watcher() { // from class: co.cask.cdap.common.zookeeper.store.ZKPropertyStore.1
            @Override // org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (ZKPropertyStore.this.isClosed()) {
                    return;
                }
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                    ZKPropertyStore.this.existsAndWatch(str);
                } else {
                    ZKPropertyStore.this.getDataAndWatch(str);
                }
            }
        }), new FutureCallback<NodeData>() { // from class: co.cask.cdap.common.zookeeper.store.ZKPropertyStore.2
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(NodeData nodeData) {
                byte[] data = nodeData.getData();
                if (data == null) {
                    ZKPropertyStore.this.updateAndNotify(str, null);
                    return;
                }
                try {
                    ZKPropertyStore.this.updateAndNotify(str, ZKPropertyStore.this.codec.decode(data));
                } catch (IOException e) {
                    ZKPropertyStore.LOG.error("Failed to decode property data for {}: {}", new Object[]{str, Bytes.toStringBinary(data), e});
                    ZKPropertyStore.this.notifyError(str, e);
                }
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                if (th instanceof KeeperException.NoNodeException) {
                    ZKPropertyStore.this.existsAndWatch(str);
                } else {
                    ZKPropertyStore.LOG.error("Failed to get property data for {}", str, th);
                    ZKPropertyStore.this.notifyError(str, th);
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void existsAndWatch(final String str) {
        Futures.addCallback(this.zkClient.exists(getPath(str), new Watcher() { // from class: co.cask.cdap.common.zookeeper.store.ZKPropertyStore.3
            @Override // org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (!ZKPropertyStore.this.isClosed() && watchedEvent.getType() == Watcher.Event.EventType.NodeCreated) {
                    ZKPropertyStore.this.getDataAndWatch(str);
                }
            }
        }), new FutureCallback<Stat>() { // from class: co.cask.cdap.common.zookeeper.store.ZKPropertyStore.4
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(Stat stat) {
                if (stat != null) {
                    ZKPropertyStore.this.getDataAndWatch(str);
                }
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                ZKPropertyStore.LOG.error("Failed to check exists for property data for {}", str, th);
                ZKPropertyStore.this.notifyError(str, th);
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }
}
