package xin.manong.stream.framework.runner;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.stream.framework.annotation.StreamApplication;
import xin.manong.stream.framework.common.StreamManager;
import xin.manong.stream.framework.processor.ProcessorGraph;
import xin.manong.stream.framework.processor.ProcessorGraphFactory;
import xin.manong.stream.framework.receiver.ReceiveControllerConfig;
import xin.manong.stream.framework.receiver.ReceiveManager;
import xin.manong.stream.framework.resource.ResourceConfig;
import xin.manong.stream.framework.resource.ResourceManager;
import xin.manong.stream.sdk.common.UnacceptableException;
import xin.manong.weapon.alarm.Alarm;
import xin.manong.weapon.alarm.AlarmConfig;
import xin.manong.weapon.alarm.AlarmSender;
import xin.manong.weapon.alarm.AlarmStatus;
import xin.manong.weapon.base.secret.DynamicSecretListener;
import xin.manong.weapon.base.util.FileUtil;
import xin.manong.weapon.base.util.ReflectParams;
import xin.manong.weapon.base.util.ReflectUtil;

/* loaded from: input_file:xin/manong/stream/framework/runner/StreamRunner.class */
/**
 * Bootstraps a stream application: loads its configuration, starts the optional alarm sender,
 * registers resources, validates the processor graph and starts the receivers.
 *
 * <p>The usual entry point is {@link #run(Class, String[])}, which blocks until the JVM shuts
 * down. {@link #start()} and {@link #stop()} may also be driven directly by a caller.
 */
public class StreamRunner {
    private static final Logger logger = LoggerFactory.getLogger(StreamRunner.class);
    private static final String CLASS_PATH_PREFIX = "classpath:";
    /** Charset used for every config file, whether read from disk or from the class path. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    /** Chunk size used when draining the class path config stream. */
    private static final int READ_BUFFER_SIZE = 4096;

    private final StreamRunnerConfig config;
    private ReceiveManager receiveManager;
    private AlarmSender alarmSender;

    /**
     * Creates a runner for the given configuration.
     *
     * @param streamRunnerConfig stream configuration; must be non-null and pass its own
     *                           {@code check()}
     * @throws RuntimeException if the configuration is null or fails {@code check()}
     */
    public StreamRunner(StreamRunnerConfig streamRunnerConfig) {
        if (streamRunnerConfig == null || !streamRunnerConfig.check()) {
            throw new RuntimeException("check stream config failed");
        }
        this.config = streamRunnerConfig;
    }

    /**
     * Starts the stream application.
     *
     * <p>Steps, in order: start every {@link DynamicSecretListener} found through
     * {@link ServiceLoader}, start the alarm sender, build the stream logger, register the
     * configured resources, validate the processor graph, then init and start the receive
     * manager. A failing step returns {@code false} immediately; already-started components are
     * not rolled back here, so callers should invoke {@link #stop()} on failure.
     *
     * @return {@code true} if every step succeeded, {@code false} otherwise
     * @throws Exception if any step throws
     */
    public boolean start() throws Exception {
        logger.info("stream[{}] is starting ...", this.config.name);
        for (DynamicSecretListener listener : ServiceLoader.load(DynamicSecretListener.class)) {
            listener.start();
        }
        if (!startAlarmSender()) {
            return false;
        }
        StreamManager.buildStreamLogger(this.config.loggerFile, this.config.loggerKeys);
        if (this.config.resources != null) {
            for (ResourceConfig resourceConfig : this.config.resources) {
                ResourceManager.registerResource(resourceConfig);
            }
        }
        if (!checkProcessorGraph()) {
            return false;
        }
        this.receiveManager = new ReceiveManager(this.config.receivers, this.config.processors);
        this.receiveManager.setAppName(this.config.name);
        this.receiveManager.setAlarmSender(this.alarmSender);
        if (!this.receiveManager.init() || !this.receiveManager.start()) {
            return false;
        }
        if (this.alarmSender != null) {
            this.alarmSender.send(new Alarm(String.format("stream app[%s] has been started", this.config.name), AlarmStatus.INFO).setAppName(this.config.name).setTitle("应用启动通知"));
        }
        logger.info("stream[{}] has been started", this.config.name);
        return true;
    }

    /**
     * Stops the stream application: destroys the receive manager (if created), sweeps processor
     * graphs, unregisters all resources, and finally sends a stop notification through the alarm
     * sender (if any) before stopping it.
     */
    public void stop() {
        logger.info("stream[{}] is stopping ...", this.config.name);
        if (this.receiveManager != null) {
            this.receiveManager.destroy();
        }
        ProcessorGraphFactory.sweep();
        ResourceManager.unregisterAllResources();
        if (this.alarmSender != null) {
            this.alarmSender.send(new Alarm(String.format("stream app[%s] has been stopped", this.config.name), AlarmStatus.INFO).setAppName(this.config.name).setTitle("应用停止通知"));
            this.alarmSender.stop();
        }
        logger.info("stream[{}] has been stopped", this.config.name);
    }

