package org.apache.kafka.clients.producer;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/SenderTest.class */
public class SenderTest {
    private static final int MAX_REQUEST_SIZE = 1048576;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = 0;
    private static final int REQUEST_TIMEOUT_MS = 10000;
    private TopicPartition tp = new TopicPartition("test", MAX_RETRIES);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private int batchSize = 16384;
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
    private Cluster cluster = TestUtils.singletonCluster("test", 1);
    private Metrics metrics = new Metrics(this.time);
    Map<String, String> metricTags = new LinkedHashMap();
    private RecordAccumulator accumulator = new RecordAccumulator(this.batchSize, 1048576, 0, 0, false, this.metrics, this.time, this.metricTags);
    private Sender sender = new Sender(this.client, this.metadata, this.accumulator, MAX_REQUEST_SIZE, -1, MAX_RETRIES, REQUEST_TIMEOUT_MS, this.metrics, this.time, "clientId");

    @Before
    public void setup() {
        this.metadata.update(this.cluster, this.time.milliseconds());
    }

    @Test
    public void testSimple() throws Exception {
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, (Callback) null).future;
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("We should have a single produce request in flight.", 1L, this.client.inFlightRequestCount());
        this.client.respond(produceResponse(this.tp.topic(), this.tp.partition(), MAX_RETRIES, Errors.NONE.code()));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("All requests completed.", MAX_RETRIES, this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", futureRecordMetadata.isDone());
        Assert.assertEquals(MAX_RETRIES, ((RecordMetadata) futureRecordMetadata.get()).offset());
    }

    @Test
    public void testRetries() throws Exception {
        Sender sender = new Sender(this.client, this.metadata, this.accumulator, MAX_REQUEST_SIZE, (short) -1, 1, REQUEST_TIMEOUT_MS, new Metrics(), this.time, "clientId");
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, (Callback) null).future;
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.client.disconnect(Integer.valueOf(this.client.requests().peek().request().destination()));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.client.respond(produceResponse(this.tp.topic(), this.tp.partition(), MAX_RETRIES, Errors.NONE.code()));
        sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should have retried and completed", futureRecordMetadata.isDone());
        Assert.assertEquals(MAX_RETRIES, ((RecordMetadata) futureRecordMetadata.get()).offset());
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, (Callback) null).future;
        sender.run(this.time.milliseconds());
        for (int i = MAX_RETRIES; i < 1 + 1; i++) {
            this.client.disconnect(Integer.valueOf(this.client.requests().peek().request().destination()));
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
        }
        sender.run(this.time.milliseconds());
        completedWithError(futureRecordMetadata2, Errors.NETWORK_EXCEPTION);
    }

    private void completedWithError(Future<RecordMetadata> future, Errors errors) throws Exception {
        Assert.assertTrue("Request should be completed", future.isDone());
        try {
            future.get();
            Assert.fail("Should have thrown an exception.");
        } catch (ExecutionException e) {
            Assert.assertEquals(errors.exception().getClass(), e.getCause().getClass());
        }
    }

    private Struct produceResponse(String str, int i, long j, int i2) {
        Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
        Struct instance = struct.instance("responses");
        instance.set("topic", str);
        Struct instance2 = instance.instance("partition_responses");
        instance2.set("partition", Integer.valueOf(i));
        instance2.set("error_code", Short.valueOf((short) i2));
        instance2.set("base_offset", Long.valueOf(j));
        instance.set("partition_responses", new Object[]{instance2});
        struct.set("responses", new Object[]{instance});
        return struct;
    }
}
