package io.debezium.connector.common;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/common/BaseSourceTaskTest.class */
public class BaseSourceTaskTest {
    private final MyBaseSourceTask baseSourceTask = new MyBaseSourceTask();

    /* loaded from: input_file:io/debezium/connector/common/BaseSourceTaskTest$MyBaseSourceTask.class */
    public static class MyBaseSourceTask extends BaseSourceTask<Partition, OffsetContext> {
        final List<SourceRecord> records = new ArrayList();
        final AtomicInteger startCount = new AtomicInteger();
        final AtomicInteger stopCount = new AtomicInteger();
        final ChangeEventSourceCoordinator<Partition, OffsetContext> coordinator = (ChangeEventSourceCoordinator) Mockito.mock(ChangeEventSourceCoordinator.class);

        protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configuration configuration) {
            this.startCount.incrementAndGet();
            return this.coordinator;
        }

        protected List<SourceRecord> doPoll() {
            return this.records;
        }

        protected void doStop() {
            this.stopCount.incrementAndGet();
        }

        protected Iterable<Field> getAllConfigurationFields() {
            return List.of(Field.create("f1"));
        }

        public String version() {
            return "1.0";
        }
    }

    @Before
    public void setup() {
        this.baseSourceTask.initialize((SourceTaskContext) Mockito.mock(SourceTaskContext.class));
    }

    @Test
    public void verifyTaskStartsAndStops() throws InterruptedException {
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals(BaseSourceTask.State.INITIAL, this.baseSourceTask.getTaskState());
        this.baseSourceTask.poll();
        Assert.assertEquals(BaseSourceTask.State.RUNNING, this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals(BaseSourceTask.State.STOPPED, this.baseSourceTask.getTaskState());
        Assert.assertEquals(1L, this.baseSourceTask.startCount.get());
        Assert.assertEquals(1L, this.baseSourceTask.stopCount.get());
        ((ChangeEventSourceCoordinator) Mockito.verify(this.baseSourceTask.coordinator)).stop();
    }

    @Test
    public void verifyStartAndStopWithoutPolling() {
        this.baseSourceTask.initialize((SourceTaskContext) Mockito.mock(SourceTaskContext.class));
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals(BaseSourceTask.State.INITIAL, this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals(BaseSourceTask.State.STOPPED, this.baseSourceTask.getTaskState());
        Assert.assertEquals(0L, this.baseSourceTask.startCount.get());
        Assert.assertEquals(1L, this.baseSourceTask.stopCount.get());
    }

    @Test
    public void verifyTaskCanBeStartedAfterStopped() throws InterruptedException {
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals(BaseSourceTask.State.INITIAL, this.baseSourceTask.getTaskState());
        this.baseSourceTask.poll();
        Assert.assertEquals(BaseSourceTask.State.RUNNING, this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals(BaseSourceTask.State.STOPPED, this.baseSourceTask.getTaskState());
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals(BaseSourceTask.State.INITIAL, this.baseSourceTask.getTaskState());
        this.baseSourceTask.poll();
        Assert.assertEquals(BaseSourceTask.State.RUNNING, this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals(BaseSourceTask.State.STOPPED, this.baseSourceTask.getTaskState());
        Assert.assertEquals(2L, this.baseSourceTask.startCount.get());
        Assert.assertEquals(2L, this.baseSourceTask.stopCount.get());
        ((ChangeEventSourceCoordinator) Mockito.verify(this.baseSourceTask.coordinator, Mockito.times(2))).stop();
    }

    @Test
    public void verifyTaskRestartsSuccessfully() throws InterruptedException {
        MyBaseSourceTask myBaseSourceTask = new MyBaseSourceTask() { // from class: io.debezium.connector.common.BaseSourceTaskTest.1
            @Override // io.debezium.connector.common.BaseSourceTaskTest.MyBaseSourceTask
            protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configuration configuration) {
                ChangeEventSourceCoordinator<Partition, OffsetContext> start = super.start(configuration);
                if (this.startCount.get() < 3) {
                    throw new RetriableException("Retry " + this.startCount.get());
                }
                return start;
            }
        };
        myBaseSourceTask.initialize((SourceTaskContext) Mockito.mock(SourceTaskContext.class));
        myBaseSourceTask.start(Map.of(CommonConnectorConfig.RETRIABLE_RESTART_WAIT.name(), "1"));
        Assert.assertEquals(BaseSourceTask.State.INITIAL, myBaseSourceTask.getTaskState());
        pollAndIgnoreRetryException(myBaseSourceTask);
        Assert.assertEquals(BaseSourceTask.State.RESTARTING, myBaseSourceTask.getTaskState());
        sleep(100L);
        pollAndIgnoreRetryException(myBaseSourceTask);
        Assert.assertEquals(BaseSourceTask.State.RESTARTING, myBaseSourceTask.getTaskState());
        sleep(100L);
        myBaseSourceTask.poll();
        Assert.assertEquals(BaseSourceTask.State.RUNNING, myBaseSourceTask.getTaskState());
        myBaseSourceTask.stop();
        Assert.assertEquals(BaseSourceTask.State.STOPPED, myBaseSourceTask.getTaskState());
        Assert.assertEquals(3L, myBaseSourceTask.startCount.get());
        Assert.assertEquals(3L, myBaseSourceTask.stopCount.get());
        ((ChangeEventSourceCoordinator) Mockito.verify(myBaseSourceTask.coordinator, Mockito.times(1))).stop();
    }

    @Test
    public void verifyOutOfOrderPollDoesNotStartTask() throws InterruptedException {
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals(BaseSourceTask.State.INITIAL, this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals(BaseSourceTask.State.STOPPED, this.baseSourceTask.getTaskState());
        this.baseSourceTask.poll();
        Assert.assertEquals(BaseSourceTask.State.STOPPED, this.baseSourceTask.getTaskState());
        Assert.assertEquals(0L, this.baseSourceTask.startCount.get());
        Assert.assertEquals(1L, this.baseSourceTask.stopCount.get());
    }

    private static void pollAndIgnoreRetryException(BaseSourceTask<Partition, OffsetContext> baseSourceTask) throws InterruptedException {
        try {
            baseSourceTask.poll();
        } catch (RetriableException e) {
        }
    }

    private static void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException();
        }
    }
}
