package group.rxcloud.capa.spi.aws.log.handle;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import group.rxcloud.capa.infrastructure.hook.Mixer;
import group.rxcloud.capa.spi.aws.log.configuration.CapaComponentLogConfiguration;
import group.rxcloud.capa.spi.aws.log.manager.CustomLogManager;
import group.rxcloud.capa.spi.aws.log.service.CloudWatchLogsService;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.utils.CollectionUtils;

/* loaded from: input_file:group/rxcloud/capa/spi/aws/log/handle/MessageSender.class */
public class MessageSender extends Thread {
    private static final int MAX_COUNT_PER_CHUNK = 100;
    private static final long WAIT_INTERVAL = 20;
    private static final int MAX_SIZE_PER_CHUNK = 1048576;
    private static final String PUT_LOG_EVENTS_RESOURCE_NAME = "CloudWatchLogs.putLogEvents";
    private static final String MESSAGE_SENDER_ERROR_NAMESPACE = "LogMessageSenderError";
    private static final String MESSAGE_SENDER_ERROR_METRIC_NAME = "LogsSenderError";
    private static final String LOG_STREAM_COUNT_NAME = "logStreamCount";
    private static final int DEFAULT_MAX_RULE_COUNT = 10;
    private static final String CLOUD_WATCH_AGENT_SWITCH_NAME = "cloudWatchAgentSwitch";
    private static Optional<LongCounter> LONG_COUNTER = Optional.empty();
    private final ChunkQueue chunkQueue;
    private volatile CountDownLatch shutdownLatch;
    private volatile boolean running = true;
    private final LinkedList<CompressedChunk> readCompressedChunk = new LinkedList<>();

    public MessageSender(ChunkQueue chunkQueue) {
        this.chunkQueue = chunkQueue;
    }

    private static void initFlowRules() {
        ArrayList arrayList = new ArrayList();
        AtomicInteger atomicInteger = new AtomicInteger(DEFAULT_MAX_RULE_COUNT);
        CapaComponentLogConfiguration.getInstanceOpt().ifPresent(capaComponentLogConfiguration -> {
            if (capaComponentLogConfiguration.containsKey(LOG_STREAM_COUNT_NAME)) {
                atomicInteger.set(Integer.parseInt(LOG_STREAM_COUNT_NAME));
            }
        });
        for (int i = 0; i < atomicInteger.get(); i++) {
            FlowRule flowRule = new FlowRule();
            flowRule.setResource("CloudWatchLogs.putLogEvents_" + i);
            flowRule.setGrade(1);
            flowRule.setCount(5.0d);
            arrayList.add(flowRule);
        }
        FlowRuleManager.loadRules(arrayList);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                buildCompressedChunk();
                if (this.readCompressedChunk == null || this.readCompressedChunk.isEmpty()) {
                    try {
                        Thread.sleep(WAIT_INTERVAL);
                    } catch (InterruptedException e) {
                    }
                } else {
                    putLogToCloudWatch(getMessage());
                }
            } catch (Throwable th) {
                CustomLogManager.error("MessageSender build chunk error.", th);
                LONG_COUNTER.ifPresent(longCounter -> {
                    longCounter.bind(Attributes.of(AttributeKey.stringKey("BuildCompressedChunkError"), th.getClass().getName())).add(1L);
                });
            }
        }
        long currentTimeMillis = System.currentTimeMillis() + 60000;
        while (System.currentTimeMillis() <= currentTimeMillis) {
            buildCompressedChunk();
            if (this.readCompressedChunk == null || this.readCompressedChunk.isEmpty()) {
                break;
            } else {
                putLogToCloudWatch(getMessage());
            }
        }
        this.shutdownLatch.countDown();
    }

    private void putLogToCloudWatch(List<String> list) {
        Optional<CapaComponentLogConfiguration> instanceOpt = CapaComponentLogConfiguration.getInstanceOpt();
        if (instanceOpt.isPresent() && instanceOpt.get().containsKey(CLOUD_WATCH_AGENT_SWITCH_NAME) && !Boolean.TRUE.toString().equalsIgnoreCase(instanceOpt.get().get(CLOUD_WATCH_AGENT_SWITCH_NAME))) {
            putLogsByApi(list);
        } else {
            putLogsByAgent(list);
        }
    }

    private void putLogsByApi(List<String> list) {
        try {
            List<String> logStreamNames = CloudWatchLogsService.getLogStreamNames();
            int nextInt = new Random().nextInt(logStreamNames.size());
            try {
                Entry entry = SphU.entry("CloudWatchLogs.putLogEvents_" + nextInt);
                Throwable th = null;
                try {
                    CloudWatchLogsService.putLogEvents(list, logStreamNames.get(nextInt));
                    if (entry != null) {
                        if (0 != 0) {
                            try {
                                entry.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            entry.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (entry != null) {
                        if (0 != 0) {
                            try {
                                entry.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            entry.close();
                        }
                    }
                    throw th3;
                }
            } catch (BlockException e) {
                try {
                    Thread.sleep(WAIT_INTERVAL);
                    putLogsByApi(list);
                } catch (Exception e2) {
                }
            }
        } catch (Throwable th5) {
            CustomLogManager.error("MessageSender send message error.", th5);
            LONG_COUNTER.ifPresent(longCounter -> {
                longCounter.bind(Attributes.of(AttributeKey.stringKey("SenderPutLogEventsError"), th5.getClass().getName())).add(1L);
            });
        }
    }

    private void putLogsByAgent(List<String> list) {
        if (CollectionUtils.isNullOrEmpty(list)) {
            return;
        }
        list.forEach(str -> {
            System.out.println(str);
        });
    }

    private List<String> getMessage() {
        ArrayList arrayList = new ArrayList();
        while (true) {
            CompressedChunk pollChunk = pollChunk();
            if (pollChunk == null) {
                return arrayList;
            }
            arrayList.add(pollChunk.getMessage());
        }
    }

    private boolean buildCompressedChunk() {
        if (this.chunkQueue.isEmpty()) {
            return false;
        }
        this.chunkQueue.drainTo(this.readCompressedChunk, MAX_COUNT_PER_CHUNK, MAX_SIZE_PER_CHUNK);
        return true;
    }

    private CompressedChunk pollChunk() {
        return this.readCompressedChunk.poll();
    }

    public void shutdown() {
        this.shutdownLatch = new CountDownLatch(1);
        try {
            this.running = false;
            interrupt();
            this.shutdownLatch.await();
        } catch (InterruptedException e) {
        }
    }

    static {
        initFlowRules();
        Mixer.telemetryHooksNullable().ifPresent(telemetryHooks -> {
            LONG_COUNTER = Optional.ofNullable(((Meter) telemetryHooks.buildMeter(MESSAGE_SENDER_ERROR_NAMESPACE).block()).counterBuilder(MESSAGE_SENDER_ERROR_METRIC_NAME).build());
        });
    }
}
