package co.cask.cdap.data.stream;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.common.async.ExecutorUtils;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.PropertyChangeListener;
import co.cask.cdap.common.conf.PropertyStore;
import co.cask.cdap.common.conf.PropertyUpdater;
import co.cask.cdap.common.io.Codec;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinatorClient.class */
public abstract class AbstractStreamCoordinatorClient extends AbstractIdleService implements StreamCoordinatorClient {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamCoordinatorClient.class);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private final CConfiguration cConf;
    private final StreamAdmin streamAdmin;
    private final Supplier<PropertyStore<StreamProperty>> propertyStore = Suppliers.memoize(new Supplier<PropertyStore<StreamProperty>>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.1
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public PropertyStore<StreamProperty> m12get() {
            return AbstractStreamCoordinatorClient.this.createPropertyStore(new StreamPropertyCodec());
        }
    });
    private final Executor updateExecutor = ExecutorUtils.newThreadExecutor(Threads.createDaemonThreadFactory("stream-coordinator-update-%d"));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinatorClient$StreamProperty.class */
    public static final class StreamProperty {
        private final int generation;
        private final long ttl;
        private final int threshold;

        private StreamProperty(int i, long j, int i2) {
            this.generation = i;
            this.ttl = j;
            this.threshold = i2;
        }

        public int getGeneration() {
            return this.generation;
        }

        public long getTTL() {
            return this.ttl;
        }

        public int getThreshold() {
            return this.threshold;
        }

        public String toString() {
            return Objects.toStringHelper(this).add("generation", this.generation).add("ttl", this.ttl).add("threshold", this.threshold).toString();
        }
    }

    /* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinatorClient$StreamPropertyChangeListener.class */
    private final class StreamPropertyChangeListener extends StreamPropertyListener implements PropertyChangeListener<StreamProperty> {
        private final StreamPropertyListener listener;
        private StreamProperty currentProperty;

        private StreamPropertyChangeListener(StreamAdmin streamAdmin, String str, StreamPropertyListener streamPropertyListener) {
            this.listener = streamPropertyListener;
            try {
                StreamConfig config = streamAdmin.getConfig(str);
                this.currentProperty = new StreamProperty(StreamUtils.getGeneration(config), config.getTTL(), config.getNotificationThresholdMB().intValue());
            } catch (Exception e) {
                this.currentProperty = new StreamProperty(0, KeyValue.LATEST_TIMESTAMP, AbstractStreamCoordinatorClient.this.cConf.getInt("stream.notification.threshold"));
            }
        }

        public void onChange(String str, StreamProperty streamProperty) {
            try {
                if (streamProperty != null) {
                    if (this.currentProperty == null || this.currentProperty.getGeneration() < streamProperty.getGeneration()) {
                        generationChanged(str, streamProperty.getGeneration());
                    }
                    if (this.currentProperty == null || this.currentProperty.getTTL() != streamProperty.getTTL()) {
                        ttlChanged(str, streamProperty.getTTL());
                    }
                    if (this.currentProperty == null || this.currentProperty.getThreshold() != streamProperty.getThreshold()) {
                        thresholdChanged(str, streamProperty.getThreshold());
                    }
                } else {
                    generationDeleted(str);
                    ttlDeleted(str);
                }
            } finally {
                this.currentProperty = streamProperty;
            }
        }

        public void onError(String str, Throwable th) {
            AbstractStreamCoordinatorClient.LOG.error("Exception on PropertyChangeListener for stream {}", str, th);
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void generationChanged(String str, int i) {
            try {
                this.listener.generationChanged(str, i);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.generationChanged", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void generationDeleted(String str) {
            try {
                this.listener.generationDeleted(str);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.generationDeleted", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void ttlChanged(String str, long j) {
            try {
                this.listener.ttlChanged(str, j);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.ttlChanged", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void ttlDeleted(String str) {
            try {
                this.listener.ttlDeleted(str);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.ttlDeleted", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void thresholdChanged(String str, int i) {
            try {
                this.listener.thresholdChanged(str, i);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.thresholdChanged", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinatorClient$StreamPropertyCodec.class */
    public static final class StreamPropertyCodec implements Codec<StreamProperty> {
        private static final Gson GSON = new Gson();

        private StreamPropertyCodec() {
        }

        public byte[] encode(StreamProperty streamProperty) throws IOException {
            return GSON.toJson(streamProperty).getBytes(Charsets.UTF_8);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public StreamProperty m14decode(byte[] bArr) throws IOException {
            return (StreamProperty) GSON.fromJson(new String(bArr, Charsets.UTF_8), StreamProperty.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamCoordinatorClient(CConfiguration cConfiguration, StreamAdmin streamAdmin) {
        this.cConf = cConfiguration;
        this.streamAdmin = streamAdmin;
    }

    protected abstract <T> PropertyStore<T> createPropertyStore(Codec<T> codec);

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public ListenableFuture<Integer> nextGeneration(final StreamConfig streamConfig, final int i) {
        return Futures.transform(((PropertyStore) this.propertyStore.get()).update(streamConfig.getName(), new PropertyUpdater<StreamProperty>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.2
            public ListenableFuture<StreamProperty> apply(@Nullable final StreamProperty streamProperty) {
                final SettableFuture create = SettableFuture.create();
                AbstractStreamCoordinatorClient.this.updateExecutor.execute(new Runnable() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.2.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            long ttl = streamProperty == null ? streamConfig.getTTL() : streamProperty.getTTL();
                            int generation = (streamProperty == null ? i : streamProperty.getGeneration()) + 1;
                            int intValue = streamProperty == null ? streamConfig.getNotificationThresholdMB().intValue() : streamProperty.getThreshold();
                            Locations.mkdirsIfNotExists(StreamUtils.createGenerationLocation(streamConfig.getLocation(), generation));
                            create.set(new StreamProperty(generation, ttl, intValue));
                        } catch (IOException e) {
                            create.setException(e);
                        }
                    }
                });
                return create;
            }
        }), new Function<StreamProperty, Integer>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.3
            public Integer apply(StreamProperty streamProperty) {
                return Integer.valueOf(streamProperty.getGeneration());
            }
        });
    }

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public ListenableFuture<Long> changeTTL(final String str, final long j) {
        return Futures.transform(((PropertyStore) this.propertyStore.get()).update(str, new PropertyUpdater<StreamProperty>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.4
            public ListenableFuture<StreamProperty> apply(@Nullable final StreamProperty streamProperty) {
                final SettableFuture create = SettableFuture.create();
                AbstractStreamCoordinatorClient.this.updateExecutor.execute(new Runnable() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.4.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            StreamConfig config = AbstractStreamCoordinatorClient.this.streamAdmin.getConfig(str);
                            create.set(new StreamProperty(streamProperty == null ? StreamUtils.getGeneration(config) : streamProperty.getGeneration(), j, streamProperty == null ? config.getNotificationThresholdMB().intValue() : streamProperty.getThreshold()));
                        } catch (IOException e) {
                            create.setException(e);
                        }
                    }
                });
                return create;
            }
        }), new Function<StreamProperty, Long>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.5
            public Long apply(StreamProperty streamProperty) {
                return Long.valueOf(streamProperty.getTTL());
            }
        });
    }

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public ListenableFuture<Integer> changeThreshold(final String str, final int i) {
        return Futures.transform(((PropertyStore) this.propertyStore.get()).update(str, new PropertyUpdater<StreamProperty>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.6
            public ListenableFuture<StreamProperty> apply(@Nullable final StreamProperty streamProperty) {
                final SettableFuture create = SettableFuture.create();
                AbstractStreamCoordinatorClient.this.updateExecutor.execute(new Runnable() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.6.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            StreamConfig config = AbstractStreamCoordinatorClient.this.streamAdmin.getConfig(str);
                            create.set(new StreamProperty(streamProperty == null ? StreamUtils.getGeneration(config) : streamProperty.getGeneration(), streamProperty == null ? config.getTTL() : streamProperty.getTTL(), i));
                        } catch (IOException e) {
                            create.setException(e);
                        }
                    }
                });
                return create;
            }
        }), new Function<StreamProperty, Integer>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.7
            public Integer apply(StreamProperty streamProperty) {
                return Integer.valueOf(streamProperty.getThreshold());
            }
        });
    }

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public Cancellable addListener(String str, StreamPropertyListener streamPropertyListener) {
        return ((PropertyStore) this.propertyStore.get()).addChangeListener(str, new StreamPropertyChangeListener(this.streamAdmin, str, streamPropertyListener));
    }

    protected final void shutDown() throws Exception {
        ((PropertyStore) this.propertyStore.get()).close();
        doShutDown();
    }

    protected abstract void doShutDown() throws Exception;
}
