package uk.gov.gchq.gaffer.flink.operation.handler;

import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElementsFromKafka;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

/* loaded from: input_file:uk/gov/gchq/gaffer/flink/operation/handler/AddElementsFromKafkaHandler.class */
public class AddElementsFromKafkaHandler implements OperationHandler<AddElementsFromKafka> {
    private static final String FLINK_KAFKA_BOOTSTRAP_SERVERS = "bootstrap.servers";
    private static final String FLINK_KAFKA_GROUP_ID = "group.id";

    public Object doOperation(AddElementsFromKafka addElementsFromKafka, Context context, Store store) throws OperationException {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        if (null != addElementsFromKafka.getParallelism()) {
            executionEnvironment.setParallelism(addElementsFromKafka.getParallelism().intValue());
        }
        executionEnvironment.addSource(new FlinkKafkaConsumer010(addElementsFromKafka.getTopic(), new SimpleStringSchema(), createFlinkProperties(addElementsFromKafka))).map(new GafferMapFunction(addElementsFromKafka.getElementGenerator())).returns(GafferMapFunction.RETURN_CLASS).rebalance().addSink(new GafferSink(addElementsFromKafka, store));
        try {
            executionEnvironment.execute(addElementsFromKafka.getClass().getSimpleName() + "-" + addElementsFromKafka.getGroupId() + "-" + addElementsFromKafka.getTopic());
            return null;
        } catch (Exception e) {
            throw new OperationException("Failed to add elements from kafka topic: " + addElementsFromKafka.getTopic(), e);
        }
    }

    private Properties createFlinkProperties(AddElementsFromKafka addElementsFromKafka) {
        Properties properties = new Properties();
        if (null != addElementsFromKafka.getOptions()) {
            properties.putAll(addElementsFromKafka.getOptions());
        }
        properties.put(FLINK_KAFKA_GROUP_ID, addElementsFromKafka.getGroupId());
        properties.put(FLINK_KAFKA_BOOTSTRAP_SERVERS, StringUtils.join(addElementsFromKafka.getBootstrapServers(), ","));
        return properties;
    }
}
