package org.apache.paimon.flink.sink;

import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/sink/SinkSavepointITCase.class */
public class SinkSavepointITCase extends AbstractTestBase {
    // Per-test temporary directory, assigned in before() from getTempDirPath(). Used as the
    // catalog warehouse for result checks and as the parent of the savepoint/checkpoint dirs.
    private String path;
    // Random UUID string assigned in before(); passed to FailingFileIO.getFailingPath(...)
    // and FailingFileIO.reset(...) as the name for this test run.
    private String failingName;

    /**
     * Switch lookup table for {@link RowKind}, surfaced by the decompiler.
     *
     * <p>Decompiler notes: this class was renamed from the invalid class name
     * {@code org.apache.paimon.flink.sink.SinkSavepointITCase$1}, was loaded from
     * {@code org/apache/paimon/flink/sink/SinkSavepointITCase$1.class}, and had its access
     * modifier changed from package-private.
     *
     * <p>The static initializer fills an {@code int[]} indexed by {@code RowKind.ordinal()} with
     * the case numbers 1 (INSERT), 2 (UPDATE_AFTER), 3 (DELETE) and 4 (UPDATE_BEFORE). Each
     * assignment is wrapped in its own try/catch so that a {@link NoSuchFieldError} for one
     * constant leaves that slot at 0 without affecting the others.
     *
     * <p>NOTE(review): presumably this backs a {@code switch} over {@code RowKind} in
     * {@code checkRecoverFromSavepointStreamingResult()}, whose body was not decompiled — confirm
     * against the original source.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Maps RowKind.ordinal() -> case number; entries never assigned below stay 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$types$RowKind = new int[RowKind.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot for this constant simply keeps its default 0.
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored, see above.
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored, see above.
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_BEFORE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored, see above.
            }
        }
    }

    /**
     * Per-test setup: picks the temporary directory used by this test and generates a fresh
     * random name that the test later hands to {@code FailingFileIO}.
     *
     * @throws Exception declared for the JUnit lifecycle; nothing in this method catches
     */
    @BeforeEach
    public void before() throws Exception {
        path = getTempDirPath();
        failingName = UUID.randomUUID().toString();
    }

