package group.insyde.statefun.tsukuyomi.dispatcher.job;

import group.insyde.statefun.tsukuyomi.dispatcher.config.DispatcherConfig;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:group/insyde/statefun/tsukuyomi/dispatcher/job/DispatcherJob.class */
public class DispatcherJob implements FlinkDispatcherJob {
    private static final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
    static final EgressIdentifier<TypedValue> CAPTURED_MESSAGES = new EgressIdentifier<>("group.insyde.statefun.tsukuyomi", "captured-messages", TypedValue.class);
    private final DispatcherConfig config;

    public JobClient start() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend());
        SingleOutputStreamOperator map = executionEnvironment.addSource(new DispatcherSocketSource()).map((v0) -> {
            return v0.toRoutableMessage();
        });
        StatefulFunctionsConfig fromEnvironment = StatefulFunctionsConfig.fromEnvironment(executionEnvironment);
        fromEnvironment.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
        StatefulFunctionDataStreamBuilder withConfiguration = StatefulFunctionDataStreamBuilder.builder("statefun").withDataStreamAsIngress(map).withEgressId(CAPTURED_MESSAGES).withConfiguration(fromEnvironment);
        bindFunctions(this.config, withConfiguration);
        Set<EgressIdentifier<TypedValue>> egressIdentifiers = this.config.getEgressIdentifiers();
        Objects.requireNonNull(withConfiguration);
        egressIdentifiers.forEach(withConfiguration::withEgressId);
        StatefulFunctionEgressStreams build = withConfiguration.build(executionEnvironment);
        ((DataStream) egressIdentifiers.stream().peek(egressIdentifier -> {
            log.info("Configuring custom egress {}/{}", egressIdentifier.namespace(), egressIdentifier.name());
        }).map(egressIdentifier2 -> {
            return build.getDataStreamForEgressId(egressIdentifier2).map(typedValue -> {
                log.info("Captured a message sent to egress {}/{}", egressIdentifier2.namespace(), egressIdentifier2.name());
                return Envelope.builder().to(egressIdentifier2.namespace(), egressIdentifier2.name()).data(typedValue).build();
            });
        }).map(singleOutputStreamOperator -> {
            return singleOutputStreamOperator;
        }).reduce(build.getDataStreamForEgressId(CAPTURED_MESSAGES).map(typedValue -> {
            return Envelope.fromJson(new String(typedValue.getValue().toByteArray(), StandardCharsets.UTF_8));
        }), (obj, dataStream) -> {
            return ((DataStream) obj).union(new DataStream[]{dataStream});
        })).addSink(new DispatcherSocketSink());
        return executionEnvironment.executeAsync("statefun-tsukuyomi");
    }

    private void bindFunctions(DispatcherConfig dispatcherConfig, StatefulFunctionDataStreamBuilder statefulFunctionDataStreamBuilder) {
        Iterator<FunctionType> it = dispatcherConfig.getFunctionTypes().iterator();
        while (it.hasNext()) {
            statefulFunctionDataStreamBuilder.withRequestReplyRemoteFunction(RequestReplyFunctionBuilder.requestReplyFunctionBuilder(it.next(), dispatcherConfig.getEndpoint()));
        }
    }

    private DispatcherJob(DispatcherConfig dispatcherConfig) {
        this.config = dispatcherConfig;
    }

    public static DispatcherJob of(DispatcherConfig dispatcherConfig) {
        return new DispatcherJob(dispatcherConfig);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1532407242:
                if (implMethodName.equals("toRoutableMessage")) {
                    z = 2;
                    break;
                }
                break;
            case 563999473:
                if (implMethodName.equals("lambda$start$3d74dd1$1")) {
                    z = true;
                    break;
                }
                break;
            case 1313754586:
                if (implMethodName.equals("lambda$start$9f4d154a$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("group/insyde/statefun/tsukuyomi/dispatcher/job/DispatcherJob") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/statefun/sdk/io/EgressIdentifier;Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue;)Lgroup/insyde/statefun/tsukuyomi/dispatcher/job/Envelope;")) {
                    EgressIdentifier egressIdentifier = (EgressIdentifier) serializedLambda.getCapturedArg(0);
                    return typedValue -> {
                        log.info("Captured a message sent to egress {}/{}", egressIdentifier.namespace(), egressIdentifier.name());
                        return Envelope.builder().to(egressIdentifier.namespace(), egressIdentifier.name()).data(typedValue).build();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("group/insyde/statefun/tsukuyomi/dispatcher/job/DispatcherJob") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue;)Lgroup/insyde/statefun/tsukuyomi/dispatcher/job/Envelope;")) {
                    return typedValue2 -> {
                        return Envelope.fromJson(new String(typedValue2.getValue().toByteArray(), StandardCharsets.UTF_8));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("group/insyde/statefun/tsukuyomi/dispatcher/job/Envelope") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/statefun/flink/core/message/RoutableMessage;")) {
                    return (v0) -> {
                        return v0.toRoutableMessage();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
