package com.starrocks.connector.spark.sql.write;

import com.starrocks.connector.spark.sql.conf.WriteStarRocksConfig;
import com.starrocks.connector.spark.sql.schema.CsvRowStringConverter;
import com.starrocks.connector.spark.sql.schema.JSONRowStringConverter;
import com.starrocks.connector.spark.sql.schema.RowStringConverter;
import com.starrocks.connector.spark.util.EnvUtils;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
import java.io.IOException;
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/spark/sql/write/StarRocksDataWriter.class */
/**
 * Spark {@link DataWriter} that turns each {@link InternalRow} into a CSV or JSON string and
 * hands it to a {@link StreamLoadManager} targeting the database and table named in the
 * supplied {@link WriteStarRocksConfig}.
 *
 * <p>The partition, task and epoch identifiers passed to the constructor are only used for
 * logging and for building the commit message; they do not influence how rows are written.
 */
public class StarRocksDataWriter implements DataWriter<InternalRow>, Serializable {
    private static final Logger log = LoggerFactory.getLogger(StarRocksDataWriter.class);

    private final WriteStarRocksConfig config;
    private final int partitionId;
    private final long taskId;
    private final long epochId;
    private final RowStringConverter converter;
    private final StreamLoadManager manager;

    /**
     * Creates a writer and selects the row converter from the configured format.
     *
     * @param writeStarRocksConfig write configuration; supplies the format, separator, column
     *                             names, time zone, target database/table and stream load properties
     * @param structType           schema handed to the row converter
     * @param partitionId          partition identifier, used in logs and the commit message
     * @param taskId               task identifier, used in logs and the commit message
     * @param epochId              epoch identifier, used in logs and the commit message
     * @throws RuntimeException if the configured format is neither "csv" nor "json"
     *                          (compared case-insensitively)
     */
    public StarRocksDataWriter(WriteStarRocksConfig writeStarRocksConfig, StructType structType, int partitionId, long taskId, long epochId) {
        this.config = writeStarRocksConfig;
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.epochId = epochId;

        String format = writeStarRocksConfig.getFormat();
        if ("csv".equalsIgnoreCase(format)) {
            this.converter = new CsvRowStringConverter(structType, writeStarRocksConfig.getColumnSeparator(), writeStarRocksConfig.getTimeZone());
        } else if ("json".equalsIgnoreCase(format)) {
            this.converter = new JSONRowStringConverter(structType, writeStarRocksConfig.getStreamLoadColumnNames(), writeStarRocksConfig.getTimeZone());
        } else {
            throw new RuntimeException("Unsupported format " + format);
        }

        // NOTE(review): the meaning of the boolean flag is not visible in this file
        // (presumably an auto-commit switch) -- confirm against StreamLoadManagerV2.
        this.manager = new StreamLoadManagerV2(writeStarRocksConfig.toStreamLoadProperties(), true);
    }

    /** Initializes the underlying stream load manager and logs the writer identity. */
    public void open() {
        this.manager.init();
        log.info("Open data writer for partition: {}, task: {}, epoch: {}, {}", partitionId, taskId, epochId, EnvUtils.getGitInformation());
    }

    /**
     * Converts one row to its string form and passes it to the stream load manager.
     *
     * @param internalRow the row to write
     * @throws IOException declared by the {@link DataWriter} contract
     */
    public void write(InternalRow internalRow) throws IOException {
        String converted = this.converter.fromRow(internalRow);
        // NOTE(review): the first argument is passed as null; its role is defined by
        // StreamLoadManager.write and is not visible here.
        this.manager.write(null, this.config.getDatabase(), this.config.getTable(), converted);

        // This method runs once per row: skip building the log arguments (array allocation
        // plus boxing of three ids, twice) unless DEBUG output is actually enabled.
        if (log.isDebugEnabled()) {
            log.debug("partitionId: {}, taskId: {}, epochId: {}, receive raw row: {}", partitionId, taskId, epochId, internalRow);
            log.debug("partitionId: {}, taskId: {}, epochId: {}, receive converted row: {}", partitionId, taskId, epochId, converted);
        }
    }

    /**
     * Flushes the stream load manager and returns a commit message carrying this writer's
     * partition, task and epoch identifiers.
     *
     * @return the commit message for this writer
     * @throws IOException if flushing (or building the message) throws; the original
     *                     exception is kept as the cause
     */
    public WriterCommitMessage commit() throws IOException {
        log.info("partitionId: {}, taskId: {}, epochId: {} commit", partitionId, taskId, epochId);
        try {
            this.manager.flush();
            // NOTE(review): the fourth constructor argument is passed as null; its meaning
            // is defined by StarRocksWriterCommitMessage and is not visible here.
            return new StarRocksWriterCommitMessage(this.partitionId, this.taskId, this.epochId, null);
        } catch (Exception e) {
            String message = failureMessage("commit");
            log.error("{}", message, e);
            throw new IOException(message, e);
        }
    }

    /**
     * Asks the stream load manager to abort its current snapshot.
     *
     * @throws IOException if the manager reports the abort as unsuccessful or throws; in both
     *                     cases the underlying exception is kept as the cause
     */
    public void abort() throws IOException {
        log.info("partitionId: {}, taskId: {}, epochId: {} abort", partitionId, taskId, epochId);
        try {
            boolean aborted = this.manager.abort(this.manager.snapshot());
            if (!aborted) {
                // Deliberately thrown inside the try block: the handler below logs it and
                // wraps it with the partition/task/epoch context.
                throw new IOException("abort not successful");
            }
        } catch (Exception e) {
            String message = failureMessage("abort");
            log.error("{}", message, e);
            throw new IOException(message, e);
        }
    }

    /**
     * Closes the underlying stream load manager.
     *
     * @throws IOException declared by the {@link DataWriter} contract
     */
    public void close() throws IOException {
        log.info("partitionId: {}, taskId: {}, epochId: {} close", partitionId, taskId, epochId);
        this.manager.close();
    }

    /** Builds the "Failed to &lt;action&gt;, ..." message shared by commit and abort. */
    private String failureMessage(String action) {
        return String.format("Failed to %s, partitionId: %s, taskId: %s, epochId: %s", action, partitionId, taskId, epochId);
    }
}
