package com.faunadb.client.streaming;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.faunadb.client.HttpResponses;
import com.faunadb.client.errors.StreamingException;
import com.faunadb.client.types.Field;
import com.faunadb.client.types.Value;
import com.faunadb.common.Connection;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/faunadb/client/streaming/BodyValueFlowProcessor.class */
public class BodyValueFlowProcessor extends SubmissionPublisher<Value> implements Flow.Processor<List<ByteBuffer>, Value> {
    private static Value ErrorValue = new Value.StringV("error");
    private static Field<Long> TxnField = Field.at("txn").to(Long.class);
    private ObjectMapper json;
    private Connection connection;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private Flow.Subscription subscription = null;
    private Flow.Subscriber<? super Value> subscriber = null;

    public BodyValueFlowProcessor(ObjectMapper objectMapper, Connection connection) {
        this.json = objectMapper;
        this.connection = connection;
    }

    private void requestOne() {
        this.subscription.request(1L);
    }

    @Override // java.util.concurrent.SubmissionPublisher, java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super Value> subscriber) {
        if (this.subscriber != null) {
            throw new IllegalStateException("BodyValueFlowProcessor can have only one subscriber");
        }
        this.subscriber = subscriber;
        super.subscribe(subscriber);
        requestOne();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(List<ByteBuffer> list) {
        String str = (String) list.stream().map(byteBuffer -> {
            return StandardCharsets.UTF_8.decode(byteBuffer).toString();
        }).collect(Collectors.joining());
        try {
            JsonNode readTree = this.json.readTree(str);
            Value value = (Value) this.json.treeToValue(readTree, Value.class);
            value.getOptional(TxnField).ifPresent(l -> {
                this.connection.syncLastTxnTime(l.longValue());
            });
            if (((Boolean) value.at("type").getOptional().map(value2 -> {
                return Boolean.valueOf(value2.equals(ErrorValue));
            }).orElse(false)).booleanValue()) {
                HttpResponses.QueryError queryError = (HttpResponses.QueryError) this.json.treeToValue(readTree.get("event"), HttpResponses.QueryError.class);
                this.subscriber.onError(new StreamingException(queryError.code() + ": " + queryError.description()));
                this.subscription.cancel();
            } else {
                submit(value);
            }
        } catch (Exception e) {
            this.log.error("could not parse event " + str, e);
            this.subscriber.onError(e);
            this.subscription.cancel();
        }
        requestOne();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        this.log.error("unrecoverable error encountered by subscription", th);
        this.subscriber.onError(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        this.log.debug("subscription completed");
        this.subscriber.onComplete();
    }
}
