package uk.gov.gchq.gaffer.flink.integration.operation.handler;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.QueryExp;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.flink.operation.FlinkTest;
import uk.gov.gchq.gaffer.flink.operation.TestFileSink;
import uk.gov.gchq.gaffer.flink.operation.handler.AddElementsFromKafkaHandler;
import uk.gov.gchq.gaffer.generator.TestBytesGeneratorImpl;
import uk.gov.gchq.gaffer.generator.TestGeneratorImpl;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.mapstore.MapStore;
import uk.gov.gchq.gaffer.mapstore.MapStoreProperties;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElementsFromKafka;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.user.User;

/* loaded from: input_file:uk/gov/gchq/gaffer/flink/integration/operation/handler/AddElementsFromKafkaHandlerIT.class */
/**
 * Integration test for {@link AddElementsFromKafkaHandler}.
 *
 * <p>Each test starts a ZooKeeper {@link TestingServer} and a Kafka broker on a free local port,
 * runs an {@link AddElementsFromKafka} operation on a background thread, publishes the test data
 * to a randomly named topic and then waits until the expected elements have been written.
 *
 * <p>Helpers such as {@code createGraph()}, {@code createTestFileSink()},
 * {@code createTemporaryDirectory(String)}, {@code waitForElements(..)}, {@code verifyElements(..)}
 * and the constants {@code DATA_VALUES} and {@code SCHEMA} are not defined in this class;
 * presumably they are inherited from {@link FlinkTest}.
 */
