package cn.wjchang.common.clickhouse.core;

import cn.wjchang.common.clickhouse.common.ChThreadFactory;
import cn.wjchang.common.clickhouse.common.ClickHouseConfig;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/wjchang/common/clickhouse/core/BatchProcessor.class */
/**
 * Buffers rows destined for one table and ships them in batches as an
 * {@code INSERT INTO <table> VALUES ...} HTTP POST through the supplied {@link AsyncHttpClient}.
 *
 * <p>A batch is dispatched when the buffer reaches {@code maxFlushRows} (unless that limit is
 * {@code -1}), when the periodic timer fires and the buffer is non-empty, when {@link #flush()}
 * is called, or when the processor is closed with rows still buffered.
 *
 * <p>All operations that touch the row buffer are serialized on this instance's monitor.
 * Responses are reported to the {@link Listener} on a dedicated callback executor.
 *
 * <p>Rows are joined into the statement verbatim; callers are responsible for supplying
 * correctly formatted and escaped value tuples.
 */
public class BatchProcessor implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(BatchProcessor.class);
    private final AsyncHttpClient client;
    private final String targetTable;
    private final int maxFlushRows;
    private final int flushIntervalSec;
    private final Listener listener;
    private final ClickHouseConfig config;
    /** Current accumulation buffer; replaced by a fresh one on every dispatch. Guarded by {@code this}. */
    private TableRowsBuffer tableRowsBuffer;
    private final ScheduledThreadPoolExecutor scheduler;
    private final ExecutorService callbackService;
    private final ScheduledFuture<?> scheduledFuture;
    private volatile boolean closed = false;
    private final AtomicLong executionIdGen = new AtomicLong();
    /**
     * Number of dispatched requests whose callback has not finished yet, plus one temporary
     * "hold" while {@link #close()} is running. The callback executor is shut down only when
     * this drops to zero after the processor was closed, so that late responses are not rejected.
     */
    private final AtomicLong inFlight = new AtomicLong();

    /** Receives the outcome of every dispatched batch. */
    public interface Listener {
        /**
         * Called when the HTTP request for a batch completed with a response.
         *
         * @param executionId id assigned to the batch when it was dispatched
         * @param rows the batch that was sent
         * @param response the HTTP response obtained from the client
         */
        void handleResponse(long executionId, TableRowsBuffer rows, Response response);

        /**
         * Called when a batch could not be sent, when obtaining its response failed, or when
         * {@link #handleResponse} itself threw.
         *
         * @param executionId id assigned to the batch when it was dispatched
         * @param rows the batch concerned
         * @param failure the exception that was caught
         */
        void handleExceptionWhenGettingResponse(long executionId, TableRowsBuffer rows, Throwable failure);
    }

    /** Periodic task that dispatches whatever is buffered, if the processor is still open. */
    class TimeFlusher implements Runnable {
        TimeFlusher() {
        }

        @Override
        public void run() {
            try {
                synchronized (BatchProcessor.this) {
                    if (!BatchProcessor.this.closed && BatchProcessor.this.tableRowsBuffer.bufferSize() != 0) {
                        BatchProcessor.this.execute();
                    }
                }
            } catch (Exception e) {
                // An exception escaping a scheduleWithFixedDelay task suppresses every later run,
                // which would silently disable timed flushing; log and carry on instead.
                logger.error("Timed flush failed", e);
            }
        }
    }

    /**
     * Creates the processor and starts the periodic flush timer.
     *
     * @param asyncHttpClient client used to build and execute the insert requests
     * @param clickHouseConfig supplies the flush interval (seconds), host URLs and credentials
     * @param threadFactoryParam passed through as the second argument of {@code ChThreadFactory}
     * @param targetTable table named in the generated statement; may be {@code null}, in which
     *     case the timer thread name carries no table prefix
     * @param maxFlushRows buffer size that triggers a dispatch from {@link #add(String)};
     *     {@code -1} disables size-based dispatch
     * @param listener notified of the outcome of each batch
     */
    public BatchProcessor(AsyncHttpClient asyncHttpClient, ClickHouseConfig clickHouseConfig, int threadFactoryParam, String targetTable, int maxFlushRows, Listener listener) {
        this.client = asyncHttpClient;
        this.targetTable = targetTable;
        this.maxFlushRows = maxFlushRows;
        this.config = clickHouseConfig;
        this.flushIntervalSec = clickHouseConfig.getFlushInterval();
        this.listener = listener;
        this.tableRowsBuffer = new TableRowsBuffer(targetTable);

        // Note: with an unbounded work queue the pool never grows beyond its core size,
        // so the maximum pool size below has no practical effect.
        int callbackThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 2);
        this.callbackService = new ThreadPoolExecutor(callbackThreads, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), new ChThreadFactory("writer-callback", threadFactoryParam));

        String timerThreadPrefix = targetTable != null ? "[" + targetTable + "]" : "";
        this.scheduler = new ScheduledThreadPoolExecutor(1, new ChThreadFactory(timerThreadPrefix + "timer-flusher", threadFactoryParam));
        this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new TimeFlusher(), this.flushIntervalSec, this.flushIntervalSec, TimeUnit.SECONDS);
    }

    /**
     * Appends one row to the buffer and dispatches the buffer if it reached the row limit.
     *
     * @param str the row text, inserted verbatim into the {@code VALUES} list
     * @throws IllegalStateException if the processor is closed
     */
    public synchronized void add(String str) {
        ensureOpen();
        this.tableRowsBuffer.add(str);
        executeIfNeeded();
    }

    /** Dispatches the buffer when the size limit is reached. Caller must hold the monitor. */
    private void executeIfNeeded() {
        ensureOpen();
        if (isOverTheLimit()) {
            execute();
        }
    }

    /** Returns whether size-based flushing is enabled and the buffer has reached the limit. */
    private boolean isOverTheLimit() {
        return this.maxFlushRows != -1 && this.tableRowsBuffer.bufferSize() >= this.maxFlushRows;
    }

    /**
     * Swaps in a fresh buffer and sends the previous one asynchronously.
     *
     * <p>If building or submitting the request fails, the listener is told via
     * {@link Listener#handleExceptionWhenGettingResponse}. If the request was submitted but the
     * response callback could not be registered, the failure is logged only, because the outcome
     * of the already-dispatched request is unknown.
     */
    public synchronized void execute() {
        final TableRowsBuffer batch = this.tableRowsBuffer;
        final long executionId = this.executionIdGen.incrementAndGet();
        this.tableRowsBuffer = new TableRowsBuffer(this.targetTable);

        boolean dispatched = false;
        try {
            // Built inside the try: the batch has already been detached from the buffer, so a
            // failure here must reach the listener or the rows would vanish unnoticed.
            Request request = buildRequest(batch);
            logger.info("Ready to load data to {}, size = {}", batch.getTable(), batch.getRows().size());
            ListenableFuture<Response> pending = this.client.executeRequest(request);
            dispatched = true;
            this.inFlight.incrementAndGet();
            pending.addListener(responseCallback(pending, executionId, batch), this.callbackService);
        } catch (Exception e) {
            if (dispatched) {
                // The callback will never run, so give back its in-flight slot here.
                releaseInFlight();
                logger.error("Failed to register response callback for execution {} on table {}",
                        executionId, batch.getTable(), e);
                return;
            }
            this.listener.handleExceptionWhenGettingResponse(executionId, batch, e);
        }
    }

    /**
     * Builds the task that hands the completed future's result (or failure) to the listener.
     *
     * @param listenableFuture the future returned by the HTTP client for this batch
     * @param j execution id of the batch
     * @param tableRowsBuffer the batch that was sent
     * @return a runnable intended to be run once the future has completed
     */
    private Runnable responseCallback(ListenableFuture<Response> listenableFuture, long j, TableRowsBuffer tableRowsBuffer) {
        return () -> {
            try {
                this.listener.handleResponse(j, tableRowsBuffer, listenableFuture.get());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for the executor thread.
                    Thread.currentThread().interrupt();
                }
                logger.error("Error while executing callback", e);
                this.listener.handleExceptionWhenGettingResponse(j, tableRowsBuffer, e);
            } finally {
                releaseInFlight();
            }
        };
    }

    /**
     * Gives back one in-flight slot and, if the processor is closed and nothing is outstanding
     * any more, shuts the callback executor down.
     */
    private void releaseInFlight() {
        if (this.inFlight.decrementAndGet() == 0 && this.closed) {
            this.callbackService.shutdown();
        }
    }

    /**
     * Creates the POST request carrying the insert statement for the given batch.
     *
     * @param tableRowsBuffer batch whose table and rows are formatted into the statement
     * @return the request, addressed to a host URL picked by the connect config and carrying
     *     a Basic authorization header
     */
    private Request buildRequest(TableRowsBuffer tableRowsBuffer) {
        String statement = String.format("INSERT INTO %s VALUES %s", tableRowsBuffer.getTable(), String.join(" , ", tableRowsBuffer.getRows()));
        BoundRequestBuilder builder = this.client.preparePost(this.config.getConnectConfig().getRandomHostUrl())
                .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8")
                .setBody(statement);
        builder.setHeader(HttpHeaderNames.AUTHORIZATION, "Basic " + this.config.getConnectConfig().getCredentials());
        return builder.build();
    }

    /**
     * Dispatches the buffered rows immediately, if there are any.
     *
     * @throws IllegalStateException if the processor is closed
     */
    public synchronized void flush() {
        ensureOpen();
        if (this.tableRowsBuffer.bufferSize() > 0) {
            execute();
        }
    }

    /**
     * Stops the timer, dispatches any remaining rows and releases the executors. Calling it
     * again is a no-op.
     *
     * <p>This method does not wait for outstanding responses; the callback executor is shut
     * down once the callbacks of all dispatched batches have run.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        // Hold one slot so that a callback finishing while we are still closing cannot shut the
        // callback executor down before the final batch has registered its own callback.
        this.inFlight.incrementAndGet();
        this.closed = true;
        try {
            cancel(this.scheduledFuture);
            this.scheduler.shutdown();
            if (this.tableRowsBuffer.bufferSize() > 0) {
                execute();
            }
        } finally {
            releaseInFlight();
        }
    }

    /**
     * Cancels the given future without interrupting a run that is already in progress.
     *
     * @param future the future to cancel; may be {@code null}
     * @return the result of {@link Future#cancel(boolean)}, or {@code false} for {@code null}
     */
    public static boolean cancel(Future<?> future) {
        return future != null && future.cancel(false);
    }

    /** Returns {@code true} until {@link #close()} has been called. */
    boolean isOpen() {
        return !this.closed;
    }

    /**
     * Guards operations that require an open processor.
     *
     * @throws IllegalStateException if the processor is closed
     */
    protected void ensureOpen() {
        if (this.closed) {
            throw new IllegalStateException("batch process already closed");
        }
    }
}
