package xin.manong.weapon.aliyun.ots;

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.ChannelInfo;
import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.weapon.alarm.Alarm;
import xin.manong.weapon.alarm.AlarmSender;
import xin.manong.weapon.alarm.AlarmStatus;

/* loaded from: input_file:xin/manong/weapon/aliyun/ots/OTSTunnelMonitor.class */
public class OTSTunnelMonitor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(OTSTunnelMonitor.class);
    private static final long DEFAULT_CHECK_TIME_INTERVAL_MS = 600000;
    private boolean running = false;
    private long checkTimeIntervalMs = DEFAULT_CHECK_TIME_INTERVAL_MS;
    private String appName;
    private OTSTunnelConfig tunnelConfig;
    private TunnelClient tunnelClient;
    private AlarmSender alarmSender;
    private Thread workThread;

    public OTSTunnelMonitor(OTSTunnelConfig oTSTunnelConfig, TunnelClient tunnelClient) {
        this.tunnelConfig = oTSTunnelConfig;
        this.tunnelClient = tunnelClient;
    }

    public void start() {
        logger.info("OTSTunnel monitor is starting ...");
        this.running = true;
        this.workThread = new Thread(this, "TunnelMonitor");
        this.workThread.start();
        logger.info("tunnel monitor has been started");
    }

    public void stop() {
        logger.info("tunnel monitor is stopping ...");
        this.running = false;
        if (this.workThread.isAlive()) {
            this.workThread.interrupt();
        }
        try {
            this.workThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        logger.info("tunnel monitor has been stopped");
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            Iterator<OTSTunnelWorkerConfig> it = this.tunnelConfig.workerConfigs.iterator();
            while (it.hasNext()) {
                check(it.next());
            }
            logger.info("tunnel monitor is running, sleep {} ms", Long.valueOf(this.checkTimeIntervalMs));
            try {
                Thread.sleep(this.checkTimeIntervalMs);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    private void check(OTSTunnelWorkerConfig oTSTunnelWorkerConfig) {
        List<ChannelInfo> channelInfos = this.tunnelClient.describeTunnel(new DescribeTunnelRequest(oTSTunnelWorkerConfig.table, oTSTunnelWorkerConfig.tunnel)).getChannelInfos();
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        for (ChannelInfo channelInfo : channelInfos) {
            long time = channelInfo.getChannelConsumePoint().getTime();
            if (time > 0) {
                long j = currentTimeMillis - time;
                if (j >= oTSTunnelWorkerConfig.maxConsumeDelayMs) {
                    logger.warn("consume delay[{}] for channel[{}] in tunnel[{}] of table[{}]", new Object[]{Long.valueOf(j), channelInfo.getChannelId(), oTSTunnelWorkerConfig.tunnel, oTSTunnelWorkerConfig.table, Long.valueOf(j)});
                    i++;
                }
            }
        }
        if (i > 0) {
            Alarm title = new Alarm(String.format("OTS通道[%s:%s]数据堆积: 堆积channel数量[%d], 超过最大消费延时[%d]ms", oTSTunnelWorkerConfig.table, oTSTunnelWorkerConfig.tunnel, Integer.valueOf(i), Long.valueOf(oTSTunnelWorkerConfig.maxConsumeDelayMs)), AlarmStatus.ERROR).setAppName(this.appName).setTitle("OTS通道数据堆积报警");
            if (this.alarmSender != null) {
                this.alarmSender.send(title);
            }
        }
    }

    public void setAlarmSender(AlarmSender alarmSender) {
        this.alarmSender = alarmSender;
    }

    public void setAppName(String str) {
        this.appName = str;
    }
}
