package org.apache.samza.table.remote.couchbase;

import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.google.common.base.Preconditions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.table.AsyncReadWriteUpdateTable;
import org.apache.samza.table.remote.TableWriteFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.SingleSubscriber;

/* loaded from: input_file:org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.class */
public class CouchbaseTableWriteFunction<V> extends BaseCouchbaseTableFunction<V> implements TableWriteFunction<String, V, Object> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTableWriteFunction.class);

    public CouchbaseTableWriteFunction(String str, Class<V> cls, String... strArr) {
        super(str, cls, strArr);
    }

    @Override // org.apache.samza.table.remote.couchbase.BaseCouchbaseTableFunction
    public void init(Context context, AsyncReadWriteUpdateTable asyncReadWriteUpdateTable) {
        super.init(context, asyncReadWriteUpdateTable);
        LOGGER.info("Write function for bucket {} initialized successfully", this.bucketName);
    }

    public CompletableFuture<Void> putAsync(String str, V v) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "key must not be null, empty or blank");
        Preconditions.checkArgument(!str.contains(" "), String.format("key should not contain spaces: %s", str));
        Preconditions.checkNotNull(v);
        return asyncWriteHelper(this.bucket.async().upsert(v instanceof JsonObject ? JsonDocument.create(str, (int) this.ttl.getSeconds(), (JsonObject) v) : BinaryDocument.create(str, (int) this.ttl.getSeconds(), Unpooled.copiedBuffer(this.valueSerde.toBytes(v))), this.timeout.toMillis(), TimeUnit.MILLISECONDS), String.format("Failed to insert key %s into bucket %s", str, this.bucketName));
    }

    public CompletableFuture<Void> updateAsync(String str, Object obj) {
        throw new SamzaException("Update is unsupported");
    }

    public CompletableFuture<Void> deleteAsync(String str) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "key must not be null, empty or blank");
        return asyncWriteHelper(this.bucket.async().remove(str, this.timeout.toMillis(), TimeUnit.MILLISECONDS), String.format("Failed to delete key %s from bucket %s.", str, this.bucketName));
    }

    protected CompletableFuture<Void> asyncWriteHelper(Observable<? extends Document> observable, String str) {
        return asyncWriteHelper(observable, str, true);
    }

    protected <T> CompletableFuture<T> asyncWriteHelper(Observable<? extends Document> observable, final String str, final boolean z) {
        final CompletableFuture<T> completableFuture = new CompletableFuture<>();
        observable.toSingle().subscribe(new SingleSubscriber<Document>() { // from class: org.apache.samza.table.remote.couchbase.CouchbaseTableWriteFunction.1
            public void onSuccess(Document document) {
                if (z) {
                    completableFuture.complete(null);
                } else {
                    completableFuture.complete(document.content());
                }
            }

            public void onError(Throwable th) {
                completableFuture.completeExceptionally(new SamzaException(str, th));
            }
        });
        return completableFuture;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public /* bridge */ /* synthetic */ CompletableFuture putAsync(Object obj, Object obj2) {
        return putAsync((String) obj, (String) obj2);
    }
}
