package org.zodic.kafka.availability;

import org.springframework.context.ApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.lang.NonNull;
import org.zodiac.commons.util.Colls;
import org.zodiac.core.application.availability.ReadinessState;
import org.zodiac.core.event.application.AppAvailabilityChangeEvent;

/* loaded from: input_file:org/zodic/kafka/availability/KafkaAvailabilityEventListener.class */
public class KafkaAvailabilityEventListener implements ApplicationListener<AppAvailabilityChangeEvent> {
    private KafkaListenerEndpointRegistry registry;
    private KafkaAvailability availability;

    public KafkaAvailabilityEventListener(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, KafkaAvailability kafkaAvailability) {
        this.registry = kafkaListenerEndpointRegistry;
        this.availability = kafkaAvailability;
    }

    public void onApplicationEvent(@NonNull AppAvailabilityChangeEvent appAvailabilityChangeEvent) {
        if (appAvailabilityChangeEvent.isReadinessEvent() && this.availability.getReadiness().isEnabled()) {
            ReadinessState state = appAvailabilityChangeEvent.getState();
            if (state.isAcceptTraffic()) {
                this.registry.getAllListenerContainers().stream().filter(messageListenerContainer -> {
                    return isMatch(messageListenerContainer) && !messageListenerContainer.isRunning();
                }).forEach(messageListenerContainer2 -> {
                    messageListenerContainer2.start();
                    if (messageListenerContainer2.isContainerPaused()) {
                        messageListenerContainer2.resume();
                    }
                });
            } else if (state.isRefusingTraffic() && this.registry.isRunning()) {
                this.registry.getAllListenerContainers().stream().filter(messageListenerContainer3 -> {
                    return isMatch(messageListenerContainer3) && messageListenerContainer3.isRunning();
                }).forEach((v0) -> {
                    v0.stop();
                });
            }
        }
    }

    private boolean isMatch(MessageListenerContainer messageListenerContainer) {
        return this.availability.getReadiness().isMatched(Colls.set(messageListenerContainer.getContainerProperties().getTopics()));
    }
}
