package me.ehp246.aufkafka.core.consumer;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import me.ehp246.aufkafka.api.AufKafkaConstant;
import me.ehp246.aufkafka.api.consumer.BoundInvocable;
import me.ehp246.aufkafka.api.consumer.Invocable;
import me.ehp246.aufkafka.api.consumer.InvocableBinder;
import me.ehp246.aufkafka.api.consumer.InvocableDispatcher;
import me.ehp246.aufkafka.api.consumer.InvocationListener;
import me.ehp246.aufkafka.api.consumer.InvocationModel;
import me.ehp246.aufkafka.api.consumer.Invoked;
import me.ehp246.aufkafka.api.exception.BoundInvocationFailedException;
import me.ehp246.aufkafka.api.spi.MsgMDCContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.spi.LoggingEventBuilder;
import org.springframework.lang.Nullable;

/* loaded from: input_file:me/ehp246/aufkafka/core/consumer/DefaultInvocableDispatcher.class */
/**
 * Default {@link InvocableDispatcher}: binds an {@link Invocable} to a consumer record, invokes
 * it, and notifies the registered {@link InvocationListener}s about the outcome.
 *
 * <p>The invocation runs on the calling thread when no executor was supplied or when the
 * invocable's model is {@link InvocationModel#INLINE}; otherwise it is handed to the executor.
 */
final class DefaultInvocableDispatcher implements InvocableDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(InvocableDispatcher.class);
    private final Executor executor;
    private final InvocableBinder binder;
    private final List<InvocationListener.InvokingListener> invoking = new ArrayList<>();
    private final List<InvocationListener.CompletedListener> completed = new ArrayList<>();
    private final List<InvocationListener.FailedListener> failed = new ArrayList<>();

    /**
     * Creates a dispatcher.
     *
     * @param invocableBinder binder used to bind an invocable to a record
     * @param list            listeners to notify; may be {@code null}. A listener implementing
     *                        several listener interfaces is registered for each of them.
     * @param executor        executor for non-inline invocations; when {@code null} every
     *                        invocation runs on the calling thread
     */
    public DefaultInvocableDispatcher(InvocableBinder invocableBinder, @Nullable List<InvocationListener> list, @Nullable Executor executor) {
        this.binder = invocableBinder;
        this.executor = executor;
        if (list == null) {
            return;
        }
        for (final InvocationListener listener : list) {
            if (listener instanceof InvocationListener.InvokingListener) {
                this.invoking.add((InvocationListener.InvokingListener) listener);
            }
            if (listener instanceof InvocationListener.CompletedListener) {
                this.completed.add((InvocationListener.CompletedListener) listener);
            }
            if (listener instanceof InvocationListener.FailedListener) {
                this.failed.add((InvocationListener.FailedListener) listener);
            }
        }
    }

    /**
     * Binds and invokes {@code invocable} for {@code consumerRecord}.
     *
     * <p>On the inline path any exception raised by the invocation propagates to the caller. On
     * the executor path exceptions are logged at WARN level and not propagated.
     *
     * @param invocable      the invocable to bind and invoke; it is closed after the invocation
     * @param consumerRecord the record being dispatched
     */
    @Override // me.ehp246.aufkafka.api.consumer.InvocableDispatcher
    public void dispatch(Invocable invocable, ConsumerRecord<String, String> consumerRecord) {
        final Runnable task = () -> this.invoke(invocable, consumerRecord);

        if (this.executor == null || invocable.invocationModel() == InvocationModel.INLINE) {
            task.run();
            return;
        }

        this.executor.execute(() -> {
            // try-with-resources guarantees the MDC context set for the worker thread is closed
            // even when the invocation throws; a null resource is skipped by the language.
            try (AutoCloseable mdcContext = MsgMDCContext.set(consumerRecord)) {
                task.run();
            } catch (Exception e) {
                logIgnored(e);
            }
        });
    }

    /**
     * Runs one bind/invoke/notify cycle on the current thread. The invocable is always closed and
     * the MDC keys contributed by the bound invocable are always removed afterwards.
     */
    private void invoke(Invocable invocable, ConsumerRecord<String, String> consumerRecord) {
        BoundInvocable bound = null;
        try {
            bound = this.binder.bind(invocable, consumerRecord);

            Optional.ofNullable(bound.mdcMap()).ifPresent(mdc -> mdc.forEach(MDC::put));

            for (final InvocationListener.InvokingListener listener : this.invoking) {
                listener.onInvoking(bound);
            }

            final Invoked outcome = bound.invoke();
            if (outcome instanceof Invoked.Failed) {
                throw this.notifyFailed((Invoked.Failed) outcome);
            }

            final Invoked.Completed success = (Invoked.Completed) outcome;
            for (final InvocationListener.CompletedListener listener : this.completed) {
                listener.onCompleted(success);
            }
        } finally {
            // Close first so that a close failure is logged while the MDC entries are still set.
            closeQuietly(invocable);
            clearMdc(bound);
        }
    }

    /**
     * Notifies every failed-listener and returns the exception to throw. An exception raised by a
     * listener is attached to the original failure as suppressed so the remaining listeners
     * still run.
     */
    private RuntimeException notifyFailed(Invoked.Failed failure) {
        for (final InvocationListener.FailedListener listener : this.failed) {
            try {
                listener.onFailed(failure);
            } catch (Exception e) {
                failure.thrown().addSuppressed(e);
            }
        }

        final Throwable thrown = failure.thrown();
        if (thrown instanceof RuntimeException) {
            return (RuntimeException) thrown;
        }
        return new BoundInvocationFailedException(thrown);
    }

    /** Closes the invocable, logging instead of propagating any exception raised by the close. */
    private static void closeQuietly(Invocable invocable) {
        if (invocable == null) {
            return;
        }
        try {
            invocable.close();
        } catch (Exception e) {
            logIgnored(e);
        }
    }

    /** Removes from the MDC every key the bound invocable contributed; no-op when binding failed. */
    private static void clearMdc(BoundInvocable bound) {
        Optional.ofNullable(bound)
                .map(BoundInvocable::mdcMap)
                .ifPresent(mdc -> mdc.keySet().forEach(MDC::remove));
    }

    /** Logs an exception that is deliberately not propagated. */
    private static void logIgnored(Exception e) {
        LOGGER.atWarn()
                .setCause(e)
                .addMarker(AufKafkaConstant.EXCEPTION)
                .setMessage("Ignored: {}")
                .addArgument(e::getMessage)
                .log();
    }
}