public class AddElementsFromKafkaHandlerIT extends FlinkTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(AddElementsFromKafkaHandlerIT.class);

    /** Random topic name so that separate runs never share a topic. */
    private static final String TOPIC = UUID.randomUUID().toString();
    private static final String GROUP_ID = "groupId";

    /** Upper bound for a single test; the polling loops below rely on it to terminate. */
    private static final long TEST_TIMEOUT_MS = 10000;

    /** Pause between two polls while waiting for the consumer or for the stored elements. */
    private static final long WAIT_MS = 1000;

    private final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();

    // Keys are serialised with StringSerializer (see producerProps); values are String or byte[].
    private KafkaProducer<String, Object> producer;
    private KafkaServer kafkaServer;
    private TestingServer zkServer;
    private String bootstrapServers;
    private TestFileSink testFileSink;

    /**
     * Starts ZooKeeper and a Kafka broker on a free local port, creates the file sink used by the
     * handler under test and resets the static {@link MapStore} state.
     *
     * @throws Exception if any of the servers cannot be started
     */
    @BeforeEach
    public void before() throws Exception {
        this.bootstrapServers = "localhost:" + getOpenPort();
        this.zkServer = new TestingServer(-1, createTemporaryDirectory("zkTmpDir"));
        this.zkServer.start();
        this.testFileSink = createTestFileSink();
        this.kafkaServer = TestUtils.createServer(new KafkaConfig(serverProperties()), new MockTime());
        MapStore.resetStaticMap();
    }

    /**
     * Releases the producer, the Kafka broker and ZooKeeper, then unregisters the MBeans.
     *
     * <p>The steps are chained with {@code finally} so that a failure in one of them does not
     * prevent the remaining resources from being released.
     *
     * @throws IOException if closing the ZooKeeper test server fails
     */
    @AfterEach
    public void cleanUp() throws IOException {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            try {
                if (this.kafkaServer != null) {
                    this.kafkaServer.shutdown();
                }
            } finally {
                try {
                    if (this.zkServer != null) {
                        this.zkServer.close();
                    }
                } finally {
                    unregisterMBeans();
                }
            }
        }
    }

    /** Runs the scenario with records consumed as {@link String}. */
    @Timeout(value = TEST_TIMEOUT_MS, unit = TimeUnit.MILLISECONDS)
    @Test
    public void shouldAddElementsWithStringConsumer() throws Exception {
        shouldAddElements(String.class, TestGeneratorImpl.class, StringSerializer.class);
    }

    /** Runs the scenario with records consumed as {@code byte[]}. */
    @Timeout(value = TEST_TIMEOUT_MS, unit = TimeUnit.MILLISECONDS)
    @Test
    public void shouldAddElementsWithByteArrayConsumer() throws Exception {
        shouldAddElements(byte[].class, TestBytesGeneratorImpl.class, ByteArraySerializer.class);
    }

    /**
     * Executes an {@link AddElementsFromKafka} operation in the background, publishes every entry
     * of {@code DATA_VALUES} to {@link #TOPIC} and verifies the resulting elements.
     *
     * @param consumeAs      the type the operation consumes records as
     * @param generatorClass the element generator handed to the operation
     * @param serialiser     the Kafka value serialiser used by the test producer
     * @param <T>            the consumed record type
     * @throws Exception if publishing, waiting or verification fails
     */
    protected <T> void shouldAddElements(
            Class<T> consumeAs,
            Class<? extends Function<Iterable<? extends T>, Iterable<? extends Element>>> generatorClass,
            Class<? extends Serializer> serialiser) throws Exception {
        final Graph graph = createGraph();
        final AddElementsFromKafka operation = new AddElementsFromKafka.Builder()
                .generator(consumeAs, generatorClass)
                .parallelism(1)
                .validate(true)
                .skipInvalidElements(false)
                .topic(TOPIC)
                .bootstrapServers(new String[]{this.bootstrapServers})
                .groupId(GROUP_ID)
                .build();

        // The operation is started on its own thread so the test thread can publish and poll.
        new Thread(() -> {
            try {
                graph.execute(operation, new User());
            } catch (OperationException e) {
                // Log first: an exception thrown on this thread never reaches the test thread.
                LOGGER.error("AddElementsFromKafka operation failed", e);
                throw new RuntimeException(e);
            }
        }, "AddElementsFromKafka-operation").start();

        waitForOperationToStart();

        this.producer = new KafkaProducer<>(producerProps(serialiser));
        for (String dataValue : DATA_VALUES) {
            // get() blocks until the send has completed, so records are published one at a time.
            this.producer.send(new ProducerRecord<String, Object>(TOPIC, convert(consumeAs, dataValue))).get();
        }

        waitForElements(consumeAs, generatorClass);
        verifyElements(consumeAs, this.testFileSink, generatorClass);
    }

    /**
     * Converts a test value into the representation that is sent to Kafka.
     *
     * @param consumeAs the requested representation
     * @param value     the raw test value
     * @param <T>       the requested type
     * @return {@code value} itself when {@code consumeAs} is {@code String.class}, otherwise its bytes
     */
    @SuppressWarnings("unchecked") // the payload is a String or a byte[], matching the two callers
    private <T> T convert(Class<T> consumeAs, String value) {
        final Object payload;
        if (String.class.equals(consumeAs)) {
            payload = value;
        } else {
            // NOTE(review): uses the platform default charset; confirm it matches the decoding side.
            payload = value.getBytes();
        }
        return (T) payload;
    }

    /**
     * Builds the configuration of the test producer.
     *
     * @param serialiser the value serialiser class
     * @return producer properties pointing at the broker started in {@link #before()}
     */
    private Properties producerProps(Class<? extends Serializer> serialiser) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", serialiser);
        return props;
    }

    /**
     * Builds the configuration of the Kafka broker.
     *
     * @return broker properties using the ZooKeeper test server, broker id {@code 0}, a plaintext
     *         listener on {@code bootstrapServers} and a temporary log directory
     * @throws IOException declared by the original signature; presumably raised when the temporary
     *                     directory cannot be created
     */
    private Properties serverProperties() throws IOException {
        final Properties props = new Properties();
        props.put("zookeeper.connect", this.zkServer.getConnectString());
        props.put("broker.id", "0");
        props.setProperty("listeners", "PLAINTEXT://" + this.bootstrapServers);
        props.put(KafkaConfig.LogDirProp(), createTemporaryDirectory("kafkaLogDir").getPath());
        return props;
    }

    /**
     * Asks the operating system for a free port by briefly binding a socket to port {@code 0}.
     *
     * <p>The socket is closed before returning, so the port is free for the broker to bind; another
     * process could in principle claim it in between.
     *
     * @return a port number that was free at the time of the call
     * @throws RuntimeException wrapping the {@link IOException} if no socket can be opened
     */
    private static int getOpenPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Blocks until a Kafka consumer for {@link #TOPIC} is visible through JMX.
     *
     * <p>There is no upper bound here; the {@code @Timeout} on the test methods ends a stuck wait.
     */
    private void waitForOperationToStart() throws Exception {
        while (!consumerConnected()) {
            LOGGER.info("Waiting for Operation to start, sleeping for {} ms", WAIT_MS);
            Thread.sleep(WAIT_MS);
        }
    }

    /**
     * Blocks until the three-argument {@code waitForElements} helper reports {@code true} for the
     * file sink, polling every {@link #WAIT_MS} milliseconds.
     */
    private <T> void waitForElements(
            Class<T> consumeAs,
            Class<? extends Function<Iterable<? extends T>, Iterable<? extends Element>>> generatorClass)
            throws Exception {
        while (!waitForElements(consumeAs, this.testFileSink, generatorClass)) {
            LOGGER.info("Waiting for Elements to be stored, sleeping for {} ms", WAIT_MS);
            Thread.sleep(WAIT_MS);
        }
    }

    /**
     * Creates the store and registers the handler under test, wired to a JMX-enabled Flink
     * environment and to the file sink created in {@link #before()}.
     *
     * @return the configured store
     */
    @Override // uk.gov.gchq.gaffer.flink.operation.FlinkTest
    public Store createStore() {
        final Store store = Store.createStore("graphId", SCHEMA, MapStoreProperties.loadStoreProperties("store.properties"));
        store.addOperationHandler(
                AddElementsFromKafka.class,
                new AddElementsFromKafkaHandler(createJmxEnabledExecutionEnvironment(), this.testFileSink));
        return store;
    }

    /**
     * Creates a local Flink environment with parallelism 1 and the JMX metrics reporter enabled;
     * {@link #consumerConnected()} depends on the consumer metrics being published through JMX.
     */
    private StreamExecutionEnvironment createJmxEnabledExecutionEnvironment() {
        final Configuration flinkConfig = new Configuration();
        flinkConfig.setString("metrics.reporters", "jmx");
        flinkConfig.setString("metrics.reporter.jmx.class", JMXReporter.class.getName());
        return StreamExecutionEnvironment.createLocalEnvironment(1, flinkConfig);
    }

    /**
     * Attempts to unregister every MBean known to the platform MBean server so that metrics of a
     * previous test are not mistaken for a connected consumer. Best effort: failures are logged
     * at debug level and otherwise ignored.
     */
    private void unregisterMBeans() {
        for (ObjectName name : this.platformMBeanServer.queryNames(null, null)) {
            try {
                this.platformMBeanServer.unregisterMBean(name);
            } catch (Exception e) {
                LOGGER.debug("Could not unregister MBean {}: {}", name, e.toString());
            }
        }
    }

    /**
     * Checks whether consumer fetch metrics for {@link #TOPIC} have been registered in JMX.
     *
     * @return {@code true} if at least one matching MBean exists
     * @throws Exception if the object name pattern is rejected
     */
    private boolean consumerConnected() throws Exception {
        final ObjectName fetchMetricsPattern =
                new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,*,topic=" + TOPIC);
        return !this.platformMBeanServer.queryNames(fetchMetricsPattern, null).isEmpty();
    }
}