    /**
     * Instantiates (reflectively, from {@code alarmConfig.alarmSenderClass}) and starts the alarm
     * sender.
     *
     * @return {@code true} if no alarm config is present or the sender started successfully;
     *         {@code false} if the config is invalid, instantiation threw, or the sender failed
     *         to start
     */
    private boolean startAlarmSender() {
        if (this.config.alarmConfig == null) {
            logger.info("alarm config is null, ignore start alarm sender request");
            return true;
        }
        if (!this.config.alarmConfig.check()) {
            logger.error("invalid alarm config");
            return false;
        }
        try {
            this.alarmSender = (AlarmSender) ReflectUtil.newInstance(this.config.alarmConfig.alarmSenderClass, new ReflectParams(new Class[]{AlarmConfig.class}, new Object[]{this.config.alarmConfig}));
            if (this.alarmSender.start()) {
                logger.info("start alarm sender[{}] success", this.config.alarmConfig.alarmSenderClass);
                return true;
            }
            logger.error("start alarm sender[{}] failed", this.config.alarmConfig.alarmSenderClass);
            return false;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            logger.error("start alarm sender[{}] failed", this.config.alarmConfig.alarmSenderClass);
            return false;
        }
    }

    /**
     * Builds a processor graph from the configured processors and verifies that every receiver
     * config is valid and only references processors contained in that graph.
     *
     * <p>The graph is swept only on the success path; on failure the sweep is left to
     * {@link #stop()}.
     *
     * @return {@code true} if all receivers are valid and all referenced processors exist
     * @throws UnacceptableException if building the processor graph fails
     */
    private boolean checkProcessorGraph() throws UnacceptableException {
        ProcessorGraph processorGraph = ProcessorGraphFactory.make(this.config.processors);
        for (ReceiveControllerConfig receiveControllerConfig : this.config.receivers) {
            if (!receiveControllerConfig.check()) {
                return false;
            }
            for (String processor : receiveControllerConfig.processors) {
                if (!processorGraph.containsProcessor(processor)) {
                    logger.error("processor[{}] is not found for receiver[{}]", processor, receiveControllerConfig.name);
                    return false;
                }
            }
        }
        ProcessorGraphFactory.sweep();
        return true;
    }

    /**
     * Parses the command line and returns the value of the {@code -c} option.
     *
     * <p>{@code -c} is deliberately not registered as a required option: a required option makes
     * the parser fail before {@code -h/--help} can be honoured. Its presence is enforced after
     * the help check instead. When help is requested, usage is printed and the JVM exits with
     * status 0.
     *
     * @param strArr raw command line arguments
     * @return the stream config file path given with {@code -c}
     * @throws ParseException if the arguments cannot be parsed or {@code -c} is missing
     */
    private static String parseCommands(String[] strArr) throws ParseException {
        Options options = new Options();
        options.addOption(Option.builder("h").longOpt("help").desc("help information for stream runner").build());
        options.addOption(Option.builder("c").hasArg().desc("stream config file path").build());
        CommandLine commandLine = new DefaultParser().parse(options, strArr);
        if (commandLine.hasOption("h")) {
            new HelpFormatter().printHelp(StreamRunner.class.getName(), options);
            System.exit(0);
        }
        if (!commandLine.hasOption("c")) {
            throw new ParseException("Missing required option: c");
        }
        return commandLine.getOptionValue("c");
    }

    /**
     * Validates the {@link StreamApplication} annotation found on the application class.
     *
     * @param streamApplication annotation instance, or {@code null} if the class is not annotated
     * @param cls               the application class, used for error messages
     * @throws RuntimeException if the annotation is missing, its name is empty, or its config
     *                          file is empty or does not start with {@code classpath:}
     */
    private static void checkStreamApplication(StreamApplication streamApplication, Class<?> cls) {
        if (streamApplication == null) {
            logger.error("resource class[{}] is not stream application resource", cls.getName());
            throw new RuntimeException(String.format("resource class[%s] is not stream application resource", cls.getName()));
        }
        if (StringUtils.isEmpty(streamApplication.name())) {
            logger.error("stream application name is empty");
            throw new RuntimeException("stream application name is empty");
        }
        if (StringUtils.isEmpty(streamApplication.configFile()) || !streamApplication.configFile().startsWith(CLASS_PATH_PREFIX)) {
            logger.error("invalid stream config file[{}], must start with prefix[{}]", streamApplication.configFile(), CLASS_PATH_PREFIX);
            throw new RuntimeException(String.format("invalid stream config file[%s], must start with prefix[%s]", streamApplication.configFile(), CLASS_PATH_PREFIX));
        }
    }

