package xin.manong.stream.boost.receiver.memory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.stream.sdk.annotation.Resource;
import xin.manong.stream.sdk.receiver.Receiver;
import xin.manong.weapon.base.record.KVRecords;

/* loaded from: input_file:xin/manong/stream/boost/receiver/memory/MemoryReceiver.class */
/**
 * Receiver that hands records from an in-memory {@link BlockingQueue} to a set of
 * {@link MemoryReceiveHandler} instances.
 *
 * <p>{@link #start()} builds a {@code MemoryReceiverConfig} from {@code configMap}, creates
 * {@code threadNum} handlers sharing {@link #recordQueue} and {@code receiveProcessor}, and
 * starts each of them. {@link #stop()} stops every handler that was created.
 *
 * <p>NOTE(review): {@code configMap} and {@code receiveProcessor} are not declared in this
 * class; presumably they are inherited from {@link Receiver} - confirm against the base class.
 */
public class MemoryReceiver extends Receiver {
    private static final Logger logger = LoggerFactory.getLogger(MemoryReceiver.class);

    /** Prefix of the name given to each handler; the handler index is appended after a dash. */
    private static final String HANDLER_NAME_PREFIX = "memory-receive-handler";

    /** Handlers created by {@link #start()}; stays {@code null} until a start gets past the config check. */
    private MemoryReceiveHandler[] handlers;

    /**
     * Queue the handlers are constructed with.
     * Presumably injected via the {@code @Resource} annotation before {@link #start()} runs - TODO confirm.
     */
    @Resource(name = "${recordQueue}")
    protected BlockingQueue<KVRecords> recordQueue;

    /**
     * Creates the receiver, passing the configuration map to the {@link Receiver} constructor.
     *
     * @param map receiver configuration
     */
    public MemoryReceiver(Map<String, Object> map) {
        super(map);
    }

    /**
     * Parses and validates the configuration, then creates and starts the handlers.
     *
     * @return {@code true} when all handlers were created and started; {@code false} when the
     *         configuration could not be built or its {@code check()} fails
     */
    public boolean start() {
        logger.info("memory receiver is starting ...");
        MemoryReceiverConfig config = JSON.toJavaObject(new JSONObject(this.configMap), MemoryReceiverConfig.class);
        if (config == null || !config.check()) {
            // Previously this path returned silently, leaving no trace of why start failed.
            logger.error("memory receiver config is invalid");
            return false;
        }
        int threadNum = config.threadNum;
        this.handlers = new MemoryReceiveHandler[threadNum];
        for (int i = 0; i < threadNum; i++) {
            String handlerName = HANDLER_NAME_PREFIX + "-" + i;
            this.handlers[i] = new MemoryReceiveHandler(handlerName, this.recordQueue, this.receiveProcessor);
            this.handlers[i].start();
        }
        logger.info("memory receiver has been started");
        return true;
    }

    /**
     * Stops every handler created by {@link #start()}.
     *
     * <p>Safe to call when {@link #start()} was never invoked, returned {@code false}, or was
     * aborted by an exception before all handler slots were filled.
     */
    public void stop() {
        logger.info("memory receiver is stopping ...");
        if (this.handlers != null) {
            for (MemoryReceiveHandler handler : this.handlers) {
                // A slot is null if start() was interrupted by an exception partway through.
                if (handler != null) {
                    handler.stop();
                }
            }
        }
        logger.info("memory receiver has been stopped");
    }
}
