package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

@PrepareForTest({KafkaOffsetBackingStore.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.class */
public class KafkaOffsetBackingStoreTest {
    private static final String TOPIC = "connect-offsets";
    private static final short TOPIC_PARTITIONS = 2;
    private static final short TOPIC_REPLICATION_FACTOR = 5;
    private static final Map<String, String> DEFAULT_PROPS = new HashMap();
    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
    private static final Map<ByteBuffer, ByteBuffer> FIRST_SET;
    private static final ByteBuffer TP0_KEY;
    private static final ByteBuffer TP1_KEY;
    private static final ByteBuffer TP2_KEY;
    private static final ByteBuffer TP0_VALUE;
    private static final ByteBuffer TP1_VALUE;
    private static final ByteBuffer TP2_VALUE;
    private static final ByteBuffer TP0_VALUE_NEW;
    private static final ByteBuffer TP1_VALUE_NEW;

    @Mock
    KafkaBasedLog<byte[], byte[]> storeLog;
    private KafkaOffsetBackingStore store;
    private Capture<String> capturedTopic = EasyMock.newCapture();
    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
    private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();

    @Before
    public void setUp() throws Exception {
        this.store = (KafkaOffsetBackingStore) PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
    }

    @Test
    public void testStartStop() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList());
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.store.configure(DEFAULT_DISTRIBUTED_CONFIG);
        Assert.assertEquals(TOPIC, this.capturedTopic.getValue());
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", ((Map) this.capturedProducerProps.getValue()).get("key.serializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", ((Map) this.capturedProducerProps.getValue()).get("value.serializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", ((Map) this.capturedConsumerProps.getValue()).get("key.deserializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", ((Map) this.capturedConsumerProps.getValue()).get("value.deserializer"));
        Assert.assertEquals(TOPIC, ((NewTopic) this.capturedNewTopic.getValue()).name());
        Assert.assertEquals(2L, ((NewTopic) this.capturedNewTopic.getValue()).numPartitions());
        Assert.assertEquals(5L, ((NewTopic) this.capturedNewTopic.getValue()).replicationFactor());
        this.store.start();
        this.store.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testReloadOnStart() throws Exception {
        expectConfigure();
        expectStart(Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()), new ConsumerRecord(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()), new ConsumerRecord(TOPIC, 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())));
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.store.configure(DEFAULT_DISTRIBUTED_CONFIG);
        this.store.start();
        HashMap hashMap = (HashMap) Whitebox.getInternalState(this.store, "data");
        Assert.assertEquals(TP0_VALUE_NEW, hashMap.get(TP0_KEY));
        Assert.assertEquals(TP1_VALUE_NEW, hashMap.get(TP1_KEY));
        this.store.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testGetSet() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList());
        expectStop();
        Capture newCapture = EasyMock.newCapture();
        this.storeLog.readToEnd((Callback) EasyMock.capture(newCapture));
        PowerMock.expectLastCall().andAnswer(() -> {
            ((Callback) newCapture.getValue()).onCompletion((Throwable) null, (Object) null);
            return null;
        });
        Capture newCapture2 = EasyMock.newCapture();
        this.storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), (org.apache.kafka.clients.producer.Callback) EasyMock.capture(newCapture2));
        PowerMock.expectLastCall();
        Capture newCapture3 = EasyMock.newCapture();
        this.storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), (org.apache.kafka.clients.producer.Callback) EasyMock.capture(newCapture3));
        PowerMock.expectLastCall();
        final Capture newCapture4 = EasyMock.newCapture();
        this.storeLog.readToEnd((Callback) EasyMock.capture(newCapture4));
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.1
            public Object answer() throws Throwable {
                ((Callback) KafkaOffsetBackingStoreTest.this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, new ConsumerRecord(KafkaOffsetBackingStoreTest.TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, KafkaOffsetBackingStoreTest.TP0_KEY.array(), KafkaOffsetBackingStoreTest.TP0_VALUE.array()));
                ((Callback) KafkaOffsetBackingStoreTest.this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, new ConsumerRecord(KafkaOffsetBackingStoreTest.TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, KafkaOffsetBackingStoreTest.TP1_KEY.array(), KafkaOffsetBackingStoreTest.TP1_VALUE.array()));
                ((Callback) newCapture4.getValue()).onCompletion((Throwable) null, (Object) null);
                return null;
            }
        });
        final Capture newCapture5 = EasyMock.newCapture();
        this.storeLog.readToEnd((Callback) EasyMock.capture(newCapture5));
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.2
            public Object answer() throws Throwable {
                ((Callback) KafkaOffsetBackingStoreTest.this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, new ConsumerRecord(KafkaOffsetBackingStoreTest.TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, KafkaOffsetBackingStoreTest.TP0_KEY.array(), KafkaOffsetBackingStoreTest.TP0_VALUE_NEW.array()));
                ((Callback) KafkaOffsetBackingStoreTest.this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, new ConsumerRecord(KafkaOffsetBackingStoreTest.TOPIC, 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, KafkaOffsetBackingStoreTest.TP1_KEY.array(), KafkaOffsetBackingStoreTest.TP1_VALUE_NEW.array()));
                ((Callback) newCapture5.getValue()).onCompletion((Throwable) null, (Object) null);
                return null;
            }
        });
        PowerMock.replayAll(new Object[0]);
        this.store.configure(DEFAULT_DISTRIBUTED_CONFIG);
        this.store.start();
        Map map = (Map) this.store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assert.assertNull(map.get(TP0_KEY));
        Assert.assertNull(map.get(TP1_KEY));
        HashMap hashMap = new HashMap();
        hashMap.put(TP0_KEY, TP0_VALUE);
        hashMap.put(TP1_KEY, TP1_VALUE);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Future future = this.store.set(hashMap, new Callback<Void>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.3
            public void onCompletion(Throwable th, Void r5) {
                atomicBoolean.set(true);
            }
        });
        Assert.assertFalse(future.isDone());
        ((org.apache.kafka.clients.producer.Callback) newCapture3.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
        Assert.assertFalse(atomicBoolean.get());
        ((org.apache.kafka.clients.producer.Callback) newCapture2.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
        future.get(10000L, TimeUnit.MILLISECONDS);
        Assert.assertTrue(atomicBoolean.get());
        Map map2 = (Map) this.store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(TP0_VALUE, map2.get(TP0_KEY));
        Assert.assertEquals(TP1_VALUE, map2.get(TP1_KEY));
        Map map3 = (Map) this.store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(TP0_VALUE_NEW, map3.get(TP0_KEY));
        Assert.assertEquals(TP1_VALUE_NEW, map3.get(TP1_KEY));
        this.store.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testGetSetNull() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList());
        Capture newCapture = EasyMock.newCapture();
        this.storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), (org.apache.kafka.clients.producer.Callback) EasyMock.capture(newCapture));
        PowerMock.expectLastCall();
        Capture newCapture2 = EasyMock.newCapture();
        this.storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), (org.apache.kafka.clients.producer.Callback) EasyMock.capture(newCapture2));
        PowerMock.expectLastCall();
        Capture newCapture3 = EasyMock.newCapture();
        this.storeLog.readToEnd((Callback) EasyMock.capture(newCapture3));
        PowerMock.expectLastCall().andAnswer(() -> {
            ((Callback) this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array()));
            ((Callback) this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, new ConsumerRecord(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null));
            ((Callback) newCapture3.getValue()).onCompletion((Throwable) null, (Object) null);
            return null;
        });
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.store.configure(DEFAULT_DISTRIBUTED_CONFIG);
        this.store.start();
        HashMap hashMap = new HashMap();
        hashMap.put(null, TP0_VALUE);
        hashMap.put(TP1_KEY, null);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Future future = this.store.set(hashMap, new Callback<Void>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.4
            public void onCompletion(Throwable th, Void r5) {
                atomicBoolean.set(true);
            }
        });
        Assert.assertFalse(future.isDone());
        ((org.apache.kafka.clients.producer.Callback) newCapture2.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
        Assert.assertFalse(atomicBoolean.get());
        ((org.apache.kafka.clients.producer.Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
        future.get(10000L, TimeUnit.MILLISECONDS);
        Assert.assertTrue(atomicBoolean.get());
        Map map = (Map) this.store.get(Arrays.asList(null, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(TP0_VALUE, map.get(null));
        Assert.assertNull(map.get(TP1_KEY));
        this.store.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testSetFailure() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList());
        expectStop();
        Capture newCapture = EasyMock.newCapture();
        this.storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), (org.apache.kafka.clients.producer.Callback) EasyMock.capture(newCapture));
        PowerMock.expectLastCall();
        Capture newCapture2 = EasyMock.newCapture();
        this.storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), (org.apache.kafka.clients.producer.Callback) EasyMock.capture(newCapture2));
        PowerMock.expectLastCall();
        Capture newCapture3 = EasyMock.newCapture();
        this.storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), (org.apache.kafka.clients.producer.Callback) EasyMock.capture(newCapture3));
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.store.configure(DEFAULT_DISTRIBUTED_CONFIG);
        this.store.start();
        HashMap hashMap = new HashMap();
        hashMap.put(TP0_KEY, TP0_VALUE);
        hashMap.put(TP1_KEY, TP1_VALUE);
        hashMap.put(TP2_KEY, TP2_VALUE);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        Future future = this.store.set(hashMap, new Callback<Void>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.5
            public void onCompletion(Throwable th, Void r5) {
                atomicBoolean.set(true);
                if (th != null) {
                    atomicBoolean2.set(true);
                }
            }
        });
        Assert.assertFalse(future.isDone());
        ((org.apache.kafka.clients.producer.Callback) newCapture2.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
        Assert.assertFalse(atomicBoolean.get());
        ((org.apache.kafka.clients.producer.Callback) newCapture3.getValue()).onCompletion((RecordMetadata) null, new KafkaException("bogus error"));
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(atomicBoolean2.get());
        ((org.apache.kafka.clients.producer.Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
        try {
            future.get(10000L, TimeUnit.MILLISECONDS);
            Assert.fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
        } catch (ExecutionException e) {
            Assert.assertNotNull(e.getCause());
            Assert.assertTrue(e.getCause() instanceof KafkaException);
        }
        this.store.stop();
        PowerMock.verifyAll();
    }

    private void expectConfigure() throws Exception {
        PowerMock.expectPrivate(this.store, "createKafkaBasedLog", new Object[]{EasyMock.capture(this.capturedTopic), EasyMock.capture(this.capturedProducerProps), EasyMock.capture(this.capturedConsumerProps), EasyMock.capture(this.capturedConsumedCallback), EasyMock.capture(this.capturedNewTopic), EasyMock.capture(this.capturedAdminProps)}).andReturn(this.storeLog);
    }

    private void expectStart(final List<ConsumerRecord<byte[], byte[]>> list) throws Exception {
        this.storeLog.start();
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.6
            public Object answer() throws Throwable {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ((Callback) KafkaOffsetBackingStoreTest.this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, (ConsumerRecord) it.next());
                }
                return null;
            }
        });
    }

    private void expectStop() {
        this.storeLog.stop();
        PowerMock.expectLastCall();
    }

    private static ByteBuffer buffer(String str) {
        return ByteBuffer.wrap(str.getBytes());
    }

    static {
        DEFAULT_PROPS.put("bootstrap.servers", "broker1:9092,broker2:9093");
        DEFAULT_PROPS.put("offset.storage.topic", TOPIC);
        DEFAULT_PROPS.put("offset.storage.replication.factor", Short.toString((short) 5));
        DEFAULT_PROPS.put("offset.storage.partitions", Integer.toString(TOPIC_PARTITIONS));
        DEFAULT_PROPS.put("config.storage.topic", "connect-configs");
        DEFAULT_PROPS.put("config.storage.replication.factor", Short.toString((short) 5));
        DEFAULT_PROPS.put("group.id", "connect");
        DEFAULT_PROPS.put("status.storage.topic", "status-topic");
        DEFAULT_PROPS.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_PROPS.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_PROPS.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_PROPS.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_PROPS);
        FIRST_SET = new HashMap();
        FIRST_SET.put(buffer("key"), buffer("value"));
        FIRST_SET.put(null, null);
        TP0_KEY = buffer("TP0KEY");
        TP1_KEY = buffer("TP1KEY");
        TP2_KEY = buffer("TP2KEY");
        TP0_VALUE = buffer("VAL0");
        TP1_VALUE = buffer("VAL1");
        TP2_VALUE = buffer("VAL2");
        TP0_VALUE_NEW = buffer("VAL0_NEW");
        TP1_VALUE_NEW = buffer("VAL1_NEW");
    }
}
