package org.apache.kafka.connect.runtime;

import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.class */
public class SourceTaskOffsetCommitterTest {
    private final ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new ConcurrentHashMap<>();

    @Mock
    private ScheduledExecutorService executor;

    @Mock
    private ScheduledFuture<?> commitFuture;

    @Mock
    private ScheduledFuture<?> taskFuture;

    @Mock
    private ConnectorTaskId taskId;

    @Mock
    private WorkerSourceTask task;
    private SourceTaskOffsetCommitter committer;
    private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000;

    @Before
    public void setup() {
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("offset.storage.file.filename", "/tmp/connect.offsets");
        hashMap.put("offset.flush.interval.ms", Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
        this.committer = new SourceTaskOffsetCommitter(new StandaloneConfig(hashMap), this.executor, this.committers);
    }

    @Test
    public void testSchedule() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Runnable.class);
        Mockito.when(this.executor.scheduleWithFixedDelay((Runnable) forClass.capture(), ArgumentMatchers.eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), ArgumentMatchers.eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), (TimeUnit) ArgumentMatchers.eq(TimeUnit.MILLISECONDS))).thenReturn(this.commitFuture);
        this.committer.schedule(this.taskId, this.task);
        Assert.assertNotNull(forClass.getValue());
        Assert.assertEquals(Collections.singletonMap(this.taskId, this.commitFuture), this.committers);
    }

    @Test
    public void testCloseTimeout() throws Exception {
        Mockito.when(Boolean.valueOf(this.executor.awaitTermination(DEFAULT_OFFSET_COMMIT_INTERVAL_MS, TimeUnit.MILLISECONDS))).thenReturn(false);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(ThreadUtils.class);
        Throwable th = null;
        try {
            try {
                this.committer.close(DEFAULT_OFFSET_COMMIT_INTERVAL_MS);
                Assert.assertTrue(createAndRegister.getEvents().stream().anyMatch(event -> {
                    return event.getLevel().equals("ERROR");
                }));
                if (createAndRegister != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndRegister.close();
                    }
                }
                ((ScheduledExecutorService) Mockito.verify(this.executor)).shutdown();
            } finally {
            }
        } catch (Throwable th3) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCloseInterrupted() throws InterruptedException {
        Mockito.when(Boolean.valueOf(this.executor.awaitTermination(DEFAULT_OFFSET_COMMIT_INTERVAL_MS, TimeUnit.MILLISECONDS))).thenThrow(new Throwable[]{new InterruptedException()});
        this.committer.close(DEFAULT_OFFSET_COMMIT_INTERVAL_MS);
        ((ScheduledExecutorService) Mockito.verify(this.executor)).shutdown();
    }

    @Test
    public void testRemoveNonExistentTask() {
        Assert.assertTrue(this.committers.isEmpty());
        this.committer.remove(this.taskId);
        Assert.assertTrue(this.committers.isEmpty());
    }

    @Test
    public void testRemoveSuccess() {
        expectRemove();
        this.committers.put(this.taskId, this.taskFuture);
        this.committer.remove(this.taskId);
        Assert.assertTrue(this.committers.isEmpty());
    }

    @Test
    public void testRemoveCancelledTask() throws ExecutionException, InterruptedException {
        expectRemove();
        Mockito.when(this.taskFuture.get()).thenThrow(new Throwable[]{new CancellationException()});
        this.committers.put(this.taskId, this.taskFuture);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class);
        Throwable th = null;
        try {
            LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
            this.committer.remove(this.taskId);
            Assert.assertTrue(createAndRegister.getEvents().stream().anyMatch(event -> {
                return event.getLevel().equals("TRACE");
            }));
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            Assert.assertTrue(this.committers.isEmpty());
        } catch (Throwable th3) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRemoveTaskAndInterrupted() throws ExecutionException, InterruptedException {
        expectRemove();
        Mockito.when(this.taskFuture.get()).thenThrow(new Throwable[]{new InterruptedException()});
        this.committers.put(this.taskId, this.taskFuture);
        Assert.assertThrows(ConnectException.class, () -> {
            this.committer.remove(this.taskId);
        });
    }

    private void expectRemove() {
        Mockito.when(Boolean.valueOf(this.taskFuture.cancel(false))).thenReturn(false);
        Mockito.when(Boolean.valueOf(this.taskFuture.isDone())).thenReturn(false);
        Mockito.when(this.taskId.connector()).thenReturn("MyConnector");
        Mockito.when(Integer.valueOf(this.taskId.task())).thenReturn(1);
    }
}
