package me.escoffier.fluid.camel.sink;

import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.internal.operators.completable.CompletableFromObservable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import me.escoffier.fluid.config.Config;
import me.escoffier.fluid.models.Message;
import me.escoffier.fluid.models.Sink;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;

/* loaded from: input_file:me/escoffier/fluid/camel/sink/CamelSink.class */
/**
 * A {@link Sink} that forwards the payload of each dispatched message to a Camel endpoint.
 *
 * <p>Each instance creates and starts its own {@link DefaultCamelContext} in the constructor.
 * This class never stops that context itself; it is exposed through {@link #camelContext()}.
 *
 * @param <T> the type of the message payloads accepted by this sink
 */
public class CamelSink<T> implements Sink<T> {
    /** URI of the Camel endpoint the payloads are sent to (read from the {@code endpoint} config key). */
    private final String endpoint;
    private final CamelContext camelContext = new DefaultCamelContext();
    /** Producer template created from {@link #camelContext} once the context has been started. */
    private final ProducerTemplate producerTemplate;
    private final String name;

    /**
     * Creates the sink, starts its Camel context and creates the producer template.
     *
     * @param name   the name reported by {@link #name()}
     * @param config the sink configuration; must provide a value for the {@code endpoint} key
     * @throws IllegalArgumentException if the configuration has no {@code endpoint} value
     * @throws RuntimeException         wrapping any exception raised while starting the Camel
     *                                  context or creating the producer template
     */
    public CamelSink(String name, Config config) {
        this.name = name;
        this.endpoint = (String) config.getString("endpoint")
            .orElseThrow(() -> new IllegalArgumentException("The `endpoint` must be set"));
        try {
            this.camelContext.start();
            this.producerTemplate = this.camelContext.createProducerTemplate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the name given to this sink at construction time
     */
    public String name() {
        return this.name;
    }

    /**
     * Asynchronously sends the payload of the given message to the configured endpoint.
     *
     * <p>The send is triggered immediately when this method is called, not when the returned
     * {@link Completable} is subscribed to.
     *
     * @param message the message whose payload is sent
     * @return a {@link Completable} that completes when the asynchronous send completes, or
     *         fails with the error the send completed with
     */
    public Completable dispatch(Message<T> message) {
        CompletableFuture<Object> pending =
            this.producerTemplate.asyncSendBody(this.endpoint, message.payload());
        return toCompletable(pending);
    }

    /**
     * @return the Camel context owned by this sink
     */
    public CamelContext camelContext() {
        return this.camelContext;
    }

    /**
     * Bridges a {@link CompletableFuture} to a {@link Completable}.
     *
     * <p>The value produced by the future is intentionally discarded: a {@code Completable}
     * carries no item, and not forwarding the value means a {@code null} result is treated
     * as a plain successful completion.
     *
     * @param future the future to observe
     * @return a {@link Completable} mirroring the outcome (success or failure) of the future
     */
    private static Completable toCompletable(CompletableFuture<?> future) {
        return Completable.create(emitter -> {
            future.whenComplete((result, failure) -> {
                if (failure != null) {
                    emitter.onError(failure);
                } else {
                    emitter.onComplete();
                }
            });
        });
    }
}