    /**
     * Loads the stream configuration.
     *
     * <p>The file named by the {@code -c} command line option is preferred. If the command line
     * cannot be parsed and an application class is supplied, the config is loaded from the class
     * path location declared by the class's {@link StreamApplication} annotation instead.
     *
     * @param cls    application class annotated with {@link StreamApplication}; may be null
     * @param strArr raw command line arguments
     * @return the parsed configuration
     * @throws Exception the original {@link ParseException} if the command line is invalid and
     *                   {@code cls} is null, or any error raised while reading or parsing
     */
    private static StreamRunnerConfig parseStreamConfig(Class<?> cls, String[] strArr) throws Exception {
        try {
            String content = FileUtil.read(parseCommands(strArr), UTF_8);
            return (StreamRunnerConfig) JSON.toJavaObject(JSON.parseObject(content), StreamRunnerConfig.class);
        } catch (ParseException e) {
            if (cls == null) {
                throw e;
            }
            return loadClassPathConfig(cls);
        }
    }

    /**
     * Loads the stream configuration from the class path resource declared by the
     * {@link StreamApplication} annotation on {@code cls}; the resulting config name is taken
     * from the annotation.
     *
     * @param cls application class; must carry a valid {@link StreamApplication} annotation
     * @return the parsed configuration, never null
     * @throws Exception if the annotation is invalid, the resource is missing or empty, or
     *                   reading fails
     */
    private static StreamRunnerConfig loadClassPathConfig(Class<?> cls) throws Exception {
        StreamApplication streamApplication = cls.getAnnotation(StreamApplication.class);
        checkStreamApplication(streamApplication, cls);
        String relativePath = streamApplication.configFile().substring(CLASS_PATH_PREFIX.length());
        // Force an absolute resource name so lookup is not relative to the package of cls.
        String resourcePath = relativePath.startsWith("/") ? relativePath : String.format("/%s", relativePath);
        String content;
        // try-with-resources closes the stream even when reading fails; a null resource is skipped.
        try (InputStream input = cls.getResourceAsStream(resourcePath)) {
            if (input == null) {
                logger.error("stream application config is not found for path[{}]", resourcePath);
                throw new RuntimeException(String.format("stream application config is not found for path[%s]", resourcePath));
            }
            ByteArrayOutputStream output = new ByteArrayOutputStream(READ_BUFFER_SIZE);
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            int count;
            while ((count = input.read(buffer, 0, buffer.length)) != -1) {
                output.write(buffer, 0, count);
            }
            content = new String(output.toByteArray(), UTF_8);
        }
        StreamRunnerConfig streamRunnerConfig = (StreamRunnerConfig) JSON.toJavaObject(JSON.parseObject(content), StreamRunnerConfig.class);
        if (streamRunnerConfig == null) {
            // Guard against an empty document, which would otherwise surface as a bare NPE below.
            logger.error("stream application config is empty for path[{}]", resourcePath);
            throw new RuntimeException(String.format("stream application config is empty for path[%s]", resourcePath));
        }
        streamRunnerConfig.name = streamApplication.name();
        return streamRunnerConfig;
    }

    /**
     * Runs a stream application and blocks until the JVM shuts down.
     *
     * <p>Clears fastjson's {@code UseBigDecimal} flag in the global default parser features,
     * loads the configuration, registers a shutdown hook that calls {@link #stop()}, and starts
     * the runner. If startup fails the JVM exits with status 1.
     *
     * @param cls    application class annotated with {@link StreamApplication}, used as the
     *               config fallback when the command line is invalid; may be null
     * @param strArr raw command line arguments
     * @throws Exception if the configuration cannot be loaded or startup throws
     */
    public static void run(Class<?> cls, String[] strArr) throws Exception {
        JSON.DEFAULT_PARSER_FEATURE &= ~Feature.UseBigDecimal.getMask();
        StreamRunnerConfig streamRunnerConfig = parseStreamConfig(cls, strArr);
        StreamRunner streamRunner = new StreamRunner(streamRunnerConfig);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streamRunner.stop();
            countDownLatch.countDown();
        }));
        if (!streamRunner.start()) {
            logger.error("start stream[{}] failed", streamRunnerConfig.name);
            System.exit(1);
        }
        logger.info("stream[{}] is working ...", streamRunnerConfig.name);
        countDownLatch.await();
        logger.info("stream[{}] finished working", streamRunnerConfig.name);
    }
}
