package ca.bc.gov.tno.services.kafka;

import ca.bc.gov.tno.models.NlpContent;
import ca.bc.gov.tno.models.Tag;
import ca.bc.gov.tno.services.ServiceState;
import ca.bc.gov.tno.services.ServiceStatus;
import ca.bc.gov.tno.services.events.ErrorEvent;
import ca.bc.gov.tno.services.events.ServiceStartEvent;
import ca.bc.gov.tno.services.events.ServiceStopEvent;
import ca.bc.gov.tno.services.kafka.config.KafkaConsumerConfig;
import ca.bc.gov.tno.services.kafka.events.ConsumerPauseEvent;
import ca.bc.gov.tno.services.kafka.events.ConsumerRecordReceivedEvent;
import ca.bc.gov.tno.services.kafka.events.ConsumerResumeEvent;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.scheduling.annotation.Async;

/**
 * Base service that subscribes a {@link KafkaConsumer} to a configured list of topics and
 * republishes every record it receives as a {@link ConsumerRecordReceivedEvent}.
 *
 * <p>The polling loop is started by a {@link ServiceStartEvent} and is steered through the shared
 * {@link ServiceState}: the {@code pause}, {@code resume} and stop listeners only change the
 * status, and the loop reacts to the new status on its next iteration.
 *
 * <p>The consumer is always configured with a {@link StringDeserializer} for keys and a
 * {@link JsonDeserializer} for values whose default target type is {@link NlpContent}.
 *
 * @param <K> key type the consumer is declared with
 * @param <R> value type the consumer is declared with
 */
@Async
public class BaseKafkaConsumerService<K, R> implements ApplicationListener<ServiceStartEvent> {
    private static final Logger logger = LogManager.getLogger(BaseKafkaConsumerService.class);

    /** Shared service state; its status drives the polling loop. */
    protected final ServiceState state;

    /** Publisher used to emit record and error events. */
    protected final ApplicationEventPublisher eventPublisher;

    /** Kafka consumer built from the supplied configuration. */
    protected final KafkaConsumer<K, R> consumer;

    /** Topics to subscribe to, parsed from the comma-separated configuration value. */
    protected final List<String> topics;

    /** Maximum time a single poll waits for records. */
    protected final Duration timeout;

    /**
     * Requests that the consumer be paused. The polling loop applies the pause on its next
     * iteration.
     *
     * @param consumerPauseEvent the pause request (not inspected)
     */
    @EventListener
    public void handlePauseEvent(ConsumerPauseEvent consumerPauseEvent) {
        logger.info("Pause consumer requested");
        this.state.setStatus(ServiceStatus.pause);
    }

    /**
     * Requests that a paused consumer be resumed. The polling loop applies the resume on its next
     * iteration.
     *
     * @param consumerResumeEvent the resume request (not inspected)
     */
    @EventListener
    public void handleResumeEvent(ConsumerResumeEvent consumerResumeEvent) {
        logger.info("Resume consumer requested");
        this.state.setStatus(ServiceStatus.resume);
    }

    /**
     * Requests that the service stop by setting the status to {@code sleeping}, which ends the
     * polling loop.
     *
     * <p>Despite its name this overload handles the stop event; the name is kept so existing
     * callers and overriding subclasses keep working.
     *
     * @param serviceStopEvent the stop request (not inspected)
     */
    @EventListener
    public void handleResumeEvent(ServiceStopEvent serviceStopEvent) {
        logger.info("Stop service requested");
        this.state.setStatus(ServiceStatus.sleeping);
    }

    /**
     * Creates the service and its Kafka consumer from the supplied configuration.
     *
     * @param serviceState shared state used to control and report on the polling loop
     * @param kafkaConsumerConfig source of the topic list, poll timeout and consumer settings
     * @param applicationEventPublisher publisher used for record and error events
     */
    public BaseKafkaConsumerService(ServiceState serviceState, KafkaConsumerConfig kafkaConsumerConfig, ApplicationEventPublisher applicationEventPublisher) {
        this.state = serviceState;
        this.eventPublisher = applicationEventPublisher;
        // Topics are configured as one comma-separated string; whitespace around commas is ignored.
        this.topics = Arrays.asList(kafkaConsumerConfig.getConsumerTopics().trim().split("\\s*,\\s*"));
        this.timeout = Duration.ofMillis(kafkaConsumerConfig.getPollTimeout());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaConsumerConfig.getBootstrapServers());
        properties.put("group.id", kafkaConsumerConfig.getGroupId());
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", JsonDeserializer.class);
        properties.put("spring.json.type.mapping", String.format("Content:%s, Tag:%s", NlpContent.class.getName(), Tag.class.getName()));
        properties.put("spring.json.key.default.type", String.class.getName());
        properties.put("spring.json.value.default.type", NlpContent.class.getName());
        properties.put("enable.auto.commit", Boolean.valueOf(kafkaConsumerConfig.isEnableAutoCommit()));
        properties.put("auto.offset.reset", kafkaConsumerConfig.getAutoOffsetRest());
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Subscribes to the configured topics and polls until the status becomes {@code sleeping}.
     *
     * <p>Each iteration first applies a pending {@code pause} or {@code resume} request, then
     * polls, resets the failed-attempt counter and publishes one
     * {@link ConsumerRecordReceivedEvent} per record. If polling throws one of the caught
     * exceptions the status is set to {@code sleeping} and an {@link ErrorEvent} is published.
     *
     * @param serviceStartEvent the start request (not inspected)
     */
    @Override
    public void onApplicationEvent(ServiceStartEvent serviceStartEvent) {
        try {
            logger.info("Consumer started.");
            this.state.setStatus(ServiceStatus.running);
            this.consumer.subscribe(this.topics);
            String topicList = String.join(", ", this.topics);
            while (this.state.getStatus() != ServiceStatus.sleeping) {
                if (this.state.getStatus() == ServiceStatus.pause) {
                    // Partitions are assigned during poll(), so the assignment must be read at
                    // the moment of pausing; a set captured right after subscribe() is empty.
                    this.consumer.pause(this.consumer.assignment());
                    this.state.setStatus(ServiceStatus.paused);
                } else if (this.state.getStatus() == ServiceStatus.resume) {
                    // Resume exactly the partitions that are currently paused.
                    this.consumer.resume(this.consumer.paused());
                    this.state.setStatus(ServiceStatus.running);
                }
                logger.debug("Polling: ({}) topic: {}", this.state.getStatus(), topicList);
                ConsumerRecords<K, R> records = this.consumer.poll(this.timeout);
                // A poll that returns without throwing counts as a successful attempt.
                this.state.setFailedAttempts(0);
                // NOTE(review): the record is left raw because the generic signature of the
                // ConsumerRecordReceivedEvent constructor is declared outside this file.
                for (ConsumerRecord record : records) {
                    logger.info("Record received: key: {}", record.key());
                    this.eventPublisher.publishEvent(new ConsumerRecordReceivedEvent(this, record));
                }
            }
        } catch (InvalidOffsetException | WakeupException | InterruptException | ArithmeticException e) {
            this.state.setStatus(ServiceStatus.sleeping);
            this.eventPublisher.publishEvent(new ErrorEvent(this, e));
        }
    }

    /**
     * Closes the Kafka consumer when this object is finalized.
     *
     * <p>NOTE(review): this is the only place in this class where the consumer is closed, and
     * finalization is not guaranteed to run; consider an explicit shutdown hook.
     */
    @Override
    public void finalize() {
        this.consumer.close();
    }
}
