package uk.gov.gchq.gaffer.flink.integration.operation.handler;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import uk.gov.gchq.gaffer.commonutil.CommonTestConstants;
import uk.gov.gchq.gaffer.flink.operation.FlinkTest;
import uk.gov.gchq.gaffer.generator.TestGeneratorImpl;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.mapstore.MapStore;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElementsFromKafka;
import uk.gov.gchq.gaffer.user.User;

/**
 * Integration test for the {@link AddElementsFromKafka} operation.
 *
 * <p>Each test starts a Curator {@link TestingServer} (ZooKeeper) and a Kafka broker that
 * listens on {@link #BOOTSTRAP_SERVERS}, runs the operation against a graph obtained from
 * {@code createGraph()}, publishes {@code DATA_VALUES} to a randomly named topic and then
 * checks the graph contents with {@code verifyElements(Graph)}.
 */
public class AddElementsFromKafkaHandlerIT extends FlinkTest {
    /** Random topic name, generated once per class load. */
    private static final String TOPIC = UUID.randomUUID().toString();

    /** Address the test broker listens on and that both producer and operation connect to. */
    private static final String BOOTSTRAP_SERVERS = "localhost:" + getOpenPort();

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(CommonTestConstants.TMP_DIRECTORY);

    // NOTE(review): Flink's RetryRule presumably only retries tests carrying Flink's retry
    // annotations - confirm whether it has any effect on shouldAddElements().
    @Rule
    public final RetryRule rule = new RetryRule();

    // Assigned on a background thread in shouldAddElements() and read by cleanUp() on the
    // JUnit thread; volatile makes the assignment visible so the producer gets closed.
    // Keys are typed as String to match the StringSerializer configured in producerProps().
    private volatile KafkaProducer<String, String> producer;
    private KafkaServer kafkaServer;
    private TestingServer zkServer;

    /**
     * Starts ZooKeeper and the Kafka broker, and resets the static {@link MapStore} state.
     *
     * @throws Exception if either server fails to start
     */
    @Before
    public void before() throws Exception {
        this.zkServer = new TestingServer(-1, createZookeeperTmpDir());
        this.zkServer.start();
        this.kafkaServer = TestUtils.createServer(new KafkaConfig(serverProperties()), new MockTime());
        MapStore.resetStaticMap();
    }

    /**
     * Closes the producer, the Kafka broker and ZooKeeper, in that order. Each step runs even
     * if an earlier one throws, so a failing close does not leave the later servers running.
     *
     * @throws IOException if closing the ZooKeeper test server fails
     */
    @After
    public void cleanUp() throws IOException {
        try {
            final KafkaProducer<String, String> currentProducer = this.producer;
            if (null != currentProducer) {
                currentProducer.close();
            }
        } finally {
            try {
                if (null != this.kafkaServer) {
                    this.kafkaServer.shutdown();
                }
            } finally {
                if (null != this.zkServer) {
                    this.zkServer.close();
                }
            }
        }
    }

    /**
     * Runs the Kafka-consuming operation on one thread, publishes the test data on another and
     * verifies that the elements reach the graph.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Test
    public void shouldAddElements() throws Exception {
        final Graph graph = createGraph();
        final AddElementsFromKafka operation = new AddElementsFromKafka.Builder()
                .generator(TestGeneratorImpl.class)
                .parallelism(1)
                .validate(true)
                .skipInvalidElements(false)
                .topic(TOPIC)
                .bootstrapServers(new String[]{BOOTSTRAP_SERVERS})
                .groupId("groupId")
                .build();

        // The operation is executed off the test thread so the test can go on to publish data.
        new Thread(() -> {
            try {
                graph.execute(operation, new User());
            } catch (OperationException e) {
                throw new RuntimeException(e);
            }
        }).start();

        // Wait before producing any records, giving the operation time to start.
        Thread.sleep(30000L);

        new Thread(() -> {
            this.producer = new KafkaProducer<>(producerProps());
            for (String value : DATA_VALUES) {
                try {
                    // get() blocks until the send has completed.
                    this.producer.send(new ProducerRecord<>(TOPIC, value)).get();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before propagating the failure.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        Thread.sleep(5000L);
        try {
            verifyElements(graph);
        } catch (AssertionError e) {
            // First check failed: wait longer and verify one more time before failing the test.
            Thread.sleep(30000L);
            verifyElements(graph);
        }
    }

    /**
     * Recreates the temporary folder and returns a fresh {@code zkTmpDir} directory inside it.
     *
     * @return the directory handed to the ZooKeeper test server
     * @throws IOException if the folder cannot be created
     */
    private File createZookeeperTmpDir() throws IOException {
        this.testFolder.delete();
        this.testFolder.create();
        return this.testFolder.newFolder("zkTmpDir");
    }

    /**
     * Builds the producer configuration: the test broker address plus String key and value
     * serializers.
     *
     * @return the Kafka producer properties
     */
    private Properties producerProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    /**
     * Builds the broker configuration, pointing it at the ZooKeeper test server and binding a
     * PLAINTEXT listener to {@link #BOOTSTRAP_SERVERS}.
     *
     * @return the Kafka broker properties
     */
    private Properties serverProperties() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", this.zkServer.getConnectString());
        properties.put("broker.id", "0");
        properties.setProperty("listeners", "PLAINTEXT://" + BOOTSTRAP_SERVERS);
        return properties;
    }

    /**
     * Finds a free local port by binding a {@link ServerSocket} to port 0 and reading the port
     * that was assigned.
     *
     * <p>The socket is closed before this method returns, so the port is only known to have been
     * free at the time of the call; the broker binds to it later.
     *
     * @return a port number that was free when the method ran
     * @throws RuntimeException wrapping the {@link IOException} if no socket could be opened
     */
    private static int getOpenPort() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
