package com.rabbitmq.stream.perf;

import com.google.common.util.concurrent.RateLimiter;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.AddressResolver;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.codec.QpidProtonCodec;
import com.rabbitmq.stream.codec.SimpleCodec;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.metrics.MicrometerMetricsCollector;
import com.rabbitmq.stream.perf.ShutdownService;
import com.rabbitmq.stream.perf.Utils;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufAllocatorMetric;
import io.netty.buffer.ByteBufAllocatorMetricProvider;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.internal.PlatformDependent;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@CommandLine.Command(name = "stream-perf-test", mixinStandardHelpOptions = false, showDefaultValues = true, version = {"perftest 0.1"}, description = {"Tests the performance of stream queues in RabbitMQ."})
/* loaded from: input_file:com/rabbitmq/stream/perf/StreamPerfTest.class */
public class StreamPerfTest implements Callable<Integer> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamPerfTest.class);
    private static final Map<String, String> CODEC_ALIASES = new HashMap<String, String>() { // from class: com.rabbitmq.stream.perf.StreamPerfTest.1
        {
            put("qpid", QpidProtonCodec.class.getName());
            put("simple", SimpleCodec.class.getName());
        }
    };
    private final String[] arguments;

    @CommandLine.Mixin
    private final CommandLine.HelpCommand helpCommand;
    int streamDispatching;
    private volatile Codec codec;

    @CommandLine.Option(names = {"--uris", "-u"}, description = {"servers to connect to, e.g. rabbitmq-stream://localhost:5552, separated by commas"}, defaultValue = "rabbitmq-stream://localhost:5552", split = ",")
    private List<String> uris;

    @CommandLine.Option(names = {"--producers", "-x"}, description = {"number of producers"}, defaultValue = "1", converter = {Utils.NotNegativeIntegerTypeConverter.class})
    private int producers;

    @CommandLine.Option(names = {"--consumers", "-y"}, description = {"number of consumers"}, defaultValue = "1", converter = {Utils.NotNegativeIntegerTypeConverter.class})
    private int consumers;

    @CommandLine.Option(names = {"--size", "-s"}, description = {"size of messages in bytes"}, defaultValue = "10", converter = {Utils.NotNegativeIntegerTypeConverter.class})
    private volatile int messageSize;

    @CommandLine.Option(names = {"--confirms", "-c"}, description = {"outstanding confirms"}, defaultValue = "10000", converter = {Utils.NotNegativeIntegerTypeConverter.class})
    private int confirms;

    @CommandLine.Option(names = {"--stream-count", "-sc"}, description = {"number of streams to send and consume from. Examples: 10, 1-10."}, defaultValue = "1", converter = {Utils.RangeTypeConverter.class})
    private String streamCount;

    @CommandLine.Option(names = {"--streams", "-st"}, description = {"stream(s) to send to and consume from, separated by commas"}, defaultValue = "stream", split = ",")
    private List<String> streams;

    @CommandLine.Option(names = {"--delete-streams", "-ds"}, description = {"whether to delete stream(s) after the run or not"}, defaultValue = "false")
    private boolean deleteStreams;

    @CommandLine.Option(names = {"--offset", "-o"}, description = {"offset to start listening from. Valid values are 'first', 'last', 'next', an unsigned long, or an ISO 8601 formatted timestamp (eg. 2020-06-03T07:45:54Z)."}, defaultValue = "next", converter = {Utils.OffsetSpecificationTypeConverter.class})
    private OffsetSpecification offset;

    @CommandLine.Option(names = {"--rate", "-r"}, description = {"maximum rate of published messages"}, defaultValue = "-1")
    private int rate;

    @CommandLine.Option(names = {"--batch-size", "-bs"}, description = {"size of a batch of published messages"}, defaultValue = "100", converter = {Utils.PositiveIntegerTypeConverter.class})
    private int batchSize;

    @CommandLine.Option(names = {"--sub-entry-size", "-ses"}, description = {"number of messages packed into a normal message entry"}, defaultValue = "1", converter = {Utils.PositiveIntegerTypeConverter.class})
    private int subEntrySize;

    @CommandLine.Option(names = {"--compression", "-co"}, description = {"compression codec to use for sub-entries. Values: none, gzip, snappy, lz4, zstd."}, defaultValue = "none", converter = {Utils.CompressionTypeConverter.class})
    private Compression compression;

    @CommandLine.Option(names = {"--codec", "-cc"}, description = {"class of codec to use. Aliases: qpid, simple."}, defaultValue = "qpid")
    private String codecClass;

    @CommandLine.Option(names = {"--max-length-bytes", "-mlb"}, description = {"max size of created streams"}, defaultValue = "20gb", converter = {Utils.ByteCapacityTypeConverter.class})
    private ByteCapacity maxLengthBytes;

    @CommandLine.Option(names = {"--stream-max-segment-size-bytes", "-smssb"}, description = {"max size of segments"}, defaultValue = "500mb", converter = {Utils.ByteCapacityTypeConverter.class})
    private ByteCapacity maxSegmentSize;

    @CommandLine.Option(names = {"--max-age", "-ma"}, description = {"max age of segments using the ISO 8601 duration format, e.g. PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours."}, converter = {Utils.DurationTypeConverter.class})
    private Duration maxAge;

    @CommandLine.Option(names = {"--leader-locator", "-ll"}, description = {"leader locator strategy for created stream. Possible values: client-local, least-leaders, random."}, converter = {Utils.LeaderLocatorTypeConverter.class}, defaultValue = "least-leaders")
    private StreamCreator.LeaderLocator leaderLocator;

    @CommandLine.Option(names = {"--store-every", "-se"}, description = {"the frequency of offset storage"}, defaultValue = "0")
    private int storeEvery;

    @CommandLine.Option(names = {"--version", "-v"}, description = {"show version information"}, defaultValue = "false")
    private boolean version;

    @CommandLine.Option(names = {"--summary-file", "-sf"}, description = {"generate a summary file with metrics"}, defaultValue = "false")
    private boolean summaryFile;

    @CommandLine.Option(names = {"--producers-by-connection", "-pbc"}, description = {"number of producers by connection. Value must be between 1 and 255."}, defaultValue = "1", converter = {Utils.OneTo255RangeIntegerTypeConverter.class})
    private int producersByConnection;

    @CommandLine.Option(names = {"--tracking-consumers-by-connection", "-ccbc"}, description = {"number of tracking consumers by connection. Value must be between 1 and 255."}, defaultValue = "50", converter = {Utils.OneTo255RangeIntegerTypeConverter.class})
    private int trackingConsumersByConnection;

    @CommandLine.Option(names = {"--consumers-by-connection", "-cbc"}, description = {"number of consumers by connection. Value must be between 1 and 255."}, defaultValue = "1", converter = {Utils.OneTo255RangeIntegerTypeConverter.class})
    private int consumersByConnection;

    @CommandLine.Option(names = {"--load-balancer", "-lb"}, description = {"assume URIs point to a load balancer"}, defaultValue = "false")
    private boolean loadBalancer;

    @CommandLine.Option(names = {"--consumer-names", "-cn"}, description = {"naming strategy for consumer names. Valid are values are 'uuid' or a pattern with stream name and consumer index as arguments."}, defaultValue = "%s-%d", converter = {Utils.ConsumerNameStrategyConverter.class})
    private BiFunction<String, Integer, String> consumerNameStrategy;

    @CommandLine.Option(names = {"--metrics-byte-rates", "-mbr"}, description = {"include written and read byte rates in metrics"}, defaultValue = "false")
    private boolean includeByteRates;

    @CommandLine.Option(names = {"--memory-report", "-mr"}, description = {"report information on memory settings and usage"}, defaultValue = "false")
    private boolean memoryReport;
    private MetricsCollector metricsCollector;
    private PerformanceMetrics performanceMetrics;
    private final PrintWriter err;
    private final PrintWriter out;

    public StreamPerfTest() {
        this(null, null, null);
    }

    public StreamPerfTest(String[] strArr, PrintStream printStream, PrintStream printStream2) {
        this.helpCommand = new CommandLine.HelpCommand();
        this.streamDispatching = 0;
        this.arguments = strArr;
        printStream = printStream == null ? System.out : printStream;
        printStream2 = printStream2 == null ? System.err : printStream2;
        this.out = new PrintWriter((OutputStream) printStream, true);
        this.err = new PrintWriter((OutputStream) printStream2, true);
    }

    public static void main(String[] strArr) {
        System.exit(run(strArr, System.out, System.err));
    }

    static int run(String[] strArr, PrintStream printStream, PrintStream printStream2) {
        StreamPerfTest streamPerfTest = new StreamPerfTest(strArr, printStream, printStream2);
        return new CommandLine(streamPerfTest).setOut(streamPerfTest.out).setErr(streamPerfTest.err).execute(strArr);
    }

    static void versionInformation(PrintStream printStream) {
        String property = System.getProperty("line.separator");
        String format = String.format("RabbitMQ Stream Perf Test %s (%s; %s)", Version.VERSION, Version.BUILD, Version.BUILD_TIMESTAMP);
        String format2 = String.format("Java version: %s, vendor: %s" + property + "Java home: %s" + property + "Default locale: %s, platform encoding: %s" + property + "OS name: %s, version: %s, arch: %s", System.getProperty("java.version"), System.getProperty("java.vendor"), System.getProperty("java.home"), Locale.getDefault().toString(), Charset.defaultCharset(), System.getProperty("os.name"), System.getProperty("os.version"), System.getProperty("os.arch"));
        printStream.println("\u001b[1m" + format);
        printStream.println("\u001b[0m" + format2);
    }

    private static Codec createCodec(String str) {
        String orDefault = CODEC_ALIASES.getOrDefault(str, str);
        try {
            return (Codec) Class.forName(orDefault).getConstructor(new Class[0]).newInstance(new Object[0]);
        } catch (Exception e) {
            throw new StreamException("Exception while creating codec " + orDefault, e);
        }
    }

    private static boolean isTls(Collection<String> collection) {
        return collection.stream().anyMatch(str -> {
            return str.toLowerCase().startsWith("rabbitmq-stream+tls");
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Integer call() throws Exception {
        Supplier supplier;
        AddressResolver addressResolver;
        ExecutorService executorService;
        if (this.version) {
            versionInformation(System.out);
            System.exit(0);
        }
        this.codec = createCodec(this.codecClass);
        ByteBufAllocatorMetricProvider byteBufAllocatorMetricProvider = ByteBufAllocator.DEFAULT;
        CompositeMeterRegistry compositeMeterRegistry = new CompositeMeterRegistry();
        this.metricsCollector = new MicrometerMetricsCollector(compositeMeterRegistry, "rabbitmq.stream");
        Counter counter = compositeMeterRegistry.counter("rabbitmq.stream.producer_confirmed", new String[0]);
        if (this.memoryReport) {
            long physicalMemory = Utils.physicalMemory();
            this.out.println(String.format("Max memory %s (%d bytes), max direct memory %s (%d bytes)%s", Utils.formatByte(Runtime.getRuntime().maxMemory()), Long.valueOf(Runtime.getRuntime().maxMemory()), Utils.formatByte(PlatformDependent.maxDirectMemory()), Long.valueOf(PlatformDependent.maxDirectMemory()), physicalMemory == 0 ? "" : String.format(", physical memory %s (%d bytes)", Utils.formatByte(physicalMemory), Long.valueOf(physicalMemory))));
            if (byteBufAllocatorMetricProvider instanceof ByteBufAllocatorMetricProvider) {
                ByteBufAllocatorMetric metric = byteBufAllocatorMetricProvider.metric();
                supplier = () -> {
                    long usedHeapMemory = metric.usedHeapMemory();
                    long usedDirectMemory = metric.usedDirectMemory();
                    return String.format("Used heap memory %s (%d bytes), used direct memory %s (%d bytes)", Utils.formatByte(usedHeapMemory), Long.valueOf(usedHeapMemory), Utils.formatByte(usedDirectMemory), Long.valueOf(usedDirectMemory));
                };
            } else {
                supplier = () -> {
                    return "";
                };
            }
        } else {
            supplier = () -> {
                return "";
            };
        }
        this.performanceMetrics = new DefaultPerformanceMetrics(compositeMeterRegistry, "rabbitmq.stream", this.summaryFile, this.includeByteRates, supplier, this.out);
        this.messageSize = this.messageSize < 8 ? 8 : this.messageSize;
        ShutdownService shutdownService = new ShutdownService();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdownService.close();
        }));
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), this.producers), new Utils.NamedThreadFactory("stream-perf-test-env-"));
        shutdownService.wrap(closeStep("Closing environment executor", () -> {
            newScheduledThreadPool.shutdownNow();
        }));
        boolean isTls = isTls(this.uris);
        if (this.loadBalancer) {
            int i = isTls ? Client.DEFAULT_TLS_PORT : Client.DEFAULT_PORT;
            List list = (List) this.uris.stream().map(str -> {
                try {
                    return new URI(str);
                } catch (URISyntaxException e) {
                    throw new IllegalArgumentException("Error while parsing URI " + str + ": " + e.getMessage());
                }
            }).map(uri -> {
                return new Address(uri.getHost() == null ? "localhost" : uri.getHost(), uri.getPort() == -1 ? i : uri.getPort());
            }).collect(Collectors.toList());
            AtomicInteger atomicInteger = new AtomicInteger(0);
            addressResolver = address -> {
                return (Address) list.get(atomicInteger.getAndIncrement() % list.size());
            };
        } else {
            addressResolver = address2 -> {
                return address2;
            };
        }
        EnvironmentBuilder maxConsumersByConnection = Environment.builder().uris(this.uris).addressResolver(addressResolver).scheduledExecutorService(newScheduledThreadPool).metricsCollector(this.metricsCollector).byteBufAllocator(byteBufAllocatorMetricProvider).maxProducersByConnection(this.producersByConnection).maxTrackingConsumersByConnection(this.trackingConsumersByConnection).maxConsumersByConnection(this.consumersByConnection);
        if (isTls) {
            maxConsumersByConnection = maxConsumersByConnection.tls().sslContext(SslContextBuilder.forClient().trustManager(Utils.TRUST_EVERYTHING_TRUST_MANAGER).build()).environmentBuilder();
        }
        Environment build = maxConsumersByConnection.build();
        shutdownService.wrap(closeStep("Closing environment(s)", () -> {
            build.close();
        }));
        this.streams = Utils.streams(this.streamCount, this.streams);
        for (String str2 : this.streams) {
            StreamCreator leaderLocator = build.streamCreator().stream(str2).maxLengthBytes(this.maxLengthBytes).maxSegmentSizeBytes(this.maxSegmentSize).leaderLocator(this.leaderLocator);
            if (this.maxAge != null) {
                leaderLocator.maxAge(this.maxAge);
            }
            try {
                leaderLocator.create();
            } catch (StreamException e) {
                if (e.getCode() != 17) {
                    throw e;
                }
                String format = String.format("Warning: stream '%s' already exists, but with different properties than max-length-bytes=%s, stream-max-segment-size-bytes=%s, queue-leader-locator=%s", str2, this.maxLengthBytes, this.maxSegmentSize, this.leaderLocator);
                if (this.maxAge != null) {
                    format = format + String.format(", max-age=%s", this.maxAge);
                }
                this.out.println(format);
            }
        }
        if (this.deleteStreams) {
            shutdownService.wrap(closeStep("Deleting stream(s)", () -> {
                for (String str3 : this.streams) {
                    LOGGER.debug("Deleting {}", str3);
                    try {
                        build.deleteStream(str3);
                        LOGGER.debug("Deleted {}", str3);
                    } catch (Exception e2) {
                        LOGGER.warn("Could not delete stream {}: {}", str3, e2.getMessage());
                    }
                }
            }));
        }
        List synchronizedList = Collections.synchronizedList(new ArrayList(this.producers));
        List<Runnable> list2 = (List) IntStream.range(0, this.producers).mapToObj(i2 -> {
            Runnable runnable;
            if (this.rate > 0) {
                RateLimiter create = RateLimiter.create(this.rate);
                runnable = () -> {
                    create.acquire(1);
                };
            } else {
                runnable = () -> {
                };
            }
            Producer build2 = build.producerBuilder().subEntrySize(this.subEntrySize).batchSize(this.batchSize).compression(this.compression == Compression.NONE ? null : this.compression).maxUnconfirmedMessages(this.confirms).stream(stream()).build();
            synchronizedList.add(build2);
            Runnable runnable2 = runnable;
            return () -> {
                int i2 = this.messageSize;
                ConfirmationHandler confirmationHandler = confirmationStatus -> {
                    if (confirmationStatus.isConfirmed()) {
                        counter.increment();
                    }
                };
                while (!Thread.currentThread().isInterrupted()) {
                    runnable2.run();
                    byte[] bArr = new byte[i2];
                    Utils.writeLong(bArr, System.nanoTime());
                    build2.send(build2.messageBuilder().addData(bArr).build(), confirmationHandler);
                }
            };
        }).collect(Collectors.toList());
        List synchronizedList2 = Collections.synchronizedList((List) IntStream.range(0, this.consumers).mapToObj(i3 -> {
            PerformanceMetrics performanceMetrics = this.performanceMetrics;
            AtomicLong atomicLong = new AtomicLong(0L);
            String stream = stream();
            ConsumerBuilder offset = build.consumerBuilder().stream(stream).offset(this.offset);
            if (this.storeEvery > 0) {
                offset = offset.name(this.consumerNameStrategy.apply(stream, Integer.valueOf(i3 + 1))).autoTrackingStrategy().messageCountBeforeStorage(this.storeEvery).builder();
            }
            return offset.messageHandler((context, message) -> {
                if (atomicLong.incrementAndGet() % 100 == 0) {
                    performanceMetrics.latency(System.nanoTime() - Utils.readLong(message.getBodyAsBinary()), TimeUnit.NANOSECONDS);
                }
            }).build();
        }).collect(Collectors.toList()));
        shutdownService.wrap(closeStep("Closing consumers", () -> {
            Iterator it = synchronizedList2.iterator();
            while (it.hasNext()) {
                ((Consumer) it.next()).close();
            }
        }));
        if (this.producers > 0) {
            executorService = Executors.newFixedThreadPool(this.producers);
            for (Runnable runnable : list2) {
                this.out.println("Starting producer");
                executorService.submit(runnable);
            }
        } else {
            executorService = null;
        }
        shutdownService.wrap(closeStep("Closing producers", () -> {
            Iterator it = synchronizedList.iterator();
            while (it.hasNext()) {
                ((Producer) it.next()).close();
            }
        }));
        ExecutorService executorService2 = executorService;
        shutdownService.wrap(closeStep("Closing producers executor service", () -> {
            if (executorService2 != null) {
                executorService2.shutdownNow();
            }
        }));
        this.performanceMetrics.start("Arguments: " + String.join(" ", this.arguments));
        shutdownService.wrap(closeStep("Closing metrics", () -> {
            this.performanceMetrics.close();
        }));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            countDownLatch.countDown();
        }));
        try {
            countDownLatch.await();
        } catch (InterruptedException e2) {
        }
        shutdownService.close();
        return 0;
    }

    private ShutdownService.CloseCallback closeStep(final String str, final ShutdownService.CloseCallback closeCallback) {
        return new ShutdownService.CloseCallback() { // from class: com.rabbitmq.stream.perf.StreamPerfTest.2
            @Override // com.rabbitmq.stream.perf.ShutdownService.CloseCallback
            public void run() throws Exception {
                StreamPerfTest.LOGGER.debug(str);
                closeCallback.run();
            }

            public String toString() {
                return str;
            }
        };
    }

    private String stream() {
        List<String> list = this.streams;
        int i = this.streamDispatching;
        this.streamDispatching = i + 1;
        return list.get(i % this.streams.size());
    }
}
