package org.apache.kafka.connect.runtime;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;

@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.class */
public class SourceTaskOffsetCommitterTest extends ThreadedTest {
    private final ConcurrentHashMap committers = new ConcurrentHashMap();

    @Mock
    private ScheduledExecutorService executor;

    @Mock
    private Logger mockLog;

    @Mock
    private ScheduledFuture commitFuture;

    @Mock
    private ScheduledFuture taskFuture;

    @Mock
    private ConnectorTaskId taskId;

    @Mock
    private WorkerSourceTask task;
    private SourceTaskOffsetCommitter committer;
    private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000;

    @Override // org.apache.kafka.connect.util.ThreadedTest
    public void setup() {
        super.setup();
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.key.converter.schemas.enable", "false");
        hashMap.put("internal.value.converter.schemas.enable", "false");
        hashMap.put("offset.storage.file.filename", "/tmp/connect.offsets");
        hashMap.put("offset.flush.interval.ms", Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
        this.committer = new SourceTaskOffsetCommitter(new StandaloneConfig(hashMap), this.executor, this.committers);
        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", this.mockLog);
    }

    @Test
    public void testSchedule() {
        Capture newCapture = EasyMock.newCapture();
        EasyMock.expect(this.executor.scheduleWithFixedDelay((Runnable) EasyMock.capture(newCapture), EasyMock.eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), EasyMock.eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), (TimeUnit) EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(this.commitFuture);
        PowerMock.replayAll(new Object[0]);
        this.committer.schedule(this.taskId, this.task);
        Assert.assertTrue(newCapture.hasCaptured());
        Assert.assertNotNull(newCapture.getValue());
        Assert.assertEquals(Collections.singletonMap(this.taskId, this.commitFuture), this.committers);
        PowerMock.verifyAll();
    }

    @Test
    public void testClose() throws Exception {
        this.executor.shutdown();
        PowerMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(this.executor.awaitTermination(EasyMock.eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), (TimeUnit) EasyMock.eq(TimeUnit.MILLISECONDS)))).andReturn(false);
        this.mockLog.error(EasyMock.anyString());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.committer.close(DEFAULT_OFFSET_COMMIT_INTERVAL_MS);
        PowerMock.verifyAll();
        PowerMock.resetAll(new Object[0]);
        this.executor.shutdown();
        PowerMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(this.executor.awaitTermination(EasyMock.eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), (TimeUnit) EasyMock.eq(TimeUnit.MILLISECONDS)))).andThrow(new InterruptedException());
        PowerMock.replayAll(new Object[0]);
        this.committer.close(DEFAULT_OFFSET_COMMIT_INTERVAL_MS);
        PowerMock.verifyAll();
    }

    @Test
    public void testRemove() throws Exception {
        PowerMock.replayAll(new Object[0]);
        Assert.assertTrue(this.committers.isEmpty());
        this.committer.remove(this.taskId);
        Assert.assertTrue(this.committers.isEmpty());
        PowerMock.verifyAll();
        PowerMock.resetAll(new Object[0]);
        EasyMock.expect(Boolean.valueOf(this.taskFuture.cancel(EasyMock.eq(false)))).andReturn(false);
        EasyMock.expect(Boolean.valueOf(this.taskFuture.isDone())).andReturn(false);
        EasyMock.expect(this.taskFuture.get()).andReturn((Object) null);
        PowerMock.replayAll(new Object[0]);
        this.committers.put(this.taskId, this.taskFuture);
        this.committer.remove(this.taskId);
        Assert.assertTrue(this.committers.isEmpty());
        PowerMock.verifyAll();
        PowerMock.resetAll(new Object[0]);
        EasyMock.expect(Boolean.valueOf(this.taskFuture.cancel(EasyMock.eq(false)))).andReturn(false);
        EasyMock.expect(Boolean.valueOf(this.taskFuture.isDone())).andReturn(false);
        EasyMock.expect(this.taskFuture.get()).andThrow(new CancellationException());
        this.mockLog.trace(EasyMock.anyString(), EasyMock.anyObject());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.committers.put(this.taskId, this.taskFuture);
        this.committer.remove(this.taskId);
        Assert.assertTrue(this.committers.isEmpty());
        PowerMock.verifyAll();
        PowerMock.resetAll(new Object[0]);
        EasyMock.expect(Boolean.valueOf(this.taskFuture.cancel(EasyMock.eq(false)))).andReturn(false);
        EasyMock.expect(Boolean.valueOf(this.taskFuture.isDone())).andReturn(false);
        EasyMock.expect(this.taskFuture.get()).andThrow(new InterruptedException());
        PowerMock.replayAll(new Object[0]);
        try {
            this.committers.put(this.taskId, this.taskFuture);
            this.committer.remove(this.taskId);
            Assert.fail("Expected ConnectException to be raised");
        } catch (ConnectException e) {
        }
        PowerMock.verifyAll();
    }
}
