package me.ehp246.aufkafka.core.consumer;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import me.ehp246.aufkafka.api.consumer.ConsumerExceptionListener;
import me.ehp246.aufkafka.api.consumer.InboundConsumerListener;
import me.ehp246.aufkafka.api.consumer.InboundEndpointConsumer;
import me.ehp246.aufkafka.api.consumer.Invocable;
import me.ehp246.aufkafka.api.consumer.InvocableDispatcher;
import me.ehp246.aufkafka.api.consumer.InvocableFactory;
import me.ehp246.aufkafka.api.consumer.UnmatchedConsumer;
import me.ehp246.aufkafka.api.exception.UnknownKeyException;
import me.ehp246.aufkafka.api.spi.MsgMDCContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;

/* loaded from: input_file:me/ehp246/aufkafka/core/consumer/InboundConsumerRunner.class */
final class InboundConsumerRunner implements Runnable, InboundEndpointConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(InboundConsumerRunner.class);
    private final Consumer<String, String> consumer;
    private final InvocableDispatcher dispatcher;
    private final InvocableFactory invocableFactory;
    private final List<InboundConsumerListener.DispatchingListener> onDispatching;
    private final UnmatchedConsumer onUnmatched;
    private final ConsumerExceptionListener onException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public InboundConsumerRunner(Consumer<String, String> consumer, InvocableDispatcher invocableDispatcher, InvocableFactory invocableFactory, List<InboundConsumerListener.DispatchingListener> list, UnmatchedConsumer unmatchedConsumer, ConsumerExceptionListener consumerExceptionListener) {
        this.consumer = consumer;
        this.dispatcher = invocableDispatcher;
        this.invocableFactory = invocableFactory;
        this.onDispatching = list == null ? List.of() : list;
        this.onUnmatched = unmatchedConsumer;
        this.onException = consumerExceptionListener;
    }

    @Override // java.lang.Runnable
    public void run() {
        loop0: while (true) {
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(100L));
            if (poll.count() > 1) {
                LoggingEventBuilder message = LOGGER.atWarn().setMessage("Polled count: {}");
                Objects.requireNonNull(poll);
                message.addArgument(poll::count).log();
            }
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                final ConsumerRecord<String, String> consumerRecord = (ConsumerRecord) it.next();
                try {
                    AutoCloseable autoCloseable = MsgMDCContext.set(consumerRecord);
                    try {
                        this.onDispatching.stream().forEach(dispatchingListener -> {
                            dispatchingListener.onDispatching(consumerRecord);
                        });
                        Invocable invocable = this.invocableFactory.get(consumerRecord);
                        if (invocable != null) {
                            this.dispatcher.dispatch(invocable, consumerRecord);
                        } else {
                            if (this.onUnmatched == null) {
                                throw new UnknownKeyException(consumerRecord);
                                break loop0;
                            }
                            this.onUnmatched.accept(consumerRecord);
                        }
                        if (autoCloseable != null) {
                            autoCloseable.close();
                        }
                    } catch (Throwable th) {
                        if (autoCloseable == null) {
                            break loop0;
                        }
                        try {
                            autoCloseable.close();
                            break loop0;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Exception e) {
                    LoggingEventBuilder message2 = LOGGER.atError().setCause(e).setMessage(this.onException.getClass().getSimpleName() + " failed, ignored: {}, {}, {} because of {}");
                    Objects.requireNonNull(consumerRecord);
                    LoggingEventBuilder addArgument = message2.addArgument(consumerRecord::topic);
                    Objects.requireNonNull(consumerRecord);
                    LoggingEventBuilder addArgument2 = addArgument.addArgument(consumerRecord::key);
                    Objects.requireNonNull(consumerRecord);
                    LoggingEventBuilder addArgument3 = addArgument2.addArgument(consumerRecord::offset);
                    Objects.requireNonNull(e);
                    addArgument3.addArgument(e::getMessage).log();
                    if (this.onException != null) {
                        this.onException.onException(new ConsumerExceptionListener.Context() { // from class: me.ehp246.aufkafka.core.consumer.InboundConsumerRunner.1
                            @Override // me.ehp246.aufkafka.api.consumer.ConsumerExceptionListener.Context
                            public Consumer<String, String> consumer() {
                                return InboundConsumerRunner.this.consumer;
                            }

                            @Override // me.ehp246.aufkafka.api.consumer.ConsumerExceptionListener.Context
                            public ConsumerRecord<String, String> message() {
                                return consumerRecord;
                            }

                            @Override // me.ehp246.aufkafka.api.consumer.ConsumerExceptionListener.Context
                            public Exception thrown() {
                                return e;
                            }
                        });
                    }
                }
            }
            if (poll.count() > 0) {
                this.consumer.commitSync();
            }
        }
        throw th;
    }

    @Override // me.ehp246.aufkafka.api.consumer.InboundEndpointConsumer
    public Consumer<String, String> consumer() {
        return this.consumer;
    }
}
