package xin.manong.stream.boost.receiver.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.stream.sdk.receiver.Receiver;
import xin.manong.weapon.base.kafka.KafkaConsumeConfig;
import xin.manong.weapon.base.kafka.KafkaConsumeGroup;

/* loaded from: input_file:xin/manong/stream/boost/receiver/kafka/KafkaReceiver.class */
/**
 * {@link Receiver} implementation that consumes through a {@link KafkaConsumeGroup}.
 *
 * <p>On {@link #start()} the inherited {@code configMap} is converted into a
 * {@link KafkaConsumeConfig}, the inherited {@code receiveProcessor} is wrapped in a
 * {@link KafkaProcessor}, and both are handed to a newly created consume group.
 */
public class KafkaReceiver extends Receiver {
    private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);

    /** Processor handed to the consume group; created in {@link #start()}. */
    private KafkaProcessor processor;

    /** Consume group created in {@link #start()}; stays {@code null} until then. */
    private KafkaConsumeGroup consumeGroup;

    /**
     * Creates the receiver, forwarding the configuration map to the superclass.
     *
     * @param map receiver configuration, passed unchanged to {@code Receiver}
     */
    public KafkaReceiver(Map<String, Object> map) {
        super(map);
    }

    /**
     * Builds the consume group from the configuration and starts it.
     *
     * @return {@code true} only if the consume group reports a successful start;
     *         {@code false} if the configuration converts to {@code null}, fails its own
     *         {@code check()}, the receive processor is missing, or the group fails to start
     */
    public boolean start() {
        logger.info("kafka receiver is starting ...");

        // Configuration is validated before the receive processor, as the order of
        // the resulting log output depends on it.
        KafkaConsumeConfig config = parseConsumeConfig();
        if (config == null) {
            return false;
        }
        if (this.receiveProcessor == null) {
            logger.error("receive processor is null");
            return false;
        }

        this.processor = new KafkaProcessor(this.receiveProcessor);
        this.consumeGroup = new KafkaConsumeGroup(config, this.processor);
        if (!this.consumeGroup.start()) {
            logger.error("start kafka consume group failed");
            return false;
        }
        logger.info("kafka receiver has been started");
        return true;
    }

    /**
     * Stops the consume group if one was created by {@link #start()}; otherwise only logs.
     */
    public void stop() {
        logger.info("kafka receiver is stopping ...");
        KafkaConsumeGroup group = this.consumeGroup;
        if (group != null) {
            group.stop();
        }
        logger.info("kafka receiver has been stopped");
    }

    /**
     * Converts {@code configMap} into a {@link KafkaConsumeConfig} and runs its {@code check()}.
     *
     * @return the checked configuration, or {@code null} when conversion yields {@code null}
     *         (an error is logged here) or when {@code check()} returns {@code false}
     *         (nothing is logged here)
     */
    private KafkaConsumeConfig parseConsumeConfig() {
        JSONObject rawConfig = new JSONObject(this.configMap);
        KafkaConsumeConfig config = JSON.toJavaObject(rawConfig, KafkaConsumeConfig.class);
        if (config == null) {
            logger.error("parse kafka consume config failed");
            return null;
        }
        return config.check() ? config : null;
    }
}
