package xin.manong.weapon.aliyun.ots;

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelInfo;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:xin/manong/weapon/aliyun/ots/OTSTunnelWorker.class */
public class OTSTunnelWorker {
    private static final Logger logger = LoggerFactory.getLogger(OTSTunnelWorker.class);
    private OTSTunnelWorkerConfig config;
    private TunnelWorkerConfig workerConfig;
    private TunnelClient tunnelClient;
    private TunnelWorker worker;

    public OTSTunnelWorker(OTSTunnelWorkerConfig oTSTunnelWorkerConfig, TunnelClient tunnelClient) {
        this.config = oTSTunnelWorkerConfig;
        this.tunnelClient = tunnelClient;
        if (!check()) {
            throw new RuntimeException("invalid OTS tunnel worker config");
        }
    }

    private boolean check() {
        if (this.config != null) {
            return this.config.check();
        }
        logger.error("OTS tunnel worker config is null");
        return false;
    }

    public boolean start() {
        logger.info("OTS tunnel worker[{}/{}] is starting ...", this.config.table, this.config.tunnel);
        try {
            TunnelInfo tunnelInfo = this.tunnelClient.describeTunnel(new DescribeTunnelRequest(this.config.table, this.config.tunnel)).getTunnelInfo();
            int i = this.config.consumeThreadNum;
            this.workerConfig = new TunnelWorkerConfig(createThreadPoolExecutor("tunnel_reader", i), createThreadPoolExecutor("tunnel_processor", i), this.config.channelProcessor);
            this.workerConfig.setMaxRetryIntervalInMillis(this.config.maxRetryIntervalMs);
            this.workerConfig.setHeartbeatIntervalInSec(this.config.heartBeatIntervalSec);
            if (this.config.maxChannelParallel > 0) {
                this.workerConfig.setMaxChannelParallel(this.config.maxChannelParallel);
            }
            this.worker = new TunnelWorker(tunnelInfo.getTunnelId(), this.tunnelClient, this.workerConfig);
            this.worker.connectAndWorking();
            logger.info("OTS tunnel worker[{}/{}] has been started", this.config.table, this.config.tunnel);
            return true;
        } catch (Exception e) {
            logger.error("start OTS tunnel worker[{}/{}] failed", this.config.table, this.config.tunnel);
            logger.error(e.getMessage(), e);
            return false;
        }
    }

    public void stop() {
        logger.info("OTS tunnel worker[{}/{}] is stopping ...", this.config.table, this.config.tunnel);
        if (this.worker != null) {
            this.worker.shutdown();
        }
        if (this.workerConfig != null) {
            this.workerConfig.shutdown();
        }
        logger.info("OTS tunnel worker[{}/{}] has been stopped", this.config.table, this.config.tunnel);
    }

    private ThreadPoolExecutor createThreadPoolExecutor(final String str, int i) {
        logger.info("create thread pool executor[{}:{}]", str, Integer.valueOf(i));
        return new ThreadPoolExecutor(i, i, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(16), new ThreadFactory() { // from class: xin.manong.weapon.aliyun.ots.OTSTunnelWorker.1
            private final AtomicInteger counter = new AtomicInteger();

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                String format = String.format("%s-%d", str, Integer.valueOf(this.counter.getAndIncrement()));
                OTSTunnelWorker.logger.info("create channel receiver thread[{}] success", format);
                return new Thread(runnable, format);
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
