package org.apache.flink.connector.testframe.utils;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.StringUtils;
import org.assertj.core.api.AssertionsForClassTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/testframe/utils/MetricQuerier.class */
/**
 * Helper that talks to a Flink cluster's REST endpoint to read job details and the aggregated
 * subtask metrics of a single job vertex.
 *
 * <p>NOTE(review): neither the executor created in the constructor nor the {@link RestClient} is
 * shut down anywhere in this class; confirm whether callers need a close hook.
 */
public class MetricQuerier {
    private static final Logger LOG = LoggerFactory.getLogger(MetricQuerier.class);

    /** Upper bound, in seconds, for waiting on a single REST response. */
    private static final long REQUEST_TIMEOUT_SECONDS = 30L;

    private final RestClient restClient;

    /**
     * Creates a querier backed by a new {@link RestClient} running on a cached thread pool.
     *
     * @param configuration configuration handed to the {@link RestClient} constructor
     * @throws ConfigurationException propagated from the {@link RestClient} constructor
     */
    public MetricQuerier(Configuration configuration) throws ConfigurationException {
        this.restClient = new RestClient(configuration, Executors.newCachedThreadPool());
    }

    /**
     * Requests the details of a job and waits up to 30 seconds for the response.
     *
     * @param restClient client used to send the request
     * @param endpoint address and port of the REST endpoint
     * @param jobID job to look up
     * @return the job details returned by the endpoint
     * @throws Exception if sending the request fails or no response arrives within the timeout
     */
    public static JobDetailsInfo getJobDetails(RestClient restClient, TestEnvironment.Endpoint endpoint, JobID jobID) throws Exception {
        final JobMessageParameters parameters = new JobMessageParameters();
        parameters.jobPathParameter.resolve(jobID);
        return restClient
                .sendRequest(
                        endpoint.getAddress(),
                        endpoint.getPort(),
                        JobDetailsHeaders.getInstance(),
                        parameters,
                        EmptyRequestBody.getInstance())
                .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Requests the aggregated subtask metrics of a job vertex without setting any metric filter.
     *
     * @param endpoint address and port of the REST endpoint
     * @param jobID job that owns the vertex
     * @param jobVertexID vertex whose subtask metrics are requested
     * @return the response body returned by the endpoint
     * @throws Exception if sending the request fails or no response arrives within the timeout
     */
    public AggregatedMetricsResponseBody getMetricList(TestEnvironment.Endpoint endpoint, JobID jobID, JobVertexID jobVertexID) throws Exception {
        return requestSubtaskMetrics(endpoint, resolveSubtaskMetricsParameters(jobID, jobVertexID));
    }

    /**
     * Requests the aggregated subtask metrics of a job vertex, restricted by a metric filter.
     *
     * @param endpoint address and port of the REST endpoint
     * @param jobID job that owns the vertex
     * @param jobVertexID vertex whose subtask metrics are requested
     * @param str filter expression resolved into the request's first query parameter; {@link
     *     #getAggregatedMetricsByRestAPI} passes a comma-joined list of metric ids
     * @return the response body returned by the endpoint
     * @throws Exception if sending the request fails or no response arrives within the timeout
     */
    public AggregatedMetricsResponseBody getMetrics(TestEnvironment.Endpoint endpoint, JobID jobID, JobVertexID jobVertexID, String str) throws Exception {
        final AggregatedSubtaskMetricsParameters parameters =
                resolveSubtaskMetricsParameters(jobID, jobVertexID);
        // The first query parameter is treated as the metric filter.
        final Iterator<?> queryParameters = parameters.getQueryParameters().iterator();
        ((MetricsFilterParameter) queryParameters.next()).resolveFromString(str);
        return requestSubtaskMetrics(endpoint, parameters);
    }

    /**
     * Looks up the job vertex whose name contains {@code str}, collects the ids of its metrics
     * that match the given name and filter, queries those metrics and returns the sum of the
     * first aggregated metric in the response.
     *
     * @param endpoint address and port of the REST endpoint
     * @param jobID job to inspect
     * @param str text that must be contained in both the vertex name and the metric id
     * @param str2 suffix the metric id must end with
     * @param str3 optional extra text the metric id must contain; ignored when null or blank
     * @return the {@code sum} of the first aggregated metric returned by the endpoint
     * @throws IllegalStateException if no metric id matches, or the metric query returns nothing
     * @throws Exception if a REST request fails or times out
     */
    public Double getAggregatedMetricsByRestAPI(TestEnvironment.Endpoint endpoint, JobID jobID, String str, String str2, String str3) throws Exception {
        final JobDetailsInfo.JobVertexDetailsInfo vertex =
                getJobDetails(this.restClient, endpoint, jobID).getJobVertexInfos().stream()
                        .filter(candidate -> candidate.getName().contains(str))
                        .findAny()
                        .orElse(null);
        // Describe the assertion so a failure names the operator that could not be found.
        AssertionsForClassTypes.assertThat(vertex)
                .as("Job vertex whose name contains [%s] in job [%s]", str, jobID)
                .isNotNull();
        final JobVertexID vertexId = vertex.getJobVertexID();

        final String metricIds =
                getMetricList(endpoint, jobID, vertexId).getMetrics().stream()
                        .map(AggregatedMetric::getId)
                        .filter(id -> filterByMetricName(id, str, str2, str3))
                        .collect(Collectors.joining(","));
        if (StringUtils.isNullOrWhitespaceOnly(metricIds)) {
            throw new IllegalStateException(String.format("Cannot find metric[%s] for operator [%s].", str2, str));
        }

        final Collection<AggregatedMetric> metrics =
                getMetrics(endpoint, jobID, vertexId, metricIds).getMetrics();
        if (metrics == null || metrics.isEmpty()) {
            throw new IllegalStateException(String.format("Cannot find metric[%s] for operator [%s] with filter [%s].", str2, str, str3));
        }
        // Only the first returned metric is used, even when several ids were queried.
        return metrics.iterator().next().getSum();
    }

    /** Builds subtask-metrics parameters with the job id and vertex id path parameters resolved. */
    private static AggregatedSubtaskMetricsParameters resolveSubtaskMetricsParameters(JobID jobID, JobVertexID jobVertexID) {
        final AggregatedSubtaskMetricsParameters parameters = new AggregatedSubtaskMetricsParameters();
        // Relies on the path parameters being iterated as job id first, vertex id second.
        final Iterator<?> pathParameters = parameters.getPathParameters().iterator();
        ((JobIDPathParameter) pathParameters.next()).resolve(jobID);
        ((JobVertexIdPathParameter) pathParameters.next()).resolve(jobVertexID);
        return parameters;
    }

    /** Sends the aggregated subtask metrics request and waits up to the request timeout. */
    private AggregatedMetricsResponseBody requestSubtaskMetrics(TestEnvironment.Endpoint endpoint, AggregatedSubtaskMetricsParameters parameters) throws Exception {
        return this.restClient
                .sendRequest(
                        endpoint.getAddress(),
                        endpoint.getPort(),
                        AggregatedSubtaskMetricsHeaders.getInstance(),
                        parameters,
                        EmptyRequestBody.getInstance())
                .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Decides whether a metric id belongs to the requested operator and metric.
     *
     * @param metricId id of the metric under test
     * @param operatorName text the id must contain
     * @param metricName suffix the id must end with
     * @param filter optional extra text the id must contain; ignored when null or blank
     * @return {@code true} if the id satisfies every applicable condition
     */
    private static boolean filterByMetricName(String metricId, String operatorName, String metricName, @Nullable String filter) {
        if (!metricId.endsWith(metricName) || !metricId.contains(operatorName)) {
            return false;
        }
        return StringUtils.isNullOrWhitespaceOnly(filter) || metricId.contains(filter);
    }
}
