package org.reactivecommons.async.rabbit.listeners;

import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ResourcesSpecification;

/* loaded from: input_file:org/reactivecommons/async/rabbit/listeners/ApplicationReplyListener.class */
/**
 * Consumes reply messages from a per-instance queue and hands them to a
 * {@link ReactiveReplyRouter}, keyed by the {@code x-correlation-id} message header.
 *
 * <p>The receiver and topology creator are obtained from the supplied
 * {@link ReactiveMessageListener}; the queue name is fixed at construction time.
 */
public class ApplicationReplyListener {

    @Generated
    private static final Logger log = Logger.getLogger(ApplicationReplyListener.class.getName());

    /** Name of the topic exchange that reply messages are published to. */
    private static final String REPLY_EXCHANGE = "globalReply";
    /** Header carrying the identifier used to route a reply. */
    private static final String CORRELATION_ID_HEADER = "x-correlation-id";
    /** Header whose mere presence marks a reply as an empty completion. */
    private static final String EMPTY_COMPLETION_HEADER = "x-empty-completion";

    private final ReactiveReplyRouter router;
    private final Receiver receiver;
    private final TopologyCreator creator;
    private final String queueName;

    /**
     * Creates a listener bound to the given reply queue.
     *
     * @param reactiveReplyRouter     router that receives every reply (or empty completion)
     * @param reactiveMessageListener source of the {@link Receiver} and {@link TopologyCreator}
     * @param str                     name of the queue that replies are consumed from
     */
    public ApplicationReplyListener(ReactiveReplyRouter reactiveReplyRouter, ReactiveMessageListener reactiveMessageListener, String str) {
        this.router = reactiveReplyRouter;
        this.queueName = str;
        this.receiver = reactiveMessageListener.getReceiver();
        this.creator = reactiveMessageListener.getTopologyCreator();
    }

    /**
     * Declares the reply topology and starts consuming replies with automatic acknowledgement.
     *
     * <p>In order: declares the durable topic exchange {@code globalReply}, declares this
     * listener's non-durable, auto-delete, exclusive queue, binds the queue to the exchange
     * with the given routing key, and then subscribes to the queue. The subscription is
     * started from this method and is not returned to the caller.
     *
     * <p>Each delivery is routed by its {@code x-correlation-id} header: to
     * {@code routeEmpty} when the {@code x-empty-completion} header is present, otherwise to
     * {@code routeReply}. A delivery without a correlation id is logged and discarded.
     * Exceptions raised while handling a single delivery are logged and do not stop the
     * consumer; a failure of the pipeline itself is logged by the subscription's error
     * callback.
     *
     * @param str routing key used to bind the reply queue to the {@code globalReply} exchange
     */
    public void startListening(String str) {
        this.creator.declare(ResourcesSpecification.exchange(REPLY_EXCHANGE).type("topic").durable(true))
            .then(this.creator.declare(ResourcesSpecification.queue(this.queueName).durable(false).autoDelete(true).exclusive(true)))
            .then(this.creator.bind(ResourcesSpecification.binding(REPLY_EXCHANGE, str, this.queueName)))
            .thenMany(this.receiver.consumeAutoAck(this.queueName).doOnNext(delivery -> {
                try {
                    Map<String, Object> headers = delivery.getProperties().getHeaders();
                    Object correlationId = headers == null ? null : headers.get(CORRELATION_ID_HEADER);
                    if (correlationId == null) {
                        // Without a correlation id there is nothing to route the reply to;
                        // report it explicitly instead of relying on a caught NullPointerException.
                        log.log(Level.SEVERE, "Error in reply reception: missing " + CORRELATION_ID_HEADER + " header");
                        return;
                    }
                    String correlationKey = correlationId.toString();
                    if (headers.get(EMPTY_COMPLETION_HEADER) != null) {
                        this.router.routeEmpty(correlationKey);
                    } else {
                        this.router.routeReply(correlationKey, RabbitMessage.fromDelivery(delivery));
                    }
                } catch (Exception e) {
                    // A single bad reply must not terminate the consumer.
                    log.log(Level.SEVERE, "Error in reply reception", e);
                }
            }))
            .subscribe(
                ignored -> { },
                // Surface topology/consumer failures through this class's logger rather than
                // leaving the subscription without an error callback.
                error -> log.log(Level.SEVERE, "Reply listener terminated with error", error));
    }
}
