package cn.wjchang.common.clickhouse;

import cn.wjchang.common.clickhouse.common.ClickHouseConfig;
import cn.wjchang.common.clickhouse.common.ClickhouseConstants;
import cn.wjchang.common.clickhouse.core.BatchProcessor;
import cn.wjchang.common.clickhouse.core.TableRowsBuffer;
import cn.wjchang.common.clickhouse.core.clickhouseRowCollector;
import cn.wjchang.common.clickhouse.exception.ClickhouseException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/wjchang/common/clickhouse/ClickhouseSink.class */
public class ClickhouseSink extends RichSinkFunction<String> implements CheckpointedFunction {
    private static final long serialVersionUID = 8882341085011937977L;
    private static final Logger logger = LoggerFactory.getLogger(ClickhouseSink.class);
    private final Properties props;
    private final ClickHouseConfig config;
    private transient AsyncHttpClient client;
    private transient BatchProcessor batchProcessor;
    private transient clickhouseRowCollector clickhouseRowCollector;
    private AtomicLong numPendingRows = new AtomicLong(0);
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
    private boolean flushOnCheckpoint = true;

    /* loaded from: input_file:cn/wjchang/common/clickhouse/ClickhouseSink$BatchProcessorListener.class */
    private class BatchProcessorListener implements BatchProcessor.Listener {
        private static final int HTTP_OK = 200;

        private BatchProcessorListener() {
        }

        @Override // cn.wjchang.common.clickhouse.core.BatchProcessor.Listener
        public void handleResponse(long j, TableRowsBuffer tableRowsBuffer, Response response) {
            if (response.getStatusCode() != HTTP_OK) {
                String responseBody = response.getResponseBody();
                ClickhouseSink.logger.error("Failed to send data to ClickHouse,  ClickHouse response = {}. ", responseBody);
                ClickhouseSink.this.failureThrowable.compareAndSet(null, new ClickhouseException(responseBody));
            } else {
                ClickhouseSink.logger.info("Successful send data to ClickHouse, batch size = {}, target table = {}", Integer.valueOf(tableRowsBuffer.bufferSize()), tableRowsBuffer.getTable());
            }
            if (ClickhouseSink.this.flushOnCheckpoint) {
                ClickhouseSink.this.numPendingRows.getAndAdd(-tableRowsBuffer.bufferSize());
            }
        }

        @Override // cn.wjchang.common.clickhouse.core.BatchProcessor.Listener
        public void handleExceptionWhenGettingResponse(long j, TableRowsBuffer tableRowsBuffer, Throwable th) {
            ClickhouseSink.logger.error("Failed to send data to ClickHouse: {}", th.getMessage(), th.getCause());
            ClickhouseSink.this.failureThrowable.compareAndSet(null, new ClickhouseException(th.getMessage(), th.getCause()));
            if (ClickhouseSink.this.flushOnCheckpoint) {
                ClickhouseSink.this.numPendingRows.getAndAdd(-tableRowsBuffer.bufferSize());
            }
        }
    }

    public ClickhouseSink(Properties properties) {
        this.props = properties;
        this.config = new ClickHouseConfig(properties);
    }

    public void open(Configuration configuration) throws Exception {
        this.client = Dsl.asyncHttpClient();
        this.batchProcessor = new BatchProcessor(this.client, this.config, getRuntimeContext().getIndexOfThisSubtask(), this.props.getProperty(ClickhouseConstants.TARGET_TABLE_NAME, ""), Integer.parseInt(this.props.getProperty(ClickhouseConstants.BATCH_SIZE, "10000")), new BatchProcessorListener());
        this.clickhouseRowCollector = new clickhouseRowCollector(this.batchProcessor, this.flushOnCheckpoint, this.numPendingRows);
    }

    public void invoke(String str, SinkFunction.Context context) throws Exception {
        checkErrorAndRethrow();
        this.clickhouseRowCollector.collect(str);
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkErrorAndRethrow();
        if (this.flushOnCheckpoint) {
            while (this.numPendingRows.get() != 0) {
                this.batchProcessor.flush();
                checkErrorAndRethrow();
            }
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public void close() throws Exception {
        if (this.batchProcessor != null) {
            this.batchProcessor.close();
            this.batchProcessor = null;
        }
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
        checkErrorAndRethrow();
    }

    private void checkErrorAndRethrow() {
        Throwable th = this.failureThrowable.get();
        if (th != null) {
            throw new RuntimeException("An error occurred in ClickhouseSink.", th);
        }
    }
}
