package net.pincette.jes;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.jes.util.Event;
import net.pincette.json.JsonUtil;
import net.pincette.rs.Chain;
import net.pincette.rs.Util;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.reactivestreams.Publisher;

/* loaded from: input_file:net/pincette/jes/Reactor.class */
/**
 * Fluent builder for a Kafka Streams topology that reacts to the events of one aggregate type by
 * sending commands to aggregates of another type.
 *
 * <p>Events are read from the topic <code>&lt;sourceType&gt;-event-full-&lt;environment&gt;</code>
 * and the generated commands are written to the topic
 * <code>&lt;destinationType&gt;-command-&lt;environment&gt;</code>. Configure the instance with the
 * <code>with...</code> methods and then call {@link #build()}.
 */
public class Reactor {
    private static final String COMMAND = "_command";
    private static final String CORR = "_corr";
    private static final String ID = "_id";
    private static final String JWT = "_jwt";
    private static final String TIMESTAMP = "_timestamp";
    private static final String TYPE = "_type";

    private StreamsBuilder builder;
    private String destinationType;
    private GetDestinations destinations;
    private String environment = "dev";
    private EventToCommand eventToCommand;
    private Predicate<JsonObject> filter;
    private String sourceType;

    /**
     * Wraps a synchronous event transformation in an {@link EventToCommand} whose result is an
     * already completed future.
     *
     * @param unaryOperator the function that turns an event into a command.
     * @return the wrapped function.
     */
    public static EventToCommand eventToCommand(UnaryOperator<JsonObject> unaryOperator) {
        return event -> CompletableFuture.completedFuture(unaryOperator.apply(event));
    }

    /**
     * Adds the reactor topology to the configured builder.
     *
     * @return the builder that was given with {@link #withBuilder(StreamsBuilder)}.
     * @throws NullPointerException when the builder, the source type, the destination type, the
     *     environment, the event-to-command function or the destinations function is not set.
     */
    public StreamsBuilder build() {
        // Fail here instead of producing topic names that contain "null" or crashing later
        // on the first record.
        Objects.requireNonNull(this.builder, "builder");
        Objects.requireNonNull(this.sourceType, "sourceType");
        Objects.requireNonNull(this.destinationType, "destinationType");
        Objects.requireNonNull(this.environment, "environment");
        Objects.requireNonNull(this.eventToCommand, "eventToCommand");
        Objects.requireNonNull(this.destinations, "destinations");

        final String eventTopic = this.sourceType + "-event-full-" + this.environment;
        final String commandTopic = this.destinationType + "-command-" + this.environment;

        // The explicit type witness is required: without it the key and value types are inferred
        // as Object and the method references below don't match.
        this.builder
            .<String, JsonObject>stream(eventTopic)
            .filter(this::filterEvent)
            .flatMap(this::createCommands)
            .to(commandTopic);

        return this.builder;
    }

    /**
     * Copies the command template and sets the technical fields for one destination.
     *
     * @param id the identifier of the destination aggregate instance.
     * @param event the event that caused the command. Its <code>_corr</code> field is read with
     *     <code>getString</code> and its optional <code>_jwt</code> field with
     *     <code>getJsonObject</code>.
     * @param command the command template produced by the event-to-command function.
     * @return the command that will be sent.
     */
    private JsonObject completeCommand(String id, JsonObject event, JsonObject command) {
        // A missing "_jwt" is replaced with an empty object.
        final JsonObject jwt =
            Optional.ofNullable(event.getJsonObject(JWT)).orElseGet(JsonUtil::emptyObject);

        return JsonUtil.createObjectBuilder(command)
            .add(ID, id)
            .add(TYPE, this.destinationType)
            .add(CORR, event.getString(CORR))
            .add(JWT, jwt)
            .add(TIMESTAMP, Instant.now().toEpochMilli())
            .build();
    }

