package org.apache.kafka.connect.runtime.distributed;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({DistributedHerder.class, Plugins.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.class */
public class DistributedHerderTest {
    // Worker configuration wrapped in a DistributedConfig by setUp(); created empty on this line.
    private static final Map<String, String> HERDER_CONFIG = new HashMap<String, String>();
    // URL passed to the DistributedHerder constructor in setUp().
    private static final String MEMBER_URL = "memberUrl";
    // Connector names referenced by the tests.
    private static final String CONN1 = "sourceA";
    private static final String CONN2 = "sourceB";
    // The constants below have no initializer on their declaration; being blank static finals,
    // they have to be assigned by a static initializer of this class.
    private static final ConnectorTaskId TASK0;
    private static final ConnectorTaskId TASK1;
    private static final ConnectorTaskId TASK2;
    private static final Integer MAX_TASKS;
    private static final Map<String, String> CONN1_CONFIG;
    private static final Map<String, String> CONN1_CONFIG_UPDATED;
    private static final Map<String, String> CONN2_CONFIG;
    private static final Map<String, String> TASK_CONFIG;
    private static final List<Map<String, String>> TASK_CONFIGS;
    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP;
    // Cluster configuration snapshots handed to expectPostRebalanceCatchup() or returned by the
    // mocked config store in the tests.
    private static final ClusterConfigState SNAPSHOT;
    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1;
    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG;
    // Worker id passed to the DistributedHerder constructor in setUp().
    private static final String WORKER_ID = "localhost:8083";

    // Mocked config store; passed to the herder constructor in setUp().
    @Mock
    private ConfigBackingStore configBackingStore;

    // Mocked status store; passed to the herder constructor in setUp().
    @Mock
    private StatusBackingStore statusBackingStore;

    // Mocked group member; the tests record memberId(), poll(), wakeup() etc. on it.
    @Mock
    private WorkerGroupMember member;
    // Created fresh as a MockTime in setUp().
    private MockTime time;
    // Object under test; built in setUp() as a PowerMock partial mock.
    private DistributedHerder herder;

    // Re-created explicitly through PowerMock.createMock() in setUp().
    @Mock
    private Worker worker;

    // Callback whose onCompletion() invocations the connector create/delete tests verify.
    @Mock
    private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;

    // The three plugin-related mocks below are re-created explicitly in setUp().
    @Mock
    private Plugins plugins;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private DelegatingClassLoader delegatingLoader;
    // Listener instances bound to the herder; assigned in setUp().
    private ConfigBackingStore.UpdateListener configUpdateListener;
    private WorkerRebalanceListener rebalanceListener;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceConnector.class */
    /**
     * Placeholder {@link SourceConnector} subtype. It is abstract and adds no members, so it can
     * only serve as a type reference; the implicit constructor is private like the class itself.
     */
    private abstract class BogusSourceConnector extends SourceConnector {
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceTask.class */
    /**
     * Placeholder {@link SourceTask} subtype. It is abstract and adds no members, so it can only
     * serve as a type reference; the implicit constructor is private like the class itself.
     */
    private abstract class BogusSourceTask extends SourceTask {
    }

    /**
     * Builds the fixture used by every test: a mocked {@link Worker}, a {@link MockTime}, a
     * partially mocked {@link DistributedHerder} (only {@code backoff} and
     * {@code updateDeletedConnectorStatus} are mocked), the two listeners bound to that herder,
     * and the plugin-related mocks.
     *
     * @throws Exception if PowerMock cannot create a mock or record the private-method expectation
     */
    @Before
    public void setUp() throws Exception {
        worker = PowerMock.createMock(Worker.class);
        // CONN1 is reported as a non-sink connector for any number of calls.
        EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);

        time = new MockTime();
        herder = PowerMock.createPartialMock(DistributedHerder.class,
                new String[]{"backoff", "updateDeletedConnectorStatus"},
                new Object[]{new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore,
                    configBackingStore, member, MEMBER_URL, time});

        // The listeners are inner classes of DistributedHerder, so they have to be created through
        // a qualified instance-creation expression on the herder they belong to.
        configUpdateListener = herder.new ConfigUpdateListener();
        rebalanceListener = herder.new RebalanceListener();

        plugins = PowerMock.createMock(Plugins.class);
        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
        delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
        PowerMock.mockStatic(Plugins.class);

        // The mocked private method may be invoked any number of times by the code under test.
        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus", new Object[0]).andVoid().anyTimes();
    }

    /**
     * Runs one herder tick after a rebalance that assigns {@code CONN1} and {@code TASK1}, and
     * verifies that the worker is asked to start both.
     *
     * @throws Exception if recording an expectation or the tick fails
     */
    @Test
    public void testJoinAssignment() throws Exception {
        // Rebalance result and config catch-up are recorded through the shared helpers.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectPostRebalanceCatchup(SNAPSHOT);

        // The assigned connector is started and its task configs are requested.
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, (List) null)).andReturn(TASK_CONFIGS);

        // The assigned task is started as well.
        worker.startTask(EasyMock.eq(TASK1), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(),
                (TaskStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Runs two herder ticks. The first one starts {@code CONN1} and {@code TASK1}; the second one
     * follows another rebalance whose new assignment contains only {@code CONN1}, so only the
     * connector is started again.
     *
     * @throws Exception if recording an expectation or a tick fails
     */
    @Test
    public void testRebalance() throws Exception {
        // First tick: connector and task from the initial assignment are started.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, (List) null)).andReturn(TASK_CONFIGS);
        worker.startTask(EasyMock.eq(TASK1), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(),
                (TaskStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: the six-argument helper records another rebalance; the first two lists are
        // presumably the revoked connectors/tasks and the last two the new assignment.
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), (short) 0, 1L,
                Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, (List) null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Runs two herder ticks where, after the second rebalance, the restarted connector reports
     * that it is not running. No task-config request and no plugin lookup is recorded for that
     * second start, so verification fails if the herder performs either.
     *
     * @throws Exception if recording an expectation or a tick fails
     */
    @Test
    public void testRebalanceFailedConnector() throws Exception {
        // First tick: connector and task from the initial assignment are started normally.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, (List) null)).andReturn(TASK_CONFIGS);
        worker.startTask(EasyMock.eq(TASK1), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(),
                (TaskStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: the connector is started again but isRunning() answers false.
        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), (short) 0, 1L,
                Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Verifies that {@code halt()} stops every connector and task the worker reports, and stops
     * the group member, both backing stores and the worker itself.
     */
    @Test
    public void testHaltCleansUpWorker() {
        // The worker reports one connector and one task; each must be stopped.
        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
        worker.stopConnector(CONN1);
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
        worker.stopAndAwaitTask(TASK1);
        PowerMock.expectLastCall();

        // Every collaborator is stopped exactly once.
        member.stop();
        PowerMock.expectLastCall();
        configBackingStore.stop();
        PowerMock.expectLastCall();
        statusBackingStore.stop();
        PowerMock.expectLastCall();
        worker.stop();
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.halt();

        PowerMock.verifyAll();
    }

    /**
     * Submits {@code CONN2_CONFIG} for {@code CONN2} and runs one herder tick. The test verifies
     * that the config is validated through a mocked {@link Connector}, written to the config
     * backing store, and reported to the callback as a newly created connector without tasks.
     *
     * @throws Exception if recording an expectation or the tick fails
     */
    @Test
    public void testCreateConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // Validation: a connector instance is created via the plugins mock and asked to validate.
        Connector connector = PowerMock.createMock(Connector.class);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(delegatingLoader);
        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connector);
        EasyMock.expect(connector.config()).andReturn(new ConfigDef());
        EasyMock.expect(connector.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        // The validated config is persisted and the callback receives the created connector info.
        // The result carries its type argument so it matches the callback's declared type.
        configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
        PowerMock.expectLastCall();
        ConnectorInfo expectedInfo = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.emptyList());
        putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(true, expectedInfo));
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Submits a copy of {@code CONN2_CONFIG} with the {@code "name"} entry removed and verifies
     * that the callback is completed with a {@link BadRequestException} and a null result. No
     * write to the config backing store is recorded.
     *
     * @throws Exception if recording an expectation or the tick fails
     */
    @Test
    public void testCreateConnectorFailedBasicValidation() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        // Typed copy of the config (instead of a raw HashMap) without its "name" entry.
        Map<String, String> config = new HashMap<String, String>(CONN2_CONFIG);
        config.remove("name");

        member.wakeup();
        PowerMock.expectLastCall();

        // Validation goes through a mocked connector obtained from the plugins mock.
        Connector connector = PowerMock.createMock(Connector.class);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(delegatingLoader);
        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connector);
        EasyMock.expect(connector.config()).andStubReturn(new ConfigDef());
        EasyMock.expect(connector.validate(config))
                .andReturn(new Config(Collections.singletonList(new ConfigValue("foo.bar"))));
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        // Capture the error handed to the callback; the result argument must be null.
        Capture<Throwable> error = EasyMock.newCapture();
        putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, config, false, putConnectorCallback);
        herder.tick();

        Assert.assertTrue(error.hasCaptured());
        Assert.assertTrue(error.getValue() instanceof BadRequestException);

        PowerMock.verifyAll();
    }

    /**
     * Submits {@code CONN2_CONFIG} while the mocked connector's {@code validate} returns a
     * {@link ConfigValue} for {@code "foo.bar"} that carries an error message. The test verifies
     * that the callback is completed with a {@link BadRequestException} and a null result.
     *
     * @throws Exception if recording an expectation or the tick fails
     */
    @Test
    public void testCreateConnectorFailedCustomValidation() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        Connector connector = PowerMock.createMock(Connector.class);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(delegatingLoader);
        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connector);

        // The connector declares "foo.bar" in its ConfigDef ...
        ConfigDef configDef = new ConfigDef();
        configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc");
        EasyMock.expect(connector.config()).andReturn(configDef);

        // ... and its validate() reports an error for that key.
        ConfigValue failedValue = new ConfigValue("foo.bar");
        failedValue.addErrorMessage("Failed foo.bar validation");
        EasyMock.expect(connector.validate(CONN2_CONFIG)).andReturn(new Config(Collections.singletonList(failedValue)));
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        // Typed capture (instead of a raw Capture) of the error handed to the callback.
        Capture<Throwable> error = EasyMock.newCapture();
        putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
        herder.tick();

        Assert.assertTrue(error.hasCaptured());
        Assert.assertTrue(error.getValue() instanceof BadRequestException);

        PowerMock.verifyAll();
    }

    /**
     * Submits a copy of {@code CONN2_CONFIG} whose {@code "name"} is {@code "test-group"} for a
     * mocked {@link SinkConnector}. The test verifies that the callback is completed with a
     * {@link BadRequestException} and a null result.
     * NOTE(review): judging by the test name, {@code "test-group"} is presumably the worker
     * group id configured in {@code HERDER_CONFIG}; confirm against the static initializer.
     *
     * @throws Exception if recording an expectation or the tick fails
     */
    @Test
    public void testConnectorNameConflictsWithWorkerGroupId() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // Typed copy of the config (instead of a raw HashMap) with the conflicting name.
        Map<String, String> config = new HashMap<String, String>(CONN2_CONFIG);
        config.put("name", "test-group");

        // The connector's own validation reports no problems.
        Connector connector = PowerMock.createMock(SinkConnector.class);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(delegatingLoader);
        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connector);
        EasyMock.expect(connector.config()).andReturn(new ConfigDef());
        EasyMock.expect(connector.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        // Capture the error handed to the callback; the result argument must be null.
        Capture<Throwable> error = EasyMock.newCapture();
        putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull(Herder.Created.class));
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, config, false, putConnectorCallback);
        herder.tick();

        Assert.assertTrue(error.hasCaptured());
        Assert.assertTrue(error.getValue() instanceof BadRequestException);

        PowerMock.verifyAll();
    }

    /**
     * Submits {@code CONN1_CONFIG} for {@code CONN1} with {@code false} as the third argument of
     * {@code putConnectorConfig} and verifies that the callback is completed with a null result.
     * No write to the config backing store is recorded.
     *
     * @throws Exception if recording an expectation or the tick fails
     */
    @Test
    public void testCreateConnectorAlreadyExists() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // Any error argument is accepted; only the null result is pinned down.
        putConnectorCallback.onCompletion(EasyMock.<Throwable>anyObject(), EasyMock.isNull());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Requests deletion of {@code CONN1} and runs one herder tick. The test verifies that the
     * connector config is removed from the config backing store and that the callback receives a
     * {@code Herder.Created} built from {@code false} and a null payload.
     *
     * @throws Exception if recording an expectation or the tick fails
     */
    @Test
    public void testDestroyConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);

        // CONN1 is assigned to this worker, so it is started during the tick.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, (List) null)).andReturn(TASK_CONFIGS);

        // The delete request removes the stored config and completes the callback. The result
        // carries its type argument so it matches the callback's declared type.
        member.wakeup();
        PowerMock.expectLastCall();
        configBackingStore.removeConnectorConfig(CONN1);
        PowerMock.expectLastCall();
        putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.deleteConnectorConfig(CONN1, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Starts {@code CONN1} in a first tick, then requests a restart and runs a second tick. The
     * test verifies that the worker stops and starts the connector again and that the restart
     * callback completes within one second.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartConnector() throws Exception {
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, (List) null)).andStubReturn(TASK_CONFIGS);

        // First tick: CONN1 is assigned and started.
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);

        // Second tick: the restart request stops the connector and starts it again.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        worker.stopConnector(CONN1);
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartResult = new FutureCallback();
        herder.restartConnector(CONN1, restartResult);
        herder.tick();
        restartResult.get(1000L, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of {@code CONN2} and verifies that the restart callback fails with an
     * {@link ExecutionException} whose cause is a {@link NotFoundException}.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartUnknownConnector() throws Exception {
        // First tick: empty assignment.
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: processes the restart request; no worker interaction is recorded.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartResult = new FutureCallback();
        herder.restartConnector(CONN2, restartResult);
        herder.tick();
        try {
            restartResult.get(1000L, TimeUnit.MILLISECONDS);
            // The message names the exception this test actually asserts below.
            Assert.fail("Expected NotFoundException to be raised");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NotFoundException);
        }

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of {@code CONN1} while the member id is stubbed as {@code "member"} and
     * nothing is assigned to this worker. The test verifies that the restart callback fails with
     * an {@link ExecutionException} whose cause is a {@code NotLeaderException}.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartConnectorRedirectToLeader() throws Exception {
        // First tick: empty assignment.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: processes the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartResult = new FutureCallback();
        herder.restartConnector(CONN1, restartResult);
        herder.tick();
        try {
            restartResult.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected NotLeaderException to be raised");
        } catch (ExecutionException failure) {
            Throwable cause = failure.getCause();
            Assert.assertTrue(cause instanceof NotLeaderException);
        }

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of {@code CONN1} while the member id is stubbed as {@code "leader"} but
     * the connector is not assigned to this worker. The test verifies that the restart callback
     * fails with a {@code NotAssignedException} whose forward URL is the owner URL reported by
     * the group member.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartConnectorRedirectToOwner() throws Exception {
        // First tick: empty assignment.
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: processes the restart request and looks up the connector's owner.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        EasyMock.expect(member.ownerUrl(CONN1)).andReturn("ownerUrl");

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartResult = new FutureCallback();
        herder.restartConnector(CONN1, restartResult);
        herder.tick();
        try {
            restartResult.get(1000L, TimeUnit.MILLISECONDS);
            // The message names the exception this test actually asserts below.
            Assert.fail("Expected NotAssignedException to be raised");
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Assert.assertTrue(cause instanceof NotAssignedException);
            // forwardUrl() is not declared on Throwable, so the cause must be cast first.
            Assert.assertEquals("ownerUrl", ((NotAssignedException) cause).forwardUrl());
        }

        PowerMock.verifyAll();
    }

    /**
     * Starts {@code TASK0} in a first tick, then requests a restart and runs a second tick. The
     * test verifies that the worker stops and starts the task again and that the restart callback
     * completes within one second.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartTask() throws Exception {
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, (List) null)).andStubReturn(TASK_CONFIGS);

        // First tick: TASK0 is assigned and started.
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        worker.startTask(EasyMock.eq(TASK0), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(),
                (TaskStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);

        // Second tick: the restart request stops the task and starts it again.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        worker.stopAndAwaitTask(TASK0);
        PowerMock.expectLastCall();
        worker.startTask(EasyMock.eq(TASK0), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(),
                (TaskStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartResult = new FutureCallback();
        herder.restartTask(TASK0, restartResult);
        herder.tick();
        restartResult.get(1000L, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of task 0 of the connector {@code "blah"} and verifies that the restart
     * callback fails with an {@link ExecutionException} whose cause is a
     * {@link NotFoundException}.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartUnknownTask() throws Exception {
        // First tick: empty assignment.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: processes the restart request; no worker interaction is recorded.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        FutureCallback restartResult = new FutureCallback();
        herder.tick();
        herder.restartTask(new ConnectorTaskId("blah", 0), restartResult);
        herder.tick();
        try {
            restartResult.get(1000L, TimeUnit.MILLISECONDS);
            // The message names the exception this test actually asserts below.
            Assert.fail("Expected NotFoundException to be raised");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NotFoundException);
        }

        PowerMock.verifyAll();
    }

    /**
     * Adds four requests whose first argument is 100, 10, 200 and 200, and verifies that the
     * herder's request queue hands them back in ascending order of that argument, with the two
     * equal entries in the order they were added.
     * NOTE(review): the first argument of {@code addRequest} is presumably a delay or due time;
     * confirm against {@code DistributedHerder}.
     *
     * @throws Exception declared by the test signature; nothing in the body throws a checked one
     */
    @Test
    public void testRequestProcessingOrder() throws Exception {
        DistributedHerder.HerderRequest atHundred = herder.addRequest(100L, (Callable) null, (Callback) null);
        DistributedHerder.HerderRequest atTen = herder.addRequest(10L, (Callable) null, (Callback) null);
        DistributedHerder.HerderRequest firstAtTwoHundred = herder.addRequest(200L, (Callable) null, (Callback) null);
        DistributedHerder.HerderRequest secondAtTwoHundred = herder.addRequest(200L, (Callable) null, (Callback) null);

        // Smallest value first; ties keep insertion order.
        Assert.assertEquals(atTen, herder.requests.pollFirst());
        Assert.assertEquals(atHundred, herder.requests.pollFirst());
        Assert.assertEquals(firstAtTwoHundred, herder.requests.pollFirst());
        Assert.assertEquals(secondAtTwoHundred, herder.requests.pollFirst());
    }

    /**
     * Requests a restart of {@code TASK0} while the member id is stubbed as {@code "member"} and
     * nothing is assigned to this worker. The test verifies that the restart callback fails with
     * an {@link ExecutionException} whose cause is a {@code NotLeaderException}.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartTaskRedirectToLeader() throws Exception {
        // First tick: empty assignment.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: processes the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartResult = new FutureCallback();
        herder.restartTask(TASK0, restartResult);
        herder.tick();
        try {
            restartResult.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected NotLeaderException to be raised");
        } catch (ExecutionException failure) {
            Throwable cause = failure.getCause();
            Assert.assertTrue(cause instanceof NotLeaderException);
        }

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of {@code TASK0} while the member id is stubbed as {@code "leader"} but
     * the task is not assigned to this worker. The test verifies that the restart callback fails
     * with a {@code NotAssignedException} whose forward URL is the owner URL reported by the
     * group member.
     *
     * @throws Exception if recording an expectation, a tick, or waiting on the callback fails
     */
    @Test
    public void testRestartTaskRedirectToOwner() throws Exception {
        // First tick: empty assignment.
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick: processes the restart request and looks up the task's owner.
        EasyMock.expect(member.ownerUrl(TASK0)).andReturn("ownerUrl");
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartResult = new FutureCallback();
        herder.restartTask(TASK0, restartResult);
        herder.tick();
        try {
            restartResult.get(1000L, TimeUnit.MILLISECONDS);
            // The message names the exception this test actually asserts below.
            Assert.fail("Expected NotAssignedException to be raised");
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Assert.assertTrue(cause instanceof NotAssignedException);
            // forwardUrl() is not declared on Throwable, so the cause must be cast first.
            Assert.assertEquals("ownerUrl", ((NotAssignedException) cause).forwardUrl());
        }

        PowerMock.verifyAll();
    }

    /**
     * Drives the herder through a connector-config update notification for {@code CONN1} and
     * verifies the recorded collaborator calls: a config snapshot read, a rejoin request, a second
     * assignment containing {@code CONN1}, and the start of that connector including a request for
     * its task configs.
     */
    @Test
    public void testConnectorConfigAdded() {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        // First assignment: config offset -1, nothing assigned; no catch-up is recorded for it.
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the config update notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT);
        this.member.requestRejoin();
        PowerMock.expectLastCall();
        // Second assignment at offset 1 that hands out CONN1. The revoked collections are empty but
        // non-null, so the helper also records an onRevoked callback and a status-store flush.
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Arrays.asList(CONN1), Collections.emptyList());
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins);
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate the config store announcing a changed connector config.
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        this.herder.tick();
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts with {@code CONN1} assigned and started, then delivers a connector-config update
     * notification and verifies that the worker is asked to stop the connector and start it again.
     */
    @Test
    public void testConnectorConfigUpdate() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        EasyMock.expect(this.worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
        // Assignment at config offset 1 containing CONN1, followed by the config read-back.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        // Initial start of CONN1.
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins);
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the config update notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT);
        // Stop followed by a second start: the restart this test is about.
        this.worker.stopConnector(CONN1);
        PowerMock.expectLastCall().andReturn(true);
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins);
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate the config store announcing a changed connector config.
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts with {@code CONN1} assigned and started, then delivers a target-state change
     * notification while the config store reports {@code CONN1} as PAUSED, and verifies that the
     * worker receives {@code setTargetState(CONN1, PAUSED)}.
     */
    @Test
    public void testConnectorPaused() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        EasyMock.expect(this.worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
        // Assignment at config offset 1 containing CONN1, followed by the config read-back.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        // Initial start of CONN1 in the STARTED target state.
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins);
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the target-state notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        // The re-read snapshot now carries PAUSED for CONN1.
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
        // NOTE(review): this expectLastCall() follows an expect(...).andReturn(...) rather than a
        // void call and looks redundant — confirm before removing.
        PowerMock.expectLastCall();
        this.worker.setTargetState(CONN1, TargetState.PAUSED);
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate the config store announcing a target-state change for CONN1.
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts with {@code CONN1} assigned while the config store reports it as PAUSED, then delivers
     * a target-state change notification with the store now reporting STARTED, and verifies that
     * the worker receives {@code setTargetState(CONN1, STARTED)} and is asked for task configs.
     */
    @Test
    public void testConnectorResumed() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        EasyMock.expect(this.worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
        // Assignment at config offset 1 containing CONN1; the catch-up snapshot has CONN1 paused.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
        // The connector is therefore expected to be started with the PAUSED target state.
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.PAUSED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the target-state notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        // The re-read snapshot now carries STARTED for CONN1.
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT);
        // NOTE(review): this expectLastCall() follows an expect(...).andReturn(...) rather than a
        // void call and looks redundant — confirm before removing.
        PowerMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        this.worker.setTargetState(CONN1, TargetState.STARTED);
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate the config store announcing a target-state change for CONN1.
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Delivers a target-state change notification for a connector name ("unknown-connector") that
     * appears neither in the assignment nor in {@code SNAPSHOT}. No {@code setTargetState}
     * expectation is recorded, so the verified interaction consists only of the snapshot re-read
     * and the regular group-member calls.
     */
    @Test
    public void testUnknownConnectorPaused() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        EasyMock.expect(this.worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
        // Assignment at config offset 1 containing only TASK0, followed by the config read-back.
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectPostRebalanceCatchup(SNAPSHOT);
        this.worker.startTask((ConnectorTaskId) EasyMock.eq(TASK0), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the target-state notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT);
        // NOTE(review): this expectLastCall() follows an expect(...).andReturn(...) rather than a
        // void call and looks redundant — confirm before removing.
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate a target-state change for a connector the snapshot does not contain.
        this.configUpdateListener.onConnectorTargetStateChange("unknown-connector");
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Covers a worker that has only {@code TASK0} assigned and reports no connector names. After a
     * target-state change notification with the config store reporting {@code CONN1} as PAUSED, the
     * worker is still expected to receive {@code setTargetState(CONN1, PAUSED)}.
     */
    @Test
    public void testConnectorPausedRunningTaskOnly() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        // The worker reports that it runs no connectors.
        EasyMock.expect(this.worker.connectorNames()).andStubReturn(Collections.emptySet());
        // Assignment at config offset 1 containing only TASK0, followed by the config read-back.
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectPostRebalanceCatchup(SNAPSHOT);
        this.worker.startTask((ConnectorTaskId) EasyMock.eq(TASK0), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the target-state notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        // The re-read snapshot now carries PAUSED for CONN1.
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
        // NOTE(review): this expectLastCall() follows an expect(...).andReturn(...) rather than a
        // void call and looks redundant — confirm before removing.
        PowerMock.expectLastCall();
        this.worker.setTargetState(CONN1, TargetState.PAUSED);
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate the config store announcing a target-state change for CONN1.
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Covers a worker that has only {@code TASK0} assigned, started in the PAUSED target state.
     * After a target-state change notification with the config store reporting {@code CONN1} as
     * STARTED, the worker is expected to receive {@code setTargetState(CONN1, STARTED)} and one
     * {@code isRunning(CONN1)} query, which answers {@code false}; no task-config request is recorded.
     */
    @Test
    public void testConnectorResumedRunningTaskOnly() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        // The worker reports that it runs no connectors.
        EasyMock.expect(this.worker.connectorNames()).andStubReturn(Collections.emptySet());
        // Assignment at config offset 1 containing only TASK0; the catch-up snapshot has CONN1 paused.
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
        this.worker.startTask((ConnectorTaskId) EasyMock.eq(TASK0), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.PAUSED));
        PowerMock.expectLastCall().andReturn(true);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the target-state notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        // The re-read snapshot now carries STARTED for CONN1.
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT);
        // NOTE(review): this expectLastCall() follows an expect(...).andReturn(...) rather than a
        // void call and looks redundant — confirm before removing.
        PowerMock.expectLastCall();
        this.worker.setTargetState(CONN1, TargetState.STARTED);
        PowerMock.expectLastCall();
        // The connector itself is not running on this worker.
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(false);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate the config store announcing a target-state change for CONN1.
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Drives the herder through a task-config update notification for {@code TASK0..TASK2} and
     * verifies the recorded collaborator calls: a config snapshot read, a rejoin request, a second
     * assignment containing {@code TASK0}, and the start of that task.
     */
    @Test
    public void testTaskConfigAdded() {
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        // First assignment: config offset -1, nothing assigned; no catch-up is recorded for it.
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // Calls recorded for the tick that follows the task-config notification.
        this.member.wakeup();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT);
        this.member.requestRejoin();
        PowerMock.expectLastCall();
        // Second assignment at offset 1 that hands out TASK0. The revoked collections are empty but
        // non-null, so the helper also records an onRevoked callback and a status-store flush.
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.emptyList(), Arrays.asList(TASK0));
        this.worker.startTask((ConnectorTaskId) EasyMock.eq(TASK0), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        // Simulate the config store announcing new task configs.
        this.configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK0, TASK1, TASK2));
        this.herder.tick();
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Covers a config-store refresh that times out after the first assignment. The recorded
     * reaction is {@code maybeLeaveGroup()}, a private {@code backoff(300000)} call and
     * {@code requestRejoin()}; a second assignment then succeeds and {@code CONN1} and
     * {@code TASK1} are started.
     */
    @Test
    public void testJoinLeaderCatchUpFails() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        // First assignment: empty, with (short) 1 as the Assignment's first constructor argument
        // (presumably an error code — confirm against ConnectProtocol.Assignment).
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, Collections.emptyList(), Collections.emptyList());
        // The catch-up read of the config store fails with a timeout.
        this.configBackingStore.refresh(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject(TimeUnit.class));
        EasyMock.expectLastCall().andThrow(new TimeoutException());
        this.member.maybeLeaveGroup();
        EasyMock.expectLastCall();
        // backoff is a private method on the herder, hence the PowerMock private expectation.
        PowerMock.expectPrivate(this.herder, "backoff", new Object[]{300000});
        this.member.requestRejoin();
        // Second assignment at offset 1 with CONN1 and TASK1; this time the catch-up succeeds.
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectPostRebalanceCatchup(SNAPSHOT);
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        this.worker.startTask((ConnectorTaskId) EasyMock.eq(TASK1), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        // One tick per assignment attempt.
        this.herder.tick();
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Issues the four read accessors ({@code connectors}, {@code connectorInfo},
     * {@code connectorConfig}, {@code taskConfigs}) before a single {@code tick()} and asserts that
     * every callback is done afterwards and carries the data held in {@code SNAPSHOT}.
     */
    @Test
    public void testAccessors() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        // Assignment at config offset 1 with nothing assigned, followed by the config read-back.
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        // wakeup() may be invoked any number of times in this test.
        this.member.wakeup();
        PowerMock.expectLastCall().anyTimes();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        // All four requests are submitted before the herder ticks.
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectors(futureCallback);
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.connectorInfo(CONN1, futureCallback2);
        FutureCallback futureCallback3 = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback3);
        FutureCallback futureCallback4 = new FutureCallback();
        this.herder.taskConfigs(CONN1, futureCallback4);
        this.herder.tick();
        // After one tick each callback must already be completed with the expected value.
        Assert.assertTrue(futureCallback.isDone());
        Assert.assertEquals(Collections.singleton(CONN1), futureCallback.get());
        Assert.assertTrue(futureCallback2.isDone());
        Assert.assertEquals(new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2)), futureCallback2.get());
        Assert.assertTrue(futureCallback3.isDone());
        Assert.assertEquals(CONN1_CONFIG, futureCallback3.get());
        Assert.assertTrue(futureCallback4.isDone());
        Assert.assertEquals(Arrays.asList(new TaskInfo(TASK0, TASK_CONFIG), new TaskInfo(TASK1, TASK_CONFIG), new TaskInfo(TASK2, TASK_CONFIG)), futureCallback4.get());
        PowerMock.verifyAll();
    }

    /**
     * Walks through replacing the config of an existing connector:
     * <ol>
     *   <li>read the current config of {@code CONN1} and expect {@code CONN1_CONFIG};</li>
     *   <li>call {@code putConnectorConfig} with {@code CONN1_CONFIG_UPDATED}; the mocked config
     *       store answers the write by firing a connector-config update notification;</li>
     *   <li>read the config again and expect {@code CONN1_CONFIG_UPDATED}.</li>
     * </ol>
     * Along the way the connector is expected to be validated, stopped and started again.
     */
    @Test
    public void testPutConnectorConfig() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        // Assignment at config offset 1 containing CONN1, followed by the config read-back.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        // Initial start of CONN1.
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        // wakeup() may be invoked any number of times in this test.
        this.member.wakeup();
        PowerMock.expectLastCall().anyTimes();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        // Config validation: a mocked connector supplies an empty ConfigDef and an empty validation
        // result, and the plugin class loader is swapped in and back out.
        Connector connector = (Connector) PowerMock.createMock(Connector.class);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins).times(5);
        EasyMock.expect(this.plugins.compareAndSwapLoaders(connector)).andReturn(this.delegatingLoader);
        EasyMock.expect(this.plugins.newConnector(EasyMock.anyString())).andReturn(connector);
        EasyMock.expect(connector.config()).andReturn(new ConfigDef());
        EasyMock.expect(connector.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.emptyList()));
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader);
        // Writing the new config triggers the update listener from inside the mocked store.
        this.configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { // from class: org.apache.kafka.connect.runtime.distributed.DistributedHerderTest.1
            public Object answer() throws Throwable {
                DistributedHerderTest.this.configUpdateListener.onConnectorConfigUpdate(DistributedHerderTest.CONN1);
                return null;
            }
        });
        // The next snapshot read returns the state that contains the updated config.
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
        // Restart of CONN1 with the new config.
        this.worker.stopConnector(CONN1);
        PowerMock.expectLastCall().andReturn(true);
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(true);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        this.member.ensureActive();
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        // Step 1: the original config is visible.
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback);
        this.herder.tick();
        Assert.assertTrue(futureCallback.isDone());
        Assert.assertEquals(CONN1_CONFIG, futureCallback.get());
        // Step 2: replace the config; the result reports created == false together with the new info.
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, futureCallback2);
        this.herder.tick();
        Assert.assertTrue(futureCallback2.isDone());
        Assert.assertEquals(new Herder.Created(false, new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2))), futureCallback2.get());
        // Step 3: the updated config is visible.
        FutureCallback futureCallback3 = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback3);
        this.herder.tick();
        Assert.assertTrue(futureCallback3.isDone());
        Assert.assertEquals(CONN1_CONFIG_UPDATED, futureCallback3.get());
        PowerMock.verifyAll();
    }

    /**
     * Placeholder: the body is empty, so this test records no expectations and asserts nothing.
     */
    @Test
    public void testInconsistentConfigs() throws Exception {
        // TODO: not implemented — passes vacuously until a scenario is added.
    }

    /**
     * Shorthand for the six-argument {@code expectRebalance} when nothing is revoked: both revoked
     * collections are passed as {@code null} and the assignment's leading short argument as 0.
     *
     * @param j     value forwarded as the assignment's long argument (callers pass a config offset)
     * @param list  connector names to assign
     * @param list2 task ids to assign
     */
    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2) {
        // null (as opposed to empty) makes the delegate skip the onRevoked callback entirely.
        final Collection<String> revokedConnectors = null;
        final List<ConnectorTaskId> revokedTasks = null;
        // Presumably the "no error" code of ConnectProtocol.Assignment — confirm against that class.
        final short error = (short) 0;
        expectRebalance(revokedConnectors, revokedTasks, error, j, list, list2);
    }

    /**
     * Records the mock expectations for one rebalance round.
     * <p>
     * {@code member.ensureActive()} is answered by invoking the rebalance listener: first
     * {@code onRevoked} (only when {@code collection} is non-null), then {@code onAssigned} with an
     * assignment whose leader is "leader" at "leaderUrl". In addition, one {@code stopConnector}
     * per revoked connector, one {@code stopAndAwaitTask} when revoked tasks are present, a
     * status-store flush when a revocation is expected, and a final {@code member.wakeup()} are
     * recorded.
     *
     * @param collection revoked connector names, or {@code null} when no revocation takes place
     * @param list       revoked task ids; may be {@code null} or empty
     * @param s          first constructor argument of the assignment
     * @param j          long constructor argument of the assignment (callers pass a config offset)
     * @param list2      connector names to assign
     * @param list3      task ids to assign
     */
    private void expectRebalance(final Collection<String> collection, final List<ConnectorTaskId> list, final short s, final long j, final List<String> list2, final List<ConnectorTaskId> list3) {
        final boolean revocationExpected = collection != null;

        this.member.ensureActive();
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
            public Object answer() throws Throwable {
                if (revocationExpected) {
                    DistributedHerderTest.this.rebalanceListener.onRevoked("leader", collection, list);
                }
                ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(s, "leader", "leaderUrl", j, list2, list3);
                DistributedHerderTest.this.rebalanceListener.onAssigned(assignment, 0);
                return null;
            }
        });

        if (revocationExpected) {
            // One stop expectation per revoked connector.
            for (String revokedConnector : collection) {
                this.worker.stopConnector(revokedConnector);
                PowerMock.expectLastCall().andReturn(true);
            }
        }

        boolean tasksRevoked = list != null && !list.isEmpty();
        if (tasksRevoked) {
            // A single expectation matching any task id, regardless of how many tasks are revoked.
            this.worker.stopAndAwaitTask((ConnectorTaskId) EasyMock.anyObject(ConnectorTaskId.class));
            PowerMock.expectLastCall();
        }

        if (revocationExpected) {
            this.statusBackingStore.flush();
            PowerMock.expectLastCall();
        }

        this.member.wakeup();
        PowerMock.expectLastCall();
    }

    /**
     * Records a config-store {@code refresh} with any timeout and any time unit, followed by a
     * {@code snapshot()} call that returns the given state.
     *
     * @param clusterConfigState the state the mocked {@code snapshot()} call hands back
     * @throws TimeoutException declared because the recorded {@code refresh} call declares it
     */
    private void expectPostRebalanceCatchup(ClusterConfigState clusterConfigState) throws TimeoutException {
        // The typed matcher already yields a TimeUnit, so no cast is needed.
        this.configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
        EasyMock.expectLastCall();

        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(clusterConfigState);
    }

    // Shared fixtures. Order matters: CONN1_CONFIG must be fully populated before it is copied into
    // CONN1_CONFIG_UPDATED, and the task/connector maps must exist before the snapshots are built.
    static {
        // Worker-level settings used to build the herder's configuration.
        HERDER_CONFIG.put("status.storage.topic", "status-topic");
        HERDER_CONFIG.put("config.storage.topic", "config-topic");
        HERDER_CONFIG.put("bootstrap.servers", "localhost:9092");
        HERDER_CONFIG.put("group.id", "connect-test-group");
        HERDER_CONFIG.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("offset.storage.topic", "connect-offsets");

        // Three task ids, all belonging to CONN1.
        TASK0 = new ConnectorTaskId(CONN1, 0);
        TASK1 = new ConnectorTaskId(CONN1, 1);
        TASK2 = new ConnectorTaskId(CONN1, 2);
        MAX_TASKS = 3;

        // Diamond instead of raw constructors, matching TASK_CONFIGS_MAP below.
        CONN1_CONFIG = new HashMap<>();
        CONN1_CONFIG.put("name", CONN1);
        CONN1_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN1_CONFIG.put("topics", "foo,bar");
        CONN1_CONFIG.put("connector.class", BogusSourceConnector.class.getName());

        // Copy of CONN1_CONFIG that differs only in its "topics" entry.
        CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
        CONN1_CONFIG_UPDATED.put("topics", "foo,bar,baz");

        CONN2_CONFIG = new HashMap<>();
        CONN2_CONFIG.put("name", CONN2);
        CONN2_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN2_CONFIG.put("topics", "foo,bar");
        CONN2_CONFIG.put("connector.class", BogusSourceConnector.class.getName());

        TASK_CONFIG = new HashMap<>();
        TASK_CONFIG.put("task.class", BogusSourceTask.class.getName());

        // The same task config three times, once per task.
        TASK_CONFIGS = new ArrayList<>();
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);

        TASK_CONFIGS_MAP = new HashMap<>();
        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);

        // Three cluster states for CONN1 that differ only in target state or connector config:
        // started with the original config, paused with the original config, started with the
        // updated config.
        SNAPSHOT = new ClusterConfigState(1L, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptySet());
        SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1L, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.PAUSED), TASK_CONFIGS_MAP, Collections.emptySet());
        SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1L, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptySet());
    }
}
