package org.apache.kafka.coordinator.group;

import java.net.InetAddress;
import java.util.Collections;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.class */
public class GroupCoordinatorServiceTest {
    private CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime() {
        return (CoordinatorRuntime) Mockito.mock(CoordinatorRuntime.class);
    }

    private GroupCoordinatorConfig createConfig() {
        return new GroupCoordinatorConfig(1, 45, 5, Integer.MAX_VALUE, Collections.singletonList(new RangeAssignor()), 1000, 4096, Integer.MAX_VALUE, 3000, 300000, 120, 50000);
    }

    @Test
    public void testStartupShutdown() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        groupCoordinatorService.shutdown();
        ((CoordinatorRuntime) Mockito.verify(mockRuntime, Mockito.times(1))).close();
    }

    @Test
    public void testConsumerGroupHeartbeatWhenNotStarted() {
        TestUtils.assertFutureThrows(new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime()).consumerGroupHeartbeat(TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT), new ConsumerGroupHeartbeatRequestData().setGroupId("foo")), CoordinatorNotAvailableException.class);
    }

    @Test
    public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        ConsumerGroupHeartbeatRequestData groupId = new ConsumerGroupHeartbeatRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq("consumer-group-heartbeat"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData()));
        Assertions.assertEquals(new ConsumerGroupHeartbeatResponseData(), groupCoordinatorService.consumerGroupHeartbeat(TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT), groupId).get(5L, TimeUnit.SECONDS));
    }

    private static Stream<Arguments> testConsumerGroupHeartbeatWithExceptionSource() {
        return Stream.of((Object[]) new Arguments[]{Arguments.arguments(new Object[]{new UnknownTopicOrPartitionException(), Short.valueOf(Errors.COORDINATOR_NOT_AVAILABLE.code()), null}), Arguments.arguments(new Object[]{new NotEnoughReplicasException(), Short.valueOf(Errors.COORDINATOR_NOT_AVAILABLE.code()), null}), Arguments.arguments(new Object[]{new NotLeaderOrFollowerException(), Short.valueOf(Errors.NOT_COORDINATOR.code()), null}), Arguments.arguments(new Object[]{new KafkaStorageException(), Short.valueOf(Errors.NOT_COORDINATOR.code()), null}), Arguments.arguments(new Object[]{new RecordTooLargeException(), Short.valueOf(Errors.UNKNOWN_SERVER_ERROR.code()), null}), Arguments.arguments(new Object[]{new RecordBatchTooLargeException(), Short.valueOf(Errors.UNKNOWN_SERVER_ERROR.code()), null}), Arguments.arguments(new Object[]{new InvalidFetchSizeException(""), Short.valueOf(Errors.UNKNOWN_SERVER_ERROR.code()), null}), Arguments.arguments(new Object[]{new InvalidRequestException("Invalid"), Short.valueOf(Errors.INVALID_REQUEST.code()), "Invalid"})});
    }

    @MethodSource({"testConsumerGroupHeartbeatWithExceptionSource"})
    @ParameterizedTest
    public void testConsumerGroupHeartbeatWithException(Throwable th, short s, String str) throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        ConsumerGroupHeartbeatRequestData groupId = new ConsumerGroupHeartbeatRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq("consumer-group-heartbeat"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenReturn(FutureUtils.failedFuture(th));
        Assertions.assertEquals(new ConsumerGroupHeartbeatResponseData().setErrorCode(s).setErrorMessage(str), groupCoordinatorService.consumerGroupHeartbeat(TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT), groupId).get(5L, TimeUnit.SECONDS));
    }

    @Test
    public void testPartitionFor() {
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime());
        Assertions.assertThrows(CoordinatorNotAvailableException.class, () -> {
            groupCoordinatorService.partitionFor("foo");
        });
        groupCoordinatorService.startup(() -> {
            return 10;
        });
        Assertions.assertEquals(Utils.abs("foo".hashCode()) % 10, groupCoordinatorService.partitionFor("foo"));
    }

    @Test
    public void testGroupMetadataTopicConfigs() {
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime());
        Properties properties = new Properties();
        properties.put("cleanup.policy", "compact");
        properties.put("compression.type", BrokerCompressionType.PRODUCER.name);
        properties.put("segment.bytes", "1000");
        Assertions.assertEquals(properties, groupCoordinatorService.groupMetadataTopicConfigs());
    }

    @Test
    public void testOnElection() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        Assertions.assertThrows(CoordinatorNotAvailableException.class, () -> {
            groupCoordinatorService.onElection(5, 10);
        });
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        groupCoordinatorService.onElection(5, 10);
        ((CoordinatorRuntime) Mockito.verify(mockRuntime, Mockito.times(1))).scheduleLoadOperation(new TopicPartition("__consumer_offsets", 5), 10);
    }

    @Test
    public void testOnResignation() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        Assertions.assertThrows(CoordinatorNotAvailableException.class, () -> {
            groupCoordinatorService.onResignation(5, OptionalInt.of(10));
        });
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        groupCoordinatorService.onResignation(5, OptionalInt.of(10));
        ((CoordinatorRuntime) Mockito.verify(mockRuntime, Mockito.times(1))).scheduleUnloadOperation(new TopicPartition("__consumer_offsets", 5), 10);
    }

    @Test
    public void testJoinGroup() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        JoinGroupRequestData groupId = new JoinGroupRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq("generic-group-join"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(new JoinGroupResponseData()));
        Assertions.assertFalse(groupCoordinatorService.joinGroup(TestUtil.requestContext(ApiKeys.JOIN_GROUP), groupId, BufferSupplier.NO_CACHING).isDone());
    }

    @Test
    public void testJoinGroupWithException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        JoinGroupRequestData groupId = new JoinGroupRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq("generic-group-join"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));
        Assertions.assertEquals(new JoinGroupResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()), groupCoordinatorService.joinGroup(TestUtil.requestContext(ApiKeys.JOIN_GROUP), groupId, BufferSupplier.NO_CACHING).get(5L, TimeUnit.SECONDS));
    }

    @Test
    public void testJoinGroupInvalidGroupId() throws Exception {
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime());
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        CompletableFuture joinGroup = groupCoordinatorService.joinGroup(new RequestContext(new RequestHeader(ApiKeys.JOIN_GROUP, ApiKeys.JOIN_GROUP.latestVersion(), "client", 0), "1", InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false), new JoinGroupRequestData().setGroupId((String) null).setMemberId(""), BufferSupplier.NO_CACHING);
        Assertions.assertTrue(joinGroup.isDone());
        Assertions.assertEquals(new JoinGroupResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()).setMemberId(""), joinGroup.get());
    }

    @Test
    public void testSyncGroup() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        SyncGroupRequestData groupId = new SyncGroupRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq("generic-group-sync"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(new SyncGroupResponseData()));
        Assertions.assertFalse(groupCoordinatorService.syncGroup(TestUtil.requestContext(ApiKeys.SYNC_GROUP), groupId, BufferSupplier.NO_CACHING).isDone());
    }

    @Test
    public void testSyncGroupWithException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        SyncGroupRequestData groupId = new SyncGroupRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq("generic-group-sync"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));
        CompletableFuture syncGroup = groupCoordinatorService.syncGroup(TestUtil.requestContext(ApiKeys.SYNC_GROUP), groupId, BufferSupplier.NO_CACHING);
        Assertions.assertTrue(syncGroup.isDone());
        Assertions.assertEquals(new SyncGroupResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()), syncGroup.get());
    }

    @Test
    public void testSyncGroupInvalidGroupId() throws Exception {
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime());
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        CompletableFuture syncGroup = groupCoordinatorService.syncGroup(TestUtil.requestContext(ApiKeys.SYNC_GROUP), new SyncGroupRequestData().setGroupId((String) null).setMemberId(""), BufferSupplier.NO_CACHING);
        Assertions.assertTrue(syncGroup.isDone());
        Assertions.assertEquals(new SyncGroupResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()), syncGroup.get());
    }

    @Test
    public void testHeartbeat() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        HeartbeatRequestData groupId = new HeartbeatRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleReadOperation((String) ArgumentMatchers.eq("generic-group-heartbeat"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(new HeartbeatResponseData()));
        CompletableFuture heartbeat = groupCoordinatorService.heartbeat(TestUtil.requestContext(ApiKeys.HEARTBEAT), groupId);
        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertEquals(new HeartbeatResponseData(), heartbeat.get());
    }

    @Test
    public void testHeartbeatCoordinatorNotAvailableException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        HeartbeatRequestData groupId = new HeartbeatRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleReadOperation((String) ArgumentMatchers.eq("generic-group-heartbeat"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any())).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException((String) null)));
        CompletableFuture heartbeat = groupCoordinatorService.heartbeat(TestUtil.requestContext(ApiKeys.HEARTBEAT), groupId);
        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertEquals(new HeartbeatResponseData(), heartbeat.get());
    }

    @Test
    public void testHeartbeatCoordinatorException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime = mockRuntime();
        GroupCoordinatorService groupCoordinatorService = new GroupCoordinatorService(new LogContext(), createConfig(), mockRuntime);
        HeartbeatRequestData groupId = new HeartbeatRequestData().setGroupId("foo");
        groupCoordinatorService.startup(() -> {
            return 1;
        });
        Mockito.when(mockRuntime.scheduleReadOperation((String) ArgumentMatchers.eq("generic-group-heartbeat"), (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any())).thenReturn(FutureUtils.failedFuture(new RebalanceInProgressException()));
        CompletableFuture heartbeat = groupCoordinatorService.heartbeat(TestUtil.requestContext(ApiKeys.HEARTBEAT), groupId);
        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertEquals(new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()), heartbeat.get());
    }
}
