package org.apache.camel.component.reactor.engine;

import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
import org.apache.camel.component.reactive.streams.ReactiveStreamsDiscardedException;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.util.ObjectHelper;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SynchronousSink;
import reactor.util.concurrent.Queues;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/camel/component/reactor/engine/ReactorCamelProcessor.class */
/**
 * Connects a single Camel {@link ReactiveStreamsProducer} to the Reactor
 * {@link Publisher} of one named stream.
 *
 * <p>Exchanges handed to {@link #send(Exchange)} are pushed into a
 * {@link FluxSink} that is created when a producer is attached. The resulting
 * flux is shaped according to the producer endpoint's back-pressure strategy
 * and subscribed to an internal {@link EmitterProcessor}, which is what
 * {@link #getPublisher()} exposes.
 *
 * <p>{@link #attach(ReactiveStreamsProducer)} and {@link #detach()} are
 * {@code synchronized}; the current sink is kept in an {@link AtomicReference}
 * so that {@link #send(Exchange)} can read it without taking the lock.
 */
public final class ReactorCamelProcessor implements Closeable {
    /** Stream name, used in error messages and discard notifications. */
    private final String name;

    /** Owning service; consulted to decide whether exchanges may still flow. */
    private final ReactorStreamsService service;

    /** Sink of the currently attached producer, or {@code null} when detached. */
    private final AtomicReference<FluxSink<Exchange>> camelSink = new AtomicReference<>();

    /** Downstream processor created with a buffer size of 1 and auto-cancel disabled. */
    private final EmitterProcessor<Exchange> publisher = EmitterProcessor.create(1, false);

    /** Currently attached producer; only accessed from synchronized methods. */
    private ReactiveStreamsProducer camelProducer;

    /**
     * Creates a processor for the given stream.
     *
     * @param reactorStreamsService the owning service
     * @param str the stream name
     */
    public ReactorCamelProcessor(ReactorStreamsService reactorStreamsService, String str) {
        this.service = reactorStreamsService;
        this.name = str;
    }

    /**
     * Currently a no-op: no resources are released here.
     *
     * @throws IOException never thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        // Intentionally empty.
    }

    /**
     * Returns the publisher that subscribers of this stream consume from.
     *
     * @return the internal emitter processor
     */
    public Publisher<Exchange> getPublisher() {
        return this.publisher;
    }

    /**
     * Attaches a Camel producer to this stream and wires a new sink to the publisher.
     *
     * @param reactiveStreamsProducer the producer to attach; must not be {@code null}
     * @throws NullPointerException if {@code reactiveStreamsProducer} is {@code null}
     * @throws IllegalStateException if a producer is already attached
     */
    public synchronized void attach(ReactiveStreamsProducer reactiveStreamsProducer) {
        Objects.requireNonNull(reactiveStreamsProducer, "producer cannot be null, use the detach method");
        if (this.camelProducer != null) {
            throw new IllegalStateException("A producer is already attached to the stream '" + this.name + "'");
        }

        // Clear any sink left over from a previous attachment before creating a new one.
        detach();

        ReactiveStreamsBackpressureStrategy strategy =
                reactiveStreamsProducer.getEndpoint().getBackpressureStrategy();

        // The sink handed to the emitter callback is stored so that send() can push into it.
        // IGNORE leaves overflow handling to the operators applied below.
        Flux<Exchange> source = Flux.create(this.camelSink::set, FluxSink.OverflowStrategy.IGNORE);

        Flux<Exchange> flux;
        if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
            // Dropped items are reported through onBackPressure; only the
            // surviving items reach onItemEmitted.
            flux = source.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
        } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
            // onItemEmitted runs before onBackpressureLatest, so the dispatch callback
            // fires before an item can be dropped and no discard is reported.
            flux = source.handle(this::onItemEmitted).onBackpressureLatest();
        } else {
            // Any other strategy buffers up to SMALL_BUFFER_SIZE items and reports
            // overflowing items through onBackPressure.
            flux = source.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure)
                    .handle(this::onItemEmitted);
        }

        flux.subscribe(this.publisher);
        this.camelProducer = reactiveStreamsProducer;
    }

    /**
     * Detaches the current producer, if any, and forgets its sink.
     */
    public synchronized void detach() {
        this.camelProducer = null;
        this.camelSink.set(null);
    }

    /**
     * Pushes an exchange into the stream if the owning service still allows running.
     *
     * <p>When the service does not allow running, the exchange is silently ignored.
     * When no sink is present (no producer attached), the {@code ObjectHelper.notNull}
     * check rejects the call.
     *
     * @param exchange the exchange to emit
     */
    public void send(Exchange exchange) {
        if (this.service.isRunAllowed()) {
            FluxSink<Exchange> sink = ObjectHelper.notNull(this.camelSink.get(), "FluxSink");
            sink.next(exchange);
        }
    }

    /**
     * Forwards an exchange downstream and then invokes its dispatch callback,
     * provided the owning service still allows running.
     *
     * @param exchange the exchange being emitted
     * @param synchronousSink the sink used to forward the exchange downstream
     */
    private void onItemEmitted(Exchange exchange, SynchronousSink<Exchange> synchronousSink) {
        if (this.service.isRunAllowed()) {
            synchronousSink.next(exchange);
            ReactiveStreamsHelper.invokeDispatchCallback(exchange);
        }
    }

    /**
     * Invokes the dispatch callback of a discarded exchange with a
     * {@link ReactiveStreamsDiscardedException}.
     *
     * @param exchange the exchange discarded by the back-pressure strategy
     */
    private void onBackPressure(Exchange exchange) {
        ReactiveStreamsHelper.invokeDispatchCallback(exchange, new ReactiveStreamsDiscardedException("Discarded by back pressure strategy", exchange, this.name));
    }
}