    /**
     * Produces the commands for one event, keyed by the identifier of their destination.
     *
     * @param key the record key, which is not used.
     * @param event the event.
     * @return one key/command pair per destination that has an <code>_id</code> field. Nothing is
     *     returned when there is no command template or when it has no <code>_command</code>
     *     field.
     */
    private Iterable<KeyValue<String, JsonObject>> createCommands(String key, JsonObject event) {
        // This waits for the asynchronous result on the calling thread.
        final Optional<JsonObject> template =
            net.pincette.util.Util.tryToGetRethrow(
                () -> this.eventToCommand.apply(event).toCompletableFuture().get());
        final Publisher<KeyValue<String, JsonObject>> commands =
            template
                .filter(candidate -> candidate.containsKey(COMMAND))
                .map(candidate -> commandsForDestinations(event, candidate))
                .orElseGet(Util::empty);

        return Util.iterate(commands);
    }

    /**
     * Fans the command template out to the destinations of the event.
     *
     * @param event the event for which the destinations are looked up.
     * @param command the command template.
     * @return the publisher of key/command pairs. Destinations without an <code>_id</code> field
     *     are skipped.
     */
    private Publisher<KeyValue<String, JsonObject>> commandsForDestinations(
            JsonObject event, JsonObject command) {
        return Chain.with(this.destinations.apply(event))
            .map(destination -> destination.getString(ID, null))
            .filter(Objects::nonNull)
            .map(id -> new KeyValue<>(id, completeCommand(id, event, command)))
            .get();
    }

    /**
     * Decides whether a record is processed.
     *
     * @param key the record key, which is not used.
     * @param event the record value.
     * @return <code>true</code> when <code>Event.isEvent</code> accepts the value, it has a
     *     <code>_corr</code> field and the optional filter, if set, accepts it.
     */
    private boolean filterEvent(String key, JsonObject event) {
        return Event.isEvent(event)
            && event.containsKey(CORR)
            && (this.filter == null || this.filter.test(event));
    }

    /**
     * Sets the Kafka Streams builder to which the topology is added.
     *
     * @param streamsBuilder the builder.
     * @return this reactor.
     */
    public Reactor withBuilder(StreamsBuilder streamsBuilder) {
        this.builder = streamsBuilder;
        return this;
    }

    /**
     * Sets the aggregate type of the destinations. It is used in the name of the command topic
     * and as the <code>_type</code> field of the commands.
     *
     * @param str the destination aggregate type.
     * @return this reactor.
     */
    public Reactor withDestinationType(String str) {
        this.destinationType = str;
        return this;
    }

    /**
     * Sets the function that yields the destinations for an event. The <code>_id</code> field of
     * each destination becomes the key and the <code>_id</code> field of a command.
     *
     * @param getDestinations the function.
     * @return this reactor.
     */
    public Reactor withDestinations(GetDestinations getDestinations) {
        this.destinations = getDestinations;
        return this;
    }

    /**
     * Sets the environment, which is the suffix of the topic names. The default is
     * <code>dev</code>.
     *
     * @param str the environment.
     * @return this reactor.
     */
    public Reactor withEnvironment(String str) {
        this.environment = str;
        return this;
    }

    /**
     * Sets the function that turns an event into a command template. Results without a
     * <code>_command</code> field are ignored.
     *
     * @param eventToCommand the function.
     * @return this reactor.
     */
    public Reactor withEventToCommand(EventToCommand eventToCommand) {
        this.eventToCommand = eventToCommand;
        return this;
    }

    /**
     * Sets an optional extra condition that events must satisfy in order to be processed.
     *
     * @param predicate the condition. It may be <code>null</code>, in which case no extra
     *     condition is applied.
     * @return this reactor.
     */
    public Reactor withFilter(Predicate<JsonObject> predicate) {
        this.filter = predicate;
        return this;
    }

    /**
     * Sets the aggregate type of which the events are consumed. It is used in the name of the
     * event topic.
     *
     * @param str the source aggregate type.
     * @return this reactor.
     */
    public Reactor withSourceType(String str) {
        this.sourceType = str;
        return this;
    }
}
