package com.tvd12.ezymq.kafka.endpoint;

import com.tvd12.ezyfox.concurrent.EzyExecutors;
import com.tvd12.ezyfox.util.EzyCloseable;
import com.tvd12.ezyfox.util.EzyProcessor;
import com.tvd12.ezyfox.util.EzyStartable;
import com.tvd12.ezymq.kafka.endpoint.EzyKafkaEndpoint;
import com.tvd12.ezymq.kafka.handler.EzyKafkaRecordsHandler;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.Deserializer;

/* loaded from: input_file:com/tvd12/ezymq/kafka/endpoint/EzyKafkaServer.class */
public class EzyKafkaServer extends EzyKafkaEndpoint implements EzyStartable, EzyCloseable {
    protected final long pollTimeOut;
    protected final Consumer consumer;
    protected volatile boolean active;
    protected final Thread poolRecordThread;
    protected final ExecutorService executorService;
    protected EzyKafkaRecordsHandler recordsHandler;

    /* loaded from: input_file:com/tvd12/ezymq/kafka/endpoint/EzyKafkaServer$Builder.class */
    public static class Builder extends EzyKafkaEndpoint.Builder<Builder> {
        protected Consumer consumer;
        protected int threadPoolSize;
        protected long pollTimeOut = 100;
        protected Deserializer deserializer;
        protected EzyKafkaRecordsHandler recordsHandler;

        public Builder pollTimeOut(long j) {
            this.pollTimeOut = j;
            return this;
        }

        public Builder threadPoolSize(int i) {
            this.threadPoolSize = i;
            return this;
        }

        public Builder consumer(Consumer consumer) {
            this.consumer = consumer;
            return this;
        }

        public Builder deserializer(Deserializer deserializer) {
            this.deserializer = deserializer;
            return this;
        }

        public Builder recordsHandler(EzyKafkaRecordsHandler ezyKafkaRecordsHandler) {
            this.recordsHandler = ezyKafkaRecordsHandler;
            return this;
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public EzyKafkaServer m7build() {
            if (this.consumer == null) {
                this.consumer = newConsumer(this.deserializer);
            }
            return new EzyKafkaServer(this.topic, this.consumer, this.pollTimeOut, this.threadPoolSize);
        }
    }

    public EzyKafkaServer(String str, Consumer consumer, long j) {
        this(str, consumer, j, 1);
    }

    public EzyKafkaServer(String str, Consumer consumer, long j, int i) {
        this(str, consumer, j, newExecutorService(str, i));
    }

    public EzyKafkaServer(String str, Consumer consumer, long j, ExecutorService executorService) {
        super(str);
        this.consumer = consumer;
        this.pollTimeOut = j;
        this.executorService = executorService;
        this.poolRecordThread = newPoolRecordThread(str);
    }

    protected Thread newPoolRecordThread(String str) {
        return EzyExecutors.newThreadFactory("kafka-consumer-pool-" + str).newThread(() -> {
            loop();
        });
    }

    protected static ExecutorService newExecutorService(String str, int i) {
        ExecutorService newFixedThreadPool = EzyExecutors.newFixedThreadPool(i, "kafka-consumer-" + str);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            newFixedThreadPool.shutdown();
        }));
        return newFixedThreadPool;
    }

    public void start() throws Exception {
        this.active = true;
        this.consumer.subscribe(Collections.singletonList(this.topic));
        this.poolRecordThread.start();
    }

    protected void loop() {
        while (this.active) {
            pollRecords();
        }
    }

    protected void pollRecords() {
        ConsumerRecords poll;
        try {
            ConsumerRecords consumerRecords = ConsumerRecords.EMPTY;
            synchronized (this) {
                poll = this.consumer.poll(Duration.ofMillis(this.pollTimeOut));
            }
            poll.forEach(obj -> {
                this.executorService.execute(() -> {
                    try {
                        this.recordsHandler.handleRecord((ConsumerRecord) obj);
                    } catch (Throwable th) {
                        if (this.active) {
                            this.logger.warn("handle record: {} error", obj, th);
                        }
                    }
                });
            });
        } catch (Exception e) {
            if (this.active) {
                this.logger.warn("poll records error", e);
            }
        }
    }

    public void close() {
        this.active = false;
        synchronized (this) {
            EzyProcessor.processWithLogException(() -> {
                this.consumer.close();
            });
        }
        this.executorService.shutdown();
    }

    public static Builder builder() {
        return new Builder();
    }

    public void setRecordsHandler(EzyKafkaRecordsHandler ezyKafkaRecordsHandler) {
        this.recordsHandler = ezyKafkaRecordsHandler;
    }
}
