package uk.gov.gchq.gaffer.flink.operation.handler;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import uk.gov.gchq.gaffer.flink.operation.handler.util.FlinkConstants;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElementsFromSocket;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

/* loaded from: input_file:uk/gov/gchq/gaffer/flink/operation/handler/AddElementsFromSocketHandler.class */
/**
 * Handler for {@link AddElementsFromSocket} operations.
 *
 * <p>Assembles a Flink streaming job that reads text from the socket described by the
 * operation (hostname, port and delimiter), flat-maps it through a {@link GafferMapFunction}
 * built from the operation's element generator, and attaches a {@link GafferSink} built from
 * the operation and the store. The job is then executed on the Flink execution environment.
 */
public class AddElementsFromSocketHandler implements OperationHandler<AddElementsFromSocket> {
    /**
     * Builds and executes the socket-reading Flink job for the given operation.
     *
     * @param addElementsFromSocket the operation supplying the socket details, element generator,
     *                              optional parallelism and options
     * @param context               the operation context (not used by this handler)
     * @param store                 the store handed to the {@link GafferSink}
     * @return always {@code null} when the job execution call returns normally
     * @throws OperationException if building the job name or executing the job throws any exception;
     *                            the original exception is kept as the cause
     */
    public Object doOperation(AddElementsFromSocket addElementsFromSocket, Context context, Store store) throws OperationException {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Only override the environment's parallelism when the operation specifies one.
        if (addElementsFromSocket.getParallelism() != null) {
            env.setParallelism(addElementsFromSocket.getParallelism().intValue());
        }

        final SingleOutputStreamOperator elements = env
                .socketTextStream(
                        addElementsFromSocket.getHostname(),
                        addElementsFromSocket.getPort(),
                        addElementsFromSocket.getDelimiter())
                .flatMap(new GafferMapFunction(String.class, addElementsFromSocket.getElementGenerator()));

        // The stream is rebalanced before the sink unless the SKIP_REBALANCING option parses as true
        // (a missing option parses as false, so rebalancing is the default).
        final boolean skipRebalancing =
                Boolean.parseBoolean(addElementsFromSocket.getOption(FlinkConstants.SKIP_REBALANCING));
        (skipRebalancing ? elements : elements.rebalance()).addSink(new GafferSink(addElementsFromSocket, store));

        try {
            // Job name has the form "<OperationSimpleName>-<hostname>:<port>".
            final String jobName = addElementsFromSocket.getClass().getSimpleName()
                    + "-" + addElementsFromSocket.getHostname()
                    + ":" + addElementsFromSocket.getPort();
            env.execute(jobName);
        } catch (Exception e) {
            throw new OperationException("Failed to add elements from port: " + addElementsFromSocket.getPort(), e);
        }
        return null;
    }
}