    /**
     * Repeatedly runs the insert job, stops it with a savepoint after a random delay, and
     * restores the next run from that savepoint until one run reaches {@code FINISHED}; then
     * verifies the table contents in batch and streaming mode.
     *
     * <p>Loop structure:
     * <ol>
     *   <li>Start the job via {@link #runRecoverFromSavepointJob(String, String)}, restoring
     *       from the savepoint of the previous round when there is one.</li>
     *   <li>Sleep up to 5 seconds, then either detect that the job finished (run the checks and
     *       return) or try to stop it with a savepoint under {@code path + "/savepoint"}.</li>
     *   <li>After a savepoint path was obtained, wait until the job is in a globally terminal
     *       state before starting the next round.</li>
     * </ol>
     *
     * @throws Exception if job submission, status polling or the result checks fail, or if the
     *     test thread is interrupted
     */
    @Timeout(180)
    @Test
    public void testRecoverFromSavepoint() throws Exception {
        String failingPath = FailingFileIO.getFailingPath(this.failingName, this.path);

        // Keep the generator in a local: it is needed both for the configuration choice
        // below and for the random wait inside the loop.
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // Randomly pick one of two FailingFileIO configurations for this run.
        if (random.nextBoolean()) {
            FailingFileIO.reset(this.failingName, 100, 500);
        } else {
            FailingFileIO.reset(this.failingName, 0, 1);
        }

        // null on the first round means "start fresh" (see runRecoverFromSavepointJob).
        String savepointPath = null;
        while (true) {
            JobClient jobClient = runRecoverFromSavepointJob(failingPath, savepointPath);
            while (true) {
                // Wait a random amount of time before checking on / stopping the job.
                Thread.sleep(random.nextInt(5000));
                if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
                    checkRecoverFromSavepointBatchResult();
                    checkRecoverFromSavepointStreamingResult();
                    return;
                }
                try {
                    savepointPath =
                            jobClient
                                    .stopWithSavepoint(
                                            false,
                                            this.path + "/savepoint",
                                            SavepointFormatType.DEFAULT)
                                    .get();
                    break;
                } catch (InterruptedException e) {
                    // Never retry on interruption (e.g. the JUnit timeout firing); propagate it
                    // so the test stops promptly instead of looping again.
                    throw e;
                } catch (Exception e) {
                    Optional<StopWithSavepointStoppingException> stopping =
                            ExceptionUtils.findThrowable(
                                    e, StopWithSavepointStoppingException.class);
                    if (stopping.isPresent()) {
                        // The exception itself carries a savepoint path; restore from it in
                        // the next round.
                        savepointPath = stopping.get().getSavepointPath();
                        break;
                    }
                    // Any other failure: fall through and retry after another random wait.
                }
            }
            // Do not start the next round before the stopped job has fully terminated.
            while (!jobClient.getJobStatus().get().isGloballyTerminalState()) {
                Thread.sleep(1000L);
            }
        }
    }

    /**
     * Builds a streaming table environment, creates the catalog and tables, and submits the
     * {@code INSERT INTO T} job, optionally restoring it from a savepoint.
     *
     * <p>The source table {@code S} is a datagen sequence from 0 to 99999; the job writes
     * {@code (a % 15000, a)} into the primary-key table {@code T} with a sink parallelism drawn
     * randomly from 2 to 4. Catalog creation, table creation and the insert submission are each
     * run through {@code FailingFileIO.retryArtificialException}.
     *
     * @param str warehouse path used for the {@code my_catalog} catalog
     * @param str2 savepoint path to restore from, or {@code null} to start without a savepoint
     * @return the client of the submitted job, returned once its status is no longer
     *     {@code INITIALIZING}
     * @throws Exception if any SQL statement, the submission or the status polling fails
     */
    private JobClient runRecoverFromSavepointJob(String str, String str2) throws Exception {
        final String warehouse = str;
        final String savepointPath = str2;

        Configuration envConf = new Configuration();
        if (savepointPath != null) {
            SavepointRestoreSettings restoreSettings =
                    SavepointRestoreSettings.forPath(savepointPath, false);
            SavepointRestoreSettings.toConfiguration(restoreSettings, envConf);
        }

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(envConf);
        EnvironmentSettings streamingSettings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, streamingSettings);

        // Checkpointing and parallelism settings for the job.
        Configuration tableConf = tEnv.getConfig().getConfiguration();
        tableConf.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500L));
        tableConf.set(StateBackendOptions.STATE_BACKEND, "filesystem");
        tableConf.set(
                CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + this.path + "/checkpoint");
        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);

        final String createCatalogSql =
                String.join(
                        "\n",
                        "CREATE CATALOG my_catalog WITH (",
                        "  'type' = 'paimon',",
                        "  'warehouse' = '" + warehouse + "'",
                        ")");
        FailingFileIO.retryArtificialException(
                () -> {
                    return tEnv.executeSql(createCatalogSql);
                });
        tEnv.executeSql("USE CATALOG my_catalog");

        String createSourceSql =
                String.join(
                        "\n",
                        "CREATE TEMPORARY TABLE S (",
                        "  a INT",
                        ") WITH (",
                        "  'connector' = 'datagen',",
                        "  'rows-per-second' = '10000',",
                        "  'fields.a.kind' = 'sequence',",
                        "  'fields.a.start' = '0',",
                        "  'fields.a.end' = '99999'",
                        ")");
        tEnv.executeSql(createSourceSql);

        final String createSinkSql =
                String.join(
                        "\n",
                        "CREATE TABLE IF NOT EXISTS T (",
                        "  k INT,",
                        "  v INT,",
                        "  PRIMARY KEY (k) NOT ENFORCED",
                        ") WITH (",
                        "  'bucket' = '4',",
                        "  'file.format' = 'avro',",
                        "  'changelog-producer' = 'full-compaction',",
                        "  'full-compaction.delta-commits' = '3'",
                        ")");
        FailingFileIO.retryArtificialException(
                () -> {
                    return tEnv.executeSql(createSinkSql);
                });

        // Sink parallelism is 2, 3 or 4.
        int sinkParallelism = ThreadLocalRandom.current().nextInt(3) + 2;
        final String insertSql =
                String.format(
                        "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '%d') */ SELECT (a %% 15000) AS k, a AS v FROM S",
                        sinkParallelism);
        // Cast retained: the generic signature of retryArtificialException is not visible here.
        TableResult insertResult =
                (TableResult)
                        FailingFileIO.retryArtificialException(
                                () -> {
                                    return tEnv.executeSql(insertSql);
                                });
        JobClient client = insertResult.getJobClient().get();

        // Poll once per second until the job has left the INITIALIZING state.
        while (JobStatus.INITIALIZING == client.getJobStatus().get()) {
            Thread.sleep(1000L);
        }
        return client;
    }

    /**
     * Reads table {@code T} in batch mode from the warehouse at {@code path} and asserts that it
     * holds exactly the expected key/value pairs.
     *
     * <p>The expectation is built by putting {@code (i % 15000, i)} for {@code i} in
     * {@code [0, 100000)} into a map, so later values overwrite earlier ones for the same key.
     * These numbers mirror the insert job in this class (datagen sequence 0..99999 and
     * {@code a % 15000} as key).
     *
     * @throws Exception if the query fails, the result iterator cannot be closed, or an
     *     assertion does not hold
     */
    private void checkRecoverFromSavepointBatchResult() throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql(
                String.join(
                        "\n",
                        "CREATE CATALOG my_catalog WITH (",
                        "  'type' = 'paimon',",
                        "  'warehouse' = '" + this.path + "'",
                        ")"));
        tEnv.executeSql("USE CATALOG my_catalog");

        HashMap<Integer, Integer> expected = new HashMap<>();
        for (int i = 0; i < 100000; i++) {
            expected.put(i % 15000, i);
        }

        HashMap<Integer, Integer> actual = new HashMap<>();
        // try-with-resources closes the iterator on every path; if both the loop body and
        // close() fail, the close failure is attached as suppressed rather than hiding the
        // original assertion error.
        try (CloseableIterator<Row> rows = tEnv.executeSql("SELECT * FROM T").collect()) {
            while (rows.hasNext()) {
                Row row = rows.next();
                Assertions.assertThat(row.getArity()).isEqualTo(2);
                actual.put((Integer) row.getField(0), (Integer) row.getField(1));
            }
        }

        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:11:0x00fa. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:15:0x0187 A[Catch: Throwable -> 0x01c6, all -> 0x01cf, TryCatch #2 {Throwable -> 0x01c6, blocks: (B:8:0x00af, B:10:0x00b9, B:11:0x00fa, B:12:0x0118, B:13:0x0178, B:15:0x0187, B:16:0x018a, B:34:0x013b, B:36:0x0159, B:37:0x0177), top: B:7:0x00af, outer: #3 }] */
    /* JADX WARN: Removed duplicated region for block: B:23:0x01a1  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the streaming-mode result check; the real body was NOT recovered by the
     * decompiler.
     *
     * <p>As written, this method unconditionally throws {@link UnsupportedOperationException},
     * so {@code testRecoverFromSavepoint()} cannot pass once the job finishes. The decompiler
     * warnings above indicate the original bytecode (518 instructions) contained a
     * {@code switch} and try/catch regions that could not be reconstructed.
     *
     * <p>TODO(review): restore the implementation from the original source before relying on
     * this test.
     *
     * @throws java.lang.Exception declared by the original signature
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    private void checkRecoverFromSavepointStreamingResult() throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 518
            To view this dump add '--comments-level debug' option
        */
        // Decompiler stub: always fails with a message naming the method that was not decompiled.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.paimon.flink.sink.SinkSavepointITCase.checkRecoverFromSavepointStreamingResult():void");
    }
}
