package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.Delivery;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ResourcesSpecification;

/* loaded from: input_file:org/reactivecommons/async/rabbit/listeners/ApplicationReplyListener.class */
/**
 * Consumes reply messages from a dedicated reply queue and hands each one to a
 * {@link ReactiveReplyRouter}, keyed by the {@code x-correlation-id} header.
 *
 * <p>{@link #startListening(String)} declares the reply queue, binds it to the configured
 * exchange and subscribes to it with automatic acknowledgement. Whenever that subscription
 * terminates (normally or with an error) a new subscription is started.
 */
public class ApplicationReplyListener {

    /** Header whose value identifies the pending request a reply belongs to. */
    private static final String CORRELATION_ID_HEADER = "x-correlation-id";

    /** Header whose mere presence marks a reply that completes without a payload. */
    private static final String EMPTY_COMPLETION_HEADER = "x-empty-completion";

    @Generated
    private static final Logger log = Logger.getLogger(ApplicationReplyListener.class.getName());
    private final ReactiveReplyRouter router;
    private final Receiver receiver;
    private final TopologyCreator creator;
    private final String queueName;
    private final String exchangeName;
    private final boolean createTopology;
    private volatile Flux<Delivery> deliveryFlux;

    /**
     * Creates a listener; nothing is declared or consumed until
     * {@link #startListening(String)} is called.
     *
     * @param router         router that receives every reply (or empty completion)
     * @param listener       source of the {@link Receiver} and {@link TopologyCreator} to use
     * @param queueName      name of the reply queue to declare and consume from
     * @param exchangeName   name of the exchange the reply queue is bound to
     * @param createTopology when {@code true}, the exchange is declared before the queue
     */
    public ApplicationReplyListener(ReactiveReplyRouter router, ReactiveMessageListener listener,
                                    String queueName, String exchangeName, boolean createTopology) {
        this.router = router;
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.receiver = listener.getReceiver();
        this.creator = listener.getTopologyCreator();
        this.createTopology = createTopology;
    }

    /**
     * Builds the declare/bind/consume pipeline and subscribes to it.
     *
     * <p>Steps, in order: optionally declare the exchange as a durable {@code topic} exchange,
     * declare the reply queue as non-durable, auto-delete and exclusive, bind the queue to the
     * exchange with the given routing key, then consume the queue with auto-ack.
     *
     * @param routeKey routing key used to bind the reply queue to the exchange
     */
    public void startListening(String routeKey) {
        Mono<Void> exchangeReady = Mono.empty();
        if (this.createTopology) {
            exchangeReady = this.creator
                    .declare(ResourcesSpecification.exchange(this.exchangeName).type("topic").durable(true))
                    .then();
        }
        this.deliveryFlux = exchangeReady
                .then(this.creator.declare(
                        ResourcesSpecification.queue(this.queueName).durable(false).autoDelete(true).exclusive(true)))
                .then(this.creator.bind(
                        ResourcesSpecification.binding(this.exchangeName, routeKey, this.queueName)))
                .thenMany(this.receiver.consumeAutoAck(this.queueName).doOnNext(this::handleDelivery));
        onTerminate();
    }

    /**
     * Routes a single delivery to the reply router.
     *
     * <p>Never throws: a failure while handling one message is logged so that it does not
     * terminate the consuming flux.
     *
     * @param delivery message received from the reply queue
     */
    private void handleDelivery(Delivery delivery) {
        try {
            Map<String, Object> headers = delivery.getProperties().getHeaders();
            Object correlationId = headers == null ? null : headers.get(CORRELATION_ID_HEADER);
            if (correlationId == null) {
                // Without a correlation id there is no pending request to route to; discard
                // explicitly instead of relying on a NullPointerException being caught below.
                log.log(Level.SEVERE, "Error in reply reception: missing " + CORRELATION_ID_HEADER + " header");
                return;
            }
            String correlationKey = correlationId.toString();
            if (headers.get(EMPTY_COMPLETION_HEADER) != null) {
                this.router.routeEmpty(correlationKey);
            } else {
                this.router.routeReply(correlationKey, RabbitMessage.fromDelivery(delivery));
            }
        } catch (Exception e) {
            log.log(Level.SEVERE, "Error in reply reception", e);
        }
    }

    /**
     * Subscribes to the delivery flux and registers itself to run again when that
     * subscription terminates, so the queue is re-declared, re-bound and re-consumed.
     *
     * <p>NOTE(review): the resubscription is immediate, with no delay or retry limit.
     */
    private void onTerminate() {
        this.deliveryFlux
                .doOnTerminate(this::onTerminate)
                .subscribe(new LoggerSubscriber<>(getClass().getName()));
    }
}
