package org.apache.samza.table.remote.couchbase;

import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.google.common.base.Preconditions;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.table.AsyncReadWriteUpdateTable;
import org.apache.samza.table.remote.TableReadFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.SingleSubscriber;

/* loaded from: input_file:org/apache/samza/table/remote/couchbase/CouchbaseTableReadFunction.class */
public class CouchbaseTableReadFunction<V> extends BaseCouchbaseTableFunction<V> implements TableReadFunction<String, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTableReadFunction.class);
    protected final Class<? extends Document<?>> documentType;

    public CouchbaseTableReadFunction(String str, Class<V> cls, String... strArr) {
        super(str, cls, strArr);
        this.documentType = JsonObject.class.isAssignableFrom(cls) ? JsonDocument.class : BinaryDocument.class;
    }

    @Override // org.apache.samza.table.remote.couchbase.BaseCouchbaseTableFunction
    public void init(Context context, AsyncReadWriteUpdateTable asyncReadWriteUpdateTable) {
        super.init(context, asyncReadWriteUpdateTable);
        LOGGER.info("Read function for bucket {} initialized successfully", this.bucketName);
    }

    public CompletableFuture<V> getAsync(final String str) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "key must not be null, empty or blank");
        final CompletableFuture<V> completableFuture = new CompletableFuture<>();
        this.bucket.async().get(str, this.documentType, this.timeout.toMillis(), TimeUnit.MILLISECONDS).toSingle().subscribe(new SingleSubscriber<Document<?>>() { // from class: org.apache.samza.table.remote.couchbase.CouchbaseTableReadFunction.1
            public void onSuccess(Document<?> document) {
                if (document == null) {
                    completableFuture.completeExceptionally(new SamzaException(String.format("Got unexpected null value from key %s", str)));
                } else if (document instanceof BinaryDocument) {
                    CouchbaseTableReadFunction.this.handleGetAsyncBinaryDocument((BinaryDocument) document, completableFuture, str);
                } else {
                    completableFuture.complete(document.content());
                }
            }

            public void onError(Throwable th) {
                if (th instanceof NoSuchElementException) {
                    completableFuture.complete(null);
                } else {
                    completableFuture.completeExceptionally(new SamzaException(String.format("Failed to get key %s", str), th));
                }
            }
        });
        return completableFuture;
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected void handleGetAsyncBinaryDocument(BinaryDocument binaryDocument, CompletableFuture<V> completableFuture, String str) {
        byte[] bArr;
        ByteBuf byteBuf = (ByteBuf) binaryDocument.content();
        try {
            try {
                if (byteBuf.hasArray() && byteBuf.arrayOffset() == 0 && byteBuf.readableBytes() == byteBuf.array().length) {
                    bArr = byteBuf.array();
                } else {
                    bArr = new byte[byteBuf.readableBytes()];
                    byteBuf.readBytes(bArr);
                }
                completableFuture.complete(this.valueSerde.fromBytes(bArr));
                ReferenceCountUtil.release(byteBuf);
            } catch (Exception e) {
                completableFuture.completeExceptionally(new SamzaException(String.format("Failed to deserialize value of key %s with given serde", str), e));
                ReferenceCountUtil.release(byteBuf);
            }
        } catch (Throwable th) {
            ReferenceCountUtil.release(byteBuf);
            throw th;
        }
    }
}
