package com.alibaba.ververica.connectors.common.sink;

import java.io.IOException;
import java.util.List;
import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/sink/TupleOutputFormatSinkFunction.class */
/**
 * Sink function that forwards every {@code Tuple2<Boolean, RECORD>} element to a wrapped
 * {@link OutputFormat}.
 *
 * <p>The function keeps no checkpointed state of its own ({@link #snapshotState} always returns
 * {@code null} and {@link #restoreState} is a no-op). It uses the checkpoint hook only to call
 * {@code sync()} on the wrapped format when that format implements {@code Syncable}.
 *
 * <p>NOTE(review): the meaning of the {@code Boolean} field of the tuple is defined by the wrapped
 * output format; presumably it is an insert/retract flag — confirm against the format in use.
 *
 * @param <RECORD> type of the payload carried in the second field of each tuple
 */
public class TupleOutputFormatSinkFunction<RECORD> extends RichSinkFunction<Tuple2<Boolean, RECORD>> implements ListCheckpointed<byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(TupleOutputFormatSinkFunction.class);

    /** Pause, in milliseconds, between two consecutive {@code sync()} attempts. */
    private static final long RETRY_INTERVAL = 100;

    /** The output format every record is delegated to. */
    private final OutputFormat<Tuple2<Boolean, RECORD>> outputFormat;

    /**
     * Maximum time, in milliseconds, spent retrying a failing {@code sync()} during a snapshot.
     * Overridden in {@link #open} when the output format implements {@code HasRetryTimeout}.
     *
     * <p>NOTE(review): the default is borrowed from an unrelated commons-pool2 constant
     * (documented there as 30 minutes); this looks like a constant substitution rather than an
     * intentional dependency — consider a dedicated constant once the value is confirmed.
     */
    private long retryTimeout = BaseObjectPoolConfig.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS;

    /**
     * Creates a sink function that writes to the given output format.
     *
     * @param outputFormat the format that receives every record passed to {@link #invoke}
     */
    public TupleOutputFormatSinkFunction(OutputFormat<Tuple2<Boolean, RECORD>> outputFormat) {
        this.outputFormat = outputFormat;
    }

    /**
     * Configures and opens the wrapped output format for this subtask.
     *
     * <p>Order matters: the runtime context is handed to rich formats first, then the format is
     * configured, then opened with this subtask's index and the total parallelism.
     *
     * @param configuration configuration passed on to {@code OutputFormat.configure}
     * @throws IOException if the output format fails to open
     */
    public void open(Configuration configuration) throws IOException {
        // Only rich formats can accept a runtime context; the cast is safe after the type check.
        if (this.outputFormat instanceof RichOutputFormat) {
            ((RichOutputFormat<?>) this.outputFormat).setRuntimeContext(getRuntimeContext());
        }
        this.outputFormat.configure(configuration);

        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        this.outputFormat.open(subtaskIndex, parallelism);

        // Let the format dictate how long a failing sync may be retried.
        if (this.outputFormat instanceof HasRetryTimeout) {
            this.retryTimeout = ((HasRetryTimeout) this.outputFormat).getRetryTimeout();
        }
        LOG.info("Initialized OutputFormatSinkFunction of {}/{} task.", subtaskIndex, parallelism);
    }

    /**
     * Closes the wrapped output format.
     *
     * @throws IOException if the output format fails to close
     */
    public void close() throws IOException {
        LOG.info("Closing OutputFormatSinkFunction.");
        this.outputFormat.close();
    }

    /**
     * Writes a single element to the wrapped output format.
     *
     * @param tuple2 the element to write
     * @throws Exception whatever {@code OutputFormat.writeRecord} throws
     */
    public void invoke(Tuple2<Boolean, RECORD> tuple2) throws Exception {
        this.outputFormat.writeRecord(tuple2);
    }

    /**
     * Returns the wrapped output format.
     *
     * @return the output format given to the constructor
     */
    public OutputFormat<Tuple2<Boolean, RECORD>> getOutputFormat() {
        return this.outputFormat;
    }

    /**
     * Syncs the wrapped output format as part of a checkpoint, if it implements {@code Syncable}.
     *
     * <p>A failing {@code sync()} is retried every {@link #RETRY_INTERVAL} milliseconds until it
     * succeeds or more than {@code retryTimeout} milliseconds have elapsed since the first attempt.
     * If the thread is interrupted while waiting between attempts, the interrupt flag is restored
     * and the snapshot is aborted instead of continuing to retry.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @param timestamp timestamp of the checkpoint (unused)
     * @return always {@code null}; this function has no state of its own to snapshot
     * @throws IOException if syncing keeps failing until the retry timeout elapses, or if the
     *     thread is interrupted between attempts; the last sync failure is attached as the cause
     */
    public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception {
        if (!(this.outputFormat instanceof Syncable)) {
            return null;
        }
        final Syncable syncable = (Syncable) this.outputFormat;
        final long startMillis = System.currentTimeMillis();
        IOException lastFailure = null;
        do {
            try {
                syncable.sync();
                return null;
            } catch (IOException e) {
                lastFailure = e;
                LOG.error("Sync output format failed", e);
            }
            try {
                Thread.sleep(RETRY_INTERVAL);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller and stop retrying: an interrupt
                // signals that this thread has been asked to stop.
                Thread.currentThread().interrupt();
                IOException aborted = new IOException("Interrupted while retrying output format sync.", lastFailure);
                aborted.addSuppressed(interrupted);
                throw aborted;
            }
        } while (System.currentTimeMillis() - startMillis <= this.retryTimeout);
        // Preserve the underlying failure so the timeout can be diagnosed.
        throw new IOException("Output retry timeout.", lastFailure);
    }

    /**
     * No-op: this function never snapshots any state, so there is nothing to restore.
     *
     * @param list the restored state (ignored)
     */
    public void restoreState(List<byte[]> list) throws Exception {
    }

    /**
     * Returns this class's simple name followed by {@code ":"} and the wrapped format's
     * {@code toString()}.
     */
    public String toString() {
        return getClass().getSimpleName() + ":" + this.outputFormat.toString();
    }
}
