package net.pincette.jes;

import com.mongodb.ReadConcern;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import net.pincette.function.SideEffect;
import net.pincette.jes.util.Command;
import net.pincette.jes.util.Event;
import net.pincette.jes.util.Mongo;
import net.pincette.jes.util.Reducer;
import net.pincette.jes.util.Streams;
import net.pincette.json.JsonUtil;
import net.pincette.mongo.BsonUtil;
import net.pincette.mongo.Collection;
import net.pincette.mongo.Expression;
import net.pincette.mongo.JsonClient;
import net.pincette.util.Builder;
import net.pincette.util.Collections;
import net.pincette.util.Or;
import net.pincette.util.Pair;
import net.pincette.util.TimedCache;
import net.pincette.util.Util;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.bson.Document;
import org.bson.conversions.Bson;

/* loaded from: input_file:net/pincette/jes/Aggregate.class */
public class Aggregate {
    // Topic name suffixes. topic(String) combines them with the full type and the environment.
    // AGGREGATE_TOPIC doubles as the name of the aggregate ID field in audit records.
    private static final String AGGREGATE_TOPIC = "aggregate";
    private static final String COMMAND_TOPIC = "command";
    // Time-to-live handed to the aggregate cache; assigned in the static initializer.
    private static final Duration DUPLICATE_WINDOW;
    private static final String EVENT_TOPIC = "event";
    private static final String EVENT_FULL_TOPIC = "event-full";
    // Field that receives the stack trace in setException.
    private static final String EXCEPTION = "exception";
    private static final String MONITOR_TOPIC = "monitor";
    // Field names of the source object that createSource builds for unary reducers.
    private static final String REDUCER_COMMAND = "command";
    private static final String REDUCER_STATE = "state";
    private static final String REPLY_TOPIC = "reply";
    // Field names of the monitoring messages built by createStep and createError.
    private static final String STEP = "step";
    private static final String STEP_AFTER = "after";
    private static final String STEP_COMMAND = "command";
    private static final String STEP_ERROR = "error";
    private static final String STEP_TIMESTAMP = "timestamp";
    // Fields that removeTechnical strips before two states are diffed in createOps.
    private static final Set<String> TECHNICAL_FIELDS;
    private static final String UNIQUE_TOPIC = "unique";
    // Optional topic for audit records; audit() does nothing while this is null.
    private String auditTopic;
    private MongoCollection<Document> aggregateCollection;
    private KStream<String, JsonObject> aggregates;
    private String app;
    // Passed to Command.isAllowed in reduceIfAllowed.
    private boolean breakingTheGlass;
    private StreamsBuilder builder;
    private StreamProcessor commandProcessor;
    private KStream<String, JsonObject> commands;
    private MongoDatabase database;
    // Database used by restoreFromCommand; defaults to the main database.
    private MongoDatabase databaseArchive;
    private MongoCollection<Document> eventCollection;
    private KStream<String, JsonObject> events;
    private KStream<String, JsonObject> eventsFull;
    private KStream<String, JsonObject> monitor;
    private boolean monitoring;
    // Catch-all reducer; when set, executeReducer tries it before the per-command reducers.
    private Reducer reducer;
    private KStream<String, JsonObject> replies;
    private String type;
    // The unique expression and the function compiled from it, both set by withUniqueExpression.
    private JsonValue uniqueExpression;
    private Function<JsonObject, JsonValue> uniqueFunction;
    // Decompiler rendering of the compiler-generated flag behind the assertion in build().
    static final /* synthetic */ boolean $assertionsDisabled;
    // Reducers per command name.
    private final Map<String, Reducer> reducers = new HashMap();
    // Most recent "_after" states, keyed by cacheKey(command).
    private final TimedCache<String, JsonObject> aggregateCache = new TimedCache<>(DUPLICATE_WINDOW);
    private String environment = "dev";

    /**
     * Creates an aggregate with the built-in reducers for the {@code delete}, {@code patch} and
     * {@code put} commands already registered. Reducers receive the command first and the current
     * state second.
     */
    public Aggregate() {
        withReducer("delete", (command, state) -> delete(state));
        withReducer("patch", Aggregate::patch);
        withReducer("put", (command, state) -> put(command));
    }

    /**
     * Copies the command and marks the copy as a "Forbidden" error with status code 403.
     *
     * @param command the command that was refused.
     * @return the annotated copy.
     */
    private static JsonObject accessError(JsonObject command) {
        return JsonUtil.createObjectBuilder(command)
            .add("_error", true)
            .add("_statusCode", 403)
            .add("message", "Forbidden")
            .build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the key used to recognise duplicate commands: the concatenation of the {@code _id},
     * {@code _corr} and {@code _command} fields, without separators.
     *
     * @param jsonObject the command.
     * @return the concatenated key.
     */
    public static String commandDuplicateKey(JsonObject jsonObject) {
        final String id = jsonObject.getString("_id");
        final String correlationId = jsonObject.getString("_corr");
        final String name = jsonObject.getString("_command");

        return id + correlationId + name;
    }

    /**
     * Makes sure the command carries a {@code _timestamp} field.
     *
     * @param jsonObject the incoming command.
     * @return the command itself when it already contains {@code _timestamp}, otherwise a copy
     *     stamped with the current time in epoch milliseconds.
     */
    private static JsonObject completeCommand(JsonObject jsonObject) {
        if (jsonObject.containsKey("_timestamp")) {
            return jsonObject;
        }

        // The key is absent on this path, so looking the value up again could only yield null;
        // the current time is the only value that can ever be used.
        return JsonUtil.createObjectBuilder(jsonObject)
            .add("_timestamp", Instant.now().toEpochMilli())
            .build();
    }

    /**
     * Prepares the {@code _after} part of an event: the new state stamped with the correlation ID,
     * sequence number and timestamp. Any {@code _jwt} of the state is dropped; the one of the
     * command is added when the command has it.
     *
     * @param newState the reduced state.
     * @param command the command that caused the change.
     * @param corr the correlation ID.
     * @param seq the sequence number of the event.
     * @param timestamp the event timestamp in epoch milliseconds.
     * @return the builder for the {@code _after} object.
     */
    private static JsonObjectBuilder createAfter(JsonObject newState, JsonObject command, String corr, int seq, long timestamp) {
        return Builder.create(
                () ->
                    JsonUtil.createObjectBuilder(newState)
                        .add("_corr", corr)
                        .add("_seq", seq)
                        .add("_timestamp", timestamp)
                        .remove("_jwt"))
            .updateIf(
                b -> command.containsKey("_jwt"),
                b -> {
                    b.add("_jwt", command.getJsonObject("_jwt"));
                })
            .build();
    }

    /**
     * Builds the short reply message that accompanies an aggregate on the reply stream. It carries
     * the {@code _type} of the aggregate and, when present, its {@code _jwt}.
     *
     * @param jsonObject the aggregate, i.e. the {@code _after} part of an event.
     * @return the message.
     */
    private static JsonObject createAggregateMessage(JsonObject jsonObject) {
        final JsonObjectBuilder message =
            JsonUtil.createObjectBuilder().add("_type", jsonObject.getString("_type"));

        // createAfter only puts "_jwt" in the aggregate when the command had one. Passing the
        // resulting null to JsonObjectBuilder.add would throw a NullPointerException.
        if (jsonObject.containsKey("_jwt")) {
            message.add("_jwt", jsonObject.getJsonObject("_jwt"));
        }

        return message.build();
    }

    /**
     * Builds the monitoring message for a command that ended in an error.
     *
     * @param command the failed command, embedded in the message as is.
     * @param timestamp the time of the step in epoch milliseconds.
     * @return the monitoring message.
     */
    private static JsonObject createError(JsonObject command, long timestamp) {
        return JsonUtil.createObjectBuilder()
            .add(STEP, "error")
            .add("command", command)
            .add(STEP_TIMESTAMP, timestamp)
            .build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Assembles the full event for a state change. The sequence number is the {@code _seq} of the
     * old state plus one, starting at 0 when the old state has none. {@code _languages} and
     * {@code _jwt} are copied from the command when it has them.
     *
     * @param jsonObject the state before the command.
     * @param jsonObject2 the state after the command.
     * @param jsonObject3 the command.
     * @param jsonArray the operations that lead from the old to the new state.
     * @return the event, including {@code _before} and {@code _after}.
     */
    public static JsonObject createEvent(JsonObject jsonObject, JsonObject jsonObject2, JsonObject jsonObject3, JsonArray jsonArray) {
        final String correlationId = jsonObject3.getString("_corr");
        final long now = Instant.now().toEpochMilli();
        final int sequence = jsonObject.getInt("_seq", -1) + 1;

        return Builder.create(
                () ->
                    JsonUtil.createObjectBuilder()
                        .add("_corr", correlationId)
                        .add("_id", jsonObject2.getString("_id").toLowerCase())
                        .add("_type", jsonObject2.getString("_type"))
                        .add("_seq", sequence)
                        .add("_command", jsonObject3.getString("_command"))
                        .add("_timestamp", now)
                        .add("_before", jsonObject)
                        .add("_after", createAfter(jsonObject2, jsonObject3, correlationId, sequence, now))
                        .add("_ops", jsonArray))
            .updateIf(
                b -> jsonObject3.containsKey("_languages"),
                b -> {
                    b.add("_languages", jsonObject3.getJsonArray("_languages"));
                })
            .updateIf(
                b -> jsonObject3.containsKey("_jwt"),
                b -> {
                    b.add("_jwt", jsonObject3.getJsonObject("_jwt"));
                })
            .build()
            .build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the diff between two states as an array of operations. The technical fields are
     * removed from both states first, so they never show up in the result.
     *
     * @param jsonObject the old state.
     * @param jsonObject2 the new state.
     * @return the operations; empty when only technical fields differ.
     */
    public static JsonArray createOps(JsonObject jsonObject, JsonObject jsonObject2) {
        final JsonObject oldContent = removeTechnical(jsonObject).build();
        final JsonObject newContent = removeTechnical(jsonObject2).build();

        return JsonUtil.createArrayBuilder(JsonUtil.createDiff(oldContent, newContent).toJsonArray())
            .build();
    }

    /**
     * Wraps a command and the current state in one object, with the state under {@code state} and
     * the command under {@code command}.
     *
     * @param command the command.
     * @param state the current state.
     * @return the combined object.
     */
    private static JsonObject createSource(JsonObject command, JsonObject state) {
        return JsonUtil.createObjectBuilder()
            .add(REDUCER_STATE, state)
            .add("command", command)
            .build();
    }

    /**
     * Builds a monitoring step message without a command name.
     *
     * @param step the name of the step.
     * @param after the name of the preceding step, may be {@code null}.
     * @param timestamp the time of the step in epoch milliseconds.
     * @return the monitoring message.
     */
    private static JsonObject createStep(String step, String after, long timestamp) {
        final String noCommand = null;

        return createStep(step, after, timestamp, noCommand);
    }

    /**
     * Builds a monitoring step message. The {@code after} and {@code command} fields are only
     * written when the corresponding argument is not {@code null}.
     *
     * @param step the name of the step.
     * @param after the name of the preceding step, may be {@code null}.
     * @param timestamp the time of the step in epoch milliseconds.
     * @param command the name of the command, may be {@code null}.
     * @return the monitoring message.
     */
    private static JsonObject createStep(String step, String after, long timestamp, String command) {
        return Builder.create(() -> JsonUtil.createObjectBuilder())
            .update(
                b -> {
                    b.add(STEP, step);
                })
            .update(
                b -> {
                    b.add(STEP_TIMESTAMP, timestamp);
                })
            .updateIf(
                b -> after != null,
                b -> {
                    b.add(STEP_AFTER, after);
                })
            .updateIf(
                b -> command != null,
                b -> {
                    b.add("command", command);
                })
            .build()
            .build();
    }

    /**
     * The built-in reducer step for deletion: returns a copy of the given object with
     * {@code _deleted} set to {@code true}.
     *
     * @param jsonObject the current state.
     * @return a completed stage holding the marked copy.
     */
    public static CompletionStage<JsonObject> delete(JsonObject jsonObject) {
        final JsonObject deleted = JsonUtil.createObjectBuilder(jsonObject).add("_deleted", true).build();

        return CompletableFuture.completedFuture(deleted);
    }

    /**
     * Narrows a stream down to the commands that carry an error.
     *
     * @param stream the stream to filter.
     * @return the filtered stream.
     */
    private static KStream<String, JsonObject> errors(KStream<String, JsonObject> stream) {
        return stream.filter((key, value) -> Command.isCommand(value) && Command.hasError(value));
    }

    /**
     * Renders a sequence number as a decimal string, left-padded with zeros to at least 12
     * characters.
     *
     * @param seq the sequence number.
     * @return the padded string.
     */
    private static String generateSeq(long seq) {
        final String digits = String.valueOf(seq);

        return pad(digits, '0', 12);
    }

    /**
     * Returns a copy of the command in which the {@code _id} and {@code _corr} fields are
     * lower-cased.
     *
     * @param command the command; both fields must be present.
     * @return the normalised copy.
     */
    private static JsonObject idsToLowerCase(JsonObject command) {
        final String id = command.getString("_id").toLowerCase();
        final String correlationId = command.getString("_corr").toLowerCase();

        return JsonUtil.createObjectBuilder(command).add("_id", id).add("_corr", correlationId).build();
    }

    /**
     * Completes a state with the {@code _id} and {@code _type} of the command, but only for the
     * fields the state does not have yet.
     *
     * @param state the current state.
     * @param command the command that is being processed.
     * @return the completed state.
     */
    private static JsonObject makeManaged(JsonObject state, JsonObject command) {
        return Builder.create(() -> JsonUtil.createObjectBuilder(state))
            .updateIf(
                b -> !state.containsKey("_id"),
                b -> {
                    b.add("_id", command.getString("_id"));
                })
            .updateIf(
                b -> !state.containsKey("_type"),
                b -> {
                    b.add("_type", command.getString("_type"));
                })
            .build()
            .build();
    }

    /**
     * Derives the event collection key from the {@code _id} and {@code _seq} fields of an event.
     *
     * @param event the event; it must have a numeric {@code _seq}.
     * @return the key.
     */
    private static String mongoEventKey(JsonObject event) {
        final long seq = event.getJsonNumber("_seq").longValue();

        return mongoEventKey(event, seq);
    }

    /**
     * Composes the event collection key: the {@code _id} of the object, a dash and the padded
     * sequence number.
     *
     * @param json the object that supplies the {@code _id}.
     * @param seq the sequence number.
     * @return the key.
     */
    private static String mongoEventKey(JsonObject json, long seq) {
        final String id = json.getString("_id");

        return id + "-" + generateSeq(seq);
    }

    /**
     * Left-pads a string with a character up to a minimum length.
     *
     * @param str the string to pad.
     * @param c the padding character.
     * @param i the minimum length of the result.
     * @return {@code str} itself when it is already long enough, otherwise the padded string.
     */
    private static String pad(String str, char c, int i) {
        if (str.length() >= i) {
            return str;
        }

        final char[] filler = pad(c, i - str.length());

        return String.valueOf(filler) + str;
    }

    /**
     * Creates an array of the given size in which every position holds the same character.
     *
     * @param c the character.
     * @param i the size of the array.
     * @return the filled array.
     */
    private static char[] pad(char c, int i) {
        final char[] filler = new char[i];

        for (int k = 0; k < filler.length; ++k) {
            filler[k] = c;
        }

        return filler;
    }

    /**
     * The built-in reducer for patching: applies the JSON patch in the {@code _ops} array of the
     * command to the state. The state is returned untouched when the command has no {@code _ops}.
     *
     * @param jsonObject the command.
     * @param jsonObject2 the current state.
     * @return a completed stage holding the resulting state.
     */
    public static CompletionStage<JsonObject> patch(JsonObject jsonObject, JsonObject jsonObject2) {
        final JsonObject patched =
            Optional.ofNullable(jsonObject.getJsonArray("_ops"))
                .map(ops -> JsonUtil.createPatch(ops).apply(jsonObject2))
                .orElse(jsonObject2);

        return CompletableFuture.completedFuture(patched);
    }

    /**
     * Reduces a full event to a plain one by dropping the {@code _after} and {@code _before}
     * fields.
     *
     * @param event the full event.
     * @return the plain event.
     */
    private static JsonObject plainEvent(JsonObject event) {
        return JsonUtil.createObjectBuilder(event)
            .remove("_after")
            .remove("_before")
            .build();
    }

    /**
     * The built-in reducer step for replacement: the new state is the given object without its
     * {@code _command} field.
     *
     * @param jsonObject the command.
     * @return a completed stage holding the new state.
     */
    public static CompletionStage<JsonObject> put(JsonObject jsonObject) {
        final JsonObject newState = JsonUtil.createObjectBuilder(jsonObject).remove("_command").build();

        return CompletableFuture.completedFuture(newState);
    }

    /**
     * Adapts a synchronous function to a reducer. The function receives the same two arguments as
     * the reducer and its result is wrapped in a completed stage.
     *
     * @param binaryOperator the synchronous function.
     * @return the reducer.
     */
    public static Reducer reducer(BinaryOperator<JsonObject> binaryOperator) {
        return (first, second) -> CompletableFuture.completedFuture(binaryOperator.apply(first, second));
    }

    /**
     * Chains synchronous transformations into one reducer. The first transformation receives the
     * object produced by {@code createSource} from the two reducer arguments; every next one
     * receives the result of its predecessor. Without transformations the source object itself is
     * the result.
     *
     * @param unaryOperatorArr the transformations, applied in the given order.
     * @return the reducer.
     */
    @SafeVarargs
    public static Reducer reducer(UnaryOperator<JsonObject>... unaryOperatorArr) {
        // Snapshot taken at creation time, so later changes to the caller's array have no effect.
        final UnaryOperator<JsonObject>[] steps = unaryOperatorArr.clone();

        return (first, second) -> {
            JsonObject result = createSource(first, second);

            for (final UnaryOperator<JsonObject> step : steps) {
                result = step.apply(result);
            }

            return CompletableFuture.completedFuture(result);
        };
    }

    /**
     * Creates a builder from the object and removes every technical field from it.
     *
     * @param json the object to clean.
     * @return the builder without the technical fields.
     */
    private static JsonObjectBuilder removeTechnical(JsonObject json) {
        JsonObjectBuilder cleaned = JsonUtil.createObjectBuilder(json);

        for (final String field : TECHNICAL_FIELDS) {
            cleaned = cleaned.remove(field);
        }

        return cleaned;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns a copy of the command flagged as an error, with the stack trace of the exception in
     * the {@code exception} field.
     *
     * @param jsonObject the command.
     * @param exc the exception that was raised.
     * @return the annotated copy.
     */
    public static JsonObject setException(JsonObject jsonObject, Exception exc) {
        final String stackTrace = Util.getStackTrace(exc);

        return JsonUtil.createObjectBuilder(jsonObject)
            .add("_error", true)
            .add(EXCEPTION, stackTrace)
            .build();
    }

    /**
     * Returns a copy of the object with its {@code _id} field set to the given value.
     *
     * @param json the object.
     * @param id the new identifier.
     * @return the copy.
     */
    private static JsonObject setId(JsonObject json, String id) {
        return JsonUtil.createObjectBuilder(json)
            .add("_id", id)
            .build();
    }

    /**
     * Copies the command and marks the copy as an error with status code 400, stating that the
     * fields for the unique expression are missing.
     *
     * @param command the command.
     * @return the annotated copy.
     */
    private static JsonObject uniqueError(JsonObject command) {
        return JsonUtil.createObjectBuilder(command)
            .add("_error", true)
            .add("_statusCode", 400)
            .add("message", "Missing unique expression fields")
            .build();
    }

    /**
     * Returns the stream of aggregate states. The stream is assigned when {@link #build()} runs.
     *
     * @return the aggregate stream.
     */
    public KStream<String, JsonObject> aggregates() {
        return aggregates;
    }

    /**
     * Derives the aggregate stream from the reducer output: the {@code _after} part of every event
     * that has one. The stream is also written to the aggregate topic.
     *
     * @param kStream the output of the reducer.
     */
    private void aggregates(KStream<String, JsonObject> kStream) {
        final KStream<String, JsonObject> newStates =
            kStream
                .filter((key, value) -> Event.isEvent(value) && value.containsKey("_after"))
                .mapValues(event -> event.getJsonObject("_after"));

        this.aggregates = newStates;
        newStates.to(topic(AGGREGATE_TOPIC));
    }

    /**
     * Returns the application name that was set with {@link #withApp(String)}.
     *
     * @return the application name.
     */
    public String app() {
        return app;
    }

    /**
     * Writes an audit record for every plain event to the audit topic. Nothing happens when no
     * audit topic was configured. The user falls back to "anonymous" and the breaking-the-glass
     * flag to {@code false} when the JWT does not provide them.
     */
    private void audit() {
        if (this.auditTopic == null) {
            return;
        }

        this.events
            .mapValues(
                event ->
                    JsonUtil.createObjectBuilder()
                        .add(AGGREGATE_TOPIC, event.getString("_id"))
                        .add("type", event.getString("_type"))
                        .add("command", event.getString("_command"))
                        .add(STEP_TIMESTAMP, event.getJsonNumber("_timestamp").longValue())
                        .add("user", JsonUtil.getString(event, "/_jwt/sub").orElse("anonymous"))
                        .add(
                            "breakingTheGlass",
                            JsonUtil.getBoolean(event, "/_jwt/breakingTheGlass").orElse(false).booleanValue())
                        .build())
            .to(this.auditTopic);
    }

    /**
     * Wires the complete topology: the command stream, the optional unique-key rerouting, the
     * reducer and the aggregate, reply, event, monitoring and audit outputs. With assertions
     * enabled it first checks that the app, builder, environment, type and database are set.
     *
     * @return the streams builder that was given with {@link #withBuilder(StreamsBuilder)}.
     */
    public StreamsBuilder build() {
        final boolean configured =
            this.app != null
                && this.builder != null
                && this.environment != null
                && this.type != null
                && this.database != null;

        if (!$assertionsDisabled && !configured) {
            throw new AssertionError();
        }

        this.aggregateCollection = this.database.getCollection(mongoAggregateCollection());
        this.eventCollection = this.database.getCollection(mongoEventCollection());
        this.commands = createCommands();
        unique();

        final KStream<String, JsonObject> reduced = reducer();

        // replies() reads the aggregate stream, so aggregates() has to come first.
        aggregates(reduced);
        replies(reduced);
        events(reduced);

        monitorTopic(this.aggregates, AGGREGATE_TOPIC);
        monitorTopic(this.events, EVENT_TOPIC);
        monitorTopic(this.replies, REPLY_TOPIC);
        monitorTopic(this.eventsFull, EVENT_FULL_TOPIC);
        monitorReducer(reduced);
        audit();

        return this.builder;
    }

    /**
     * Determines the key under which the state for a command is cached: the string form of the
     * unique key when a unique expression is configured, otherwise the {@code _id} of the command.
     *
     * @param command the command.
     * @return the cache key.
     */
    private String cacheKey(JsonObject command) {
        if (this.uniqueFunction == null) {
            return command.getString("_id");
        }

        return JsonUtil.string(commandKey(command));
    }

    /**
     * Verifies that a unique key can be derived from the command when a unique expression is
     * configured.
     *
     * @param command the command.
     * @return the command itself when it passes, otherwise a copy marked with the unique error.
     */
    private JsonObject checkUnique(JsonObject command) {
        if (this.uniqueFunction != null && commandKey(command) == null) {
            return uniqueError(command);
        }

        return command;
    }

    /**
     * Evaluates the unique expression for a command.
     *
     * @param command the command.
     * @return the unique key, or {@code null} when no unique expression is configured or the
     *     expression yields nothing or JSON null.
     */
    private JsonValue commandKey(JsonObject command) {
        if (this.uniqueFunction == null) {
            return null;
        }

        final JsonValue key = this.uniqueFunction.apply(command);

        return key != null && !key.equals(JsonValue.NULL) ? key : null;
    }

    /**
     * Selects the stream the reducer consumes: the unique topic when a unique expression is
     * configured, otherwise the command stream.
     *
     * @return the stream of commands to reduce.
     */
    private KStream<String, JsonObject> commandSource() {
        if (this.uniqueFunction == null) {
            return this.commands;
        }

        return this.builder.stream(topic(UNIQUE_TOPIC));
    }

    /**
     * Returns the command stream. The stream is assigned when {@link #build()} runs.
     *
     * @return the command stream.
     */
    public KStream<String, JsonObject> commands() {
        return commands;
    }

    /**
     * Creates the command stream from the command topic. Messages that are not commands are
     * dropped, keys and IDs are lower-cased, a missing timestamp is added and duplicates within 60
     * seconds are filtered out. The configured command processor, if any, is applied last.
     *
     * @return the prepared command stream.
     */
    private KStream<String, JsonObject> createCommands() {
        final KStream<String, JsonObject> source = this.builder.stream(topic("command"));
        final KStream<String, JsonObject> normalized =
            source
                .filter((key, command) -> Command.isCommand(command))
                .map((key, command) -> new KeyValue<>(key.toLowerCase(), idsToLowerCase(command)))
                .mapValues(Aggregate::completeCommand);
        final KStream<String, JsonObject> deduplicated =
            Streams.duplicateFilter(
                normalized, (key, command) -> commandDuplicateKey(command), Duration.ofSeconds(60L));

        monitorCommands(deduplicated);

        if (this.commandProcessor == null) {
            return deduplicated;
        }

        return this.commandProcessor.apply(deduplicated, this.builder);
    }

    /**
     * Deletes the aggregate with the {@code _id} of the given object from the aggregate
     * collection and checks that the deletion was acknowledged.
     *
     * @param aggregate the aggregate to delete.
     * @return a stage that yields {@code true} once the check has passed.
     */
    private CompletionStage<Boolean> deleteMongoAggregate(JsonObject aggregate) {
        final Bson byId = Filters.eq("_id", aggregate.getString("_id"));

        return Collection.deleteOne(this.aggregateCollection, byId)
            .thenApply(result -> Util.must(result, r -> r.wasAcknowledged()))
            .thenApply(result -> true);
    }

    /**
     * Deletes the stored copy of an event, addressed by its event key, from the event collection
     * and checks that the deletion was acknowledged.
     *
     * @param event the event to delete.
     * @return a stage that yields {@code true} once the check has passed.
     */
    private CompletionStage<Boolean> deleteMongoEvent(JsonObject event) {
        final Bson byKey = Filters.eq("_id", mongoEventKey(event));

        return Collection.deleteOne(this.eventCollection, byKey)
            .thenApply(result -> Util.must(result, r -> r.wasAcknowledged()))
            .thenApply(result -> true);
    }

    /**
     * Handles an event whose new state is marked as deleted by removing the aggregate from the
     * aggregate collection.
     *
     * @param event the full event.
     * @return the pending deletion, or empty when {@code /_after/_deleted} is absent or false.
     */
    private Optional<CompletionStage<Boolean>> deleteReduction(JsonObject event) {
        return JsonUtil.getBoolean(event, "/_after/_deleted")
            .filter(deleted -> deleted.booleanValue())
            .map(deleted -> deleteMongoAggregate(event.getJsonObject("_after")));
    }

    /**
     * Returns the environment name, which is "dev" unless it was changed with
     * {@link #withEnvironment(String)}.
     *
     * @return the environment name.
     */
    public String environment() {
        return environment;
    }

    /**
     * Returns the stream of plain events, i.e. events without {@code _before} and {@code _after}.
     * The stream is assigned when {@link #build()} runs.
     *
     * @return the plain event stream.
     */
    public KStream<String, JsonObject> events() {
        return events;
    }

    /**
     * Derives the full and the plain event streams from the reducer output and writes them to
     * their topics.
     *
     * @param kStream the output of the reducer.
     */
    private void events(KStream<String, JsonObject> kStream) {
        final KStream<String, JsonObject> full = kStream.filter((key, value) -> Event.isEvent(value));
        final KStream<String, JsonObject> plain = full.mapValues(Aggregate::plainEvent);

        this.eventsFull = full;
        this.events = plain;
        full.to(topic(EVENT_FULL_TOPIC));
        plain.to(topic(EVENT_TOPIC));
    }

    /**
     * Returns the stream of full events, which still carry {@code _before} and {@code _after}.
     * The stream is assigned when {@link #build()} runs.
     *
     * @return the full event stream.
     */
    public KStream<String, JsonObject> eventsFull() {
        return eventsFull;
    }

    /**
     * Runs the reducer for a command. The catch-all reducer is tried first, then the reducer
     * registered under the {@code _command} name. When neither produces a result the state is
     * returned unchanged.
     *
     * @param command the command.
     * @param state the current state.
     * @return the stage that yields the new state.
     */
    private CompletionStage<JsonObject> executeReducer(JsonObject command, JsonObject state) {
        return Optional.ofNullable(this.reducer)
            .map(general -> general.apply(command, state))
            .orElseGet(
                () ->
                    Optional.ofNullable(this.reducers.get(command.getString("_command")))
                        .map(specific -> specific.apply(command, state))
                        .orElseGet(() -> CompletableFuture.completedFuture(state)));
    }

    /**
     * Returns the full aggregate type: the application name and the type joined with a dash.
     *
     * @return the full type.
     */
    public String fullType() {
        return String.join("-", this.app, this.type);
    }

    /**
     * Fetches the current state for a command, from the cache when possible and from MongoDB
     * otherwise.
     *
     * @param jsonObject the command.
     * @return the state paired with a flag; a cached state always comes with {@code false}.
     */
    private CompletionStage<Pair<JsonObject, Boolean>> getCurrentState(JsonObject jsonObject) {
        // The explicit type argument makes both branches agree on CompletionStage. Without it the
        // mapped branch is inferred as CompletableFuture and the fallback no longer type-checks.
        return this.aggregateCache
            .get(cacheKey(jsonObject))
            .<CompletionStage<Pair<JsonObject, Boolean>>>map(
                cached -> CompletableFuture.completedFuture(Pair.pair(cached, false)))
            .orElseGet(() -> getMongoCurrentState(jsonObject));
    }

    /**
     * Looks up the current state for a command in the aggregate collection. When nothing is found
     * the state is restored through {@code restoreFromCommand}.
     *
     * @param jsonObject the command.
     * @return the state paired with a flag that is {@code false} for a state found in the
     *     collection and {@code true} for a restored one.
     */
    private CompletionStage<Pair<JsonObject, Boolean>> getMongoCurrentState(JsonObject jsonObject) {
        return JsonClient.findOne(this.aggregateCollection, mongoStateCriterion(jsonObject))
            .thenComposeAsync(
                found ->
                    // The explicit type argument makes both branches agree on CompletionStage.
                    // Without it the mapped branch is inferred as CompletableFuture and the
                    // fallback no longer type-checks.
                    found
                        .<CompletionStage<Pair<JsonObject, Boolean>>>map(
                            state -> CompletableFuture.completedFuture(Pair.pair(state, false)))
                        .orElseGet(
                            () ->
                                restoreFromCommand(jsonObject)
                                    .thenApply(restored -> Pair.pair(restored, true))));
    }

    /**
     * Brings the aggregate collection in line with an event. The alternatives are tried in order:
     * deletion, update of a state that was not found in the collection, update of an existing
     * state. The outcome of the chosen action must be {@code true}.
     *
     * @param jsonObject the full event.
     * @param z the flag that came with the current state; {@code true} when the state was not
     *     found in the aggregate collection.
     * @return a stage that yields the event once the collection is updated, or immediately when
     *     none of the alternatives applies.
     */
    private CompletionStage<JsonObject> handleAggregate(JsonObject jsonObject, boolean z) {
        // Every lambda parameter has its own name. The nested lambdas used to reuse "bool", which
        // Java rejects because a lambda parameter may not redeclare a variable in scope.
        return Or.tryWith(() -> deleteReduction(jsonObject).orElse(null))
            .or(() -> updateReductionNew(jsonObject, z).orElse(null))
            .or(() -> updateReductionExisting(jsonObject, z).orElse(null))
            .get()
            .map(
                action ->
                    action
                        .thenApply(outcome -> Util.must(outcome, succeeded -> succeeded.booleanValue()))
                        .thenApply(outcome -> jsonObject))
            .orElseGet(() -> CompletableFuture.completedFuture(jsonObject));
    }

    /**
     * Stores an event in the event collection under its event key and checks that the insertion
     * reported {@code true}.
     *
     * @param jsonObject the event; it must have {@code _id} and a numeric {@code _seq}.
     * @return a stage that yields the outcome of the insertion.
     */
    private CompletionStage<Boolean> insertMongoEvent(JsonObject jsonObject) {
        // The inner lambda parameter has its own name. It used to reuse "bool", which Java rejects
        // because a lambda parameter may not redeclare a variable in scope.
        return JsonClient.insert(this.eventCollection, setId(jsonObject, mongoEventKey(jsonObject)))
            .thenApply(outcome -> Util.must(outcome, inserted -> inserted.booleanValue()));
    }

    /**
     * Checks whether the event collection already holds an event for this command, i.e. an event
     * whose key starts with the {@code _id} of the command and whose {@code _corr} and
     * {@code _command} equal those of the command.
     *
     * @param jsonObject the command.
     * @return a stage that yields {@code true} when such an event exists.
     */
    private CompletionStage<Boolean> isDuplicate(JsonObject jsonObject) {
        // The ID comes from the command and is therefore quoted: regular expression
        // metacharacters in it must match literally instead of widening or breaking the pattern.
        final String keyPrefix =
            "^" + java.util.regex.Pattern.quote(jsonObject.getString("_id")) + ".*";
        final Bson sameCommand =
            Filters.and(
                Filters.regex("_id", keyPrefix),
                Filters.eq("_corr", jsonObject.getString("_corr")),
                Filters.eq("_command", jsonObject.getString("_command")));

        return JsonClient.aggregate(
                this.eventCollection,
                Collections.list(
                    new Bson[] {
                        Aggregates.match(sameCommand), Aggregates.project(Projections.include("_id"))
                    }))
            .thenApply(found -> !found.isEmpty());
    }

    /**
     * Makes the command adopt the {@code _id} of the state when a unique expression is
     * configured. Otherwise the command is returned as is.
     *
     * @param state the current state.
     * @param command the command.
     * @return the command, possibly with a replaced {@code _id}.
     */
    private JsonObject keepId(JsonObject state, JsonObject command) {
        if (this.uniqueFunction == null) {
            return command;
        }

        return JsonUtil.createObjectBuilder(command).add("_id", state.getString("_id")).build();
    }

    /**
     * Returns the stream on the monitor topic. The stream is created on the first call and reused
     * afterwards.
     *
     * @return the monitor stream.
     */
    public KStream<String, JsonObject> monitor() {
        if (this.monitor != null) {
            return this.monitor;
        }

        this.monitor = this.builder.stream(topic(MONITOR_TOPIC));

        return this.monitor;
    }

    /**
     * Returns the name of the aggregate collection: the full type and the environment joined with
     * a dash.
     *
     * @return the collection name.
     */
    private String mongoAggregateCollection() {
        return String.join("-", fullType(), this.environment);
    }

    /**
     * Returns the name of the event collection: the full type, the word "event" and the
     * environment joined with dashes.
     *
     * @return the collection name.
     */
    private String mongoEventCollection() {
        return String.join("-", fullType(), "event", this.environment);
    }

    /**
     * Builds the query that finds the current state for a command: the unique-key query when a
     * unique expression is configured and yields a value, otherwise a match on {@code _id}.
     *
     * @param command the command.
     * @return the query.
     */
    private Bson mongoStateCriterion(JsonObject command) {
        final JsonValue key = this.uniqueFunction != null ? this.uniqueFunction.apply(command) : null;
        final Bson uniqueQuery = key != null ? mongoStateQuery(key) : null;

        if (uniqueQuery != null) {
            return uniqueQuery;
        }

        return Filters.eq("_id", command.getString("_id"));
    }

    /**
     * Turns a unique key into a query, to which {@code Mongo.addNotDeleted} is applied. A key that
     * is an object is used as the query itself. Any other key is compared with the unique
     * expression through {@code $expr} and {@code $eq}.
     *
     * @param key the unique key.
     * @return the query.
     */
    private Bson mongoStateQuery(JsonValue key) {
        final JsonObject query;

        if (JsonUtil.isObject(key)) {
            query = key.asJsonObject();
        } else {
            query =
                JsonUtil.createObjectBuilder()
                    .add(
                        "$expr",
                        JsonUtil.createObjectBuilder()
                            .add("$eq", JsonUtil.createArrayBuilder().add(this.uniqueExpression).add(key)))
                    .build();
        }

        return Mongo.addNotDeleted(BsonUtil.fromJson(query));
    }

    /**
     * Emits two monitoring steps, keyed by correlation ID, for every command that has a
     * {@code _corr} field. The first uses the timestamp of the command, the second the current
     * time. Nothing is emitted when monitoring is off.
     *
     * @param kStream the command stream.
     */
    private void monitorCommands(KStream<String, JsonObject> kStream) {
        if (this.monitoring) {
            kStream.filter((str, jsonObject) -> {
                return jsonObject.containsKey("_corr");
            }).flatMap((str2, jsonObject2) -> {
                // First step: MonitorSteps.RECEIVED. Second step: MonitorSteps.COMMAND_TOPIC, which follows RECEIVED.
                return Collections.list(new KeyValue[]{new KeyValue(jsonObject2.getString("_corr"), createStep(MonitorSteps.RECEIVED, null, jsonObject2.getJsonNumber("_timestamp").longValue(), jsonObject2.getString("_command"))), new KeyValue(jsonObject2.getString("_corr"), createStep(MonitorSteps.COMMAND_TOPIC, MonitorSteps.RECEIVED, Instant.now().toEpochMilli(), jsonObject2.getString("_command")))});
            }).to(topic(MONITOR_TOPIC));
        }
    }

    /**
     * Emits monitoring messages for the reducer output, keyed by correlation ID: a reduce step
     * for results without an error and an error message for results with one. Only values with a
     * {@code _corr} field are considered. Nothing is emitted when monitoring is off.
     *
     * @param kStream the output of the reducer.
     */
    private void monitorReducer(KStream<String, JsonObject> kStream) {
        if (!this.monitoring) {
            return;
        }

        kStream
            .filter((key, value) -> value.containsKey("_corr") && !Command.hasError(value))
            .map(
                (key, value) ->
                    new KeyValue<>(
                        value.getString("_corr"),
                        createStep(MonitorSteps.REDUCE, MonitorSteps.COMMAND_TOPIC, Instant.now().toEpochMilli())))
            .to(topic(MONITOR_TOPIC));

        kStream
            .filter((key, value) -> value.containsKey("_corr") && Command.hasError(value))
            .map(
                (key, value) ->
                    new KeyValue<>(value.getString("_corr"), createError(value, Instant.now().toEpochMilli())))
            .to(topic(MONITOR_TOPIC));
    }

    /**
     * Emits a monitoring step, keyed by correlation ID, for every value with a {@code _corr}
     * field that passes on the given stream. The step is named after the topic with the suffix
     * "-topic" and follows the reduce step. Nothing is emitted when monitoring is off.
     *
     * @param kStream the stream to observe.
     * @param str the short topic name.
     */
    private void monitorTopic(KStream<String, JsonObject> kStream, String str) {
        if (!this.monitoring) {
            return;
        }

        kStream
            .filter((key, value) -> value.containsKey("_corr"))
            .map(
                (key, value) ->
                    new KeyValue<>(
                        value.getString("_corr"),
                        createStep(str + "-topic", MonitorSteps.REDUCE, Instant.now().toEpochMilli())))
            .to(topic(MONITOR_TOPIC));
    }

    /**
     * Reduces a command and persists the outcome.
     *
     * @param command the command.
     * @return a stage that yields the result of the reduction after it has been saved.
     */
    private CompletionStage<JsonObject> processCommand(JsonObject command) {
        return reduceCommand(command)
            .thenComposeAsync(
                reduced ->
                    saveReduction(
                        reduced.first, event -> handleAggregate(event, reduced.second.booleanValue())));
    }

    /**
     * Turns the outcome of a reducer into an event when the state actually changed.
     *
     * @param jsonObject the state before the command.
     * @param jsonObject2 the state produced by the reducer; may be {@code null}.
     * @param jsonObject3 the command.
     * @return the event when the new state has no error and differs from the old one in
     *     non-technical fields; otherwise {@code jsonObject2} itself.
     */
    private JsonObject processNewState(JsonObject jsonObject, JsonObject jsonObject2, JsonObject jsonObject3) {
        return (JsonObject) Optional.ofNullable(jsonObject2).filter(jsonObject4 -> {
            return !Command.hasError(jsonObject4);
        }).map(jsonObject5 -> {
            return createOps(jsonObject, jsonObject2);
        }).filter(jsonArray -> {
            // No operations means nothing changed, so no event is produced.
            return !jsonArray.isEmpty();
        }).map(jsonArray2 -> {
            return createEvent(jsonObject, jsonObject2, jsonObject3, jsonArray2);
        }).map(jsonObject6 -> {
            // Remember the new state so the next command for the same key finds it in the cache.
            return (JsonObject) SideEffect.run(() -> {
                this.aggregateCache.put(cacheKey(jsonObject3), jsonObject6.getJsonObject("_after"));
            }).andThenGet(() -> {
                return jsonObject6;
            });
        }).orElse(jsonObject2);
    }

    /**
     * Processes one command synchronously. The command is skipped when an event for it already
     * exists.
     *
     * @param jsonObject the command.
     * @return the result of processing, or {@code null} for a duplicate. The exception handler
     *     produces the command annotated through {@code setException}.
     */
    private JsonObject reduce(JsonObject jsonObject) {
        return (JsonObject) Util.tryToGet(() -> {
            // Blocks the calling thread until the asynchronous pipeline has completed.
            return (JsonObject) isDuplicate(jsonObject).thenComposeAsync(bool -> {
                return Boolean.FALSE.equals(bool) ? processCommand(jsonObject) : CompletableFuture.completedFuture(null);
            }).toCompletableFuture().get();
        }, exc -> {
            return setException(jsonObject, exc);
        }).orElse(null);
    }

    /**
     * Reduces a command against the current state.
     *
     * @param jsonObject the command.
     * @return the result of the reduction paired with the flag that came with the current state.
     *     When the unique check fails the pair holds the error and {@code false}.
     */
    private CompletionStage<Pair<JsonObject, Boolean>> reduceCommand(JsonObject jsonObject) {
        JsonObject checkUnique = checkUnique(jsonObject);
        // The state is completed with the _id and _type of the command before it is reduced.
        return Command.hasError(checkUnique) ? CompletableFuture.completedFuture(Pair.pair(checkUnique, false)) : getCurrentState(jsonObject).thenApply(pair -> {
            return Pair.pair(makeManaged((JsonObject) pair.first, jsonObject), (Boolean) pair.second);
        }).thenComposeAsync(pair2 -> {
            return reduceIfAllowed((JsonObject) pair2.first, keepId((JsonObject) pair2.first, jsonObject)).thenApply(jsonObject2 -> {
                return Pair.pair(jsonObject2, (Boolean) pair2.second);
            });
        });
    }

    /**
     * Runs the reducer when {@code Command.isAllowed} permits the command on the state.
     *
     * @param state the current state.
     * @param command the command.
     * @return a stage that yields the processed new state, or the command marked with the access
     *     error when it is not allowed.
     */
    private CompletionStage<JsonObject> reduceIfAllowed(JsonObject state, JsonObject command) {
        if (!Command.isAllowed(state, command, this.breakingTheGlass)) {
            return CompletableFuture.completedFuture(accessError(command));
        }

        return executeReducer(command, state)
            .thenApply(newState -> processNewState(state, newState, command));
    }

    /**
     * Creates the reducer stream: every command from the command source is reduced and
     * {@code null} results are dropped.
     *
     * @return the stream of reduction results.
     */
    private KStream<String, JsonObject> reducer() {
        final KStream<String, JsonObject> reduced = commandSource().mapValues(this::reduce);

        return reduced.filter((key, value) -> value != null);
    }

    /**
     * Returns the reply stream. The stream is assigned when {@link #build()} runs.
     *
     * @return the reply stream.
     */
    public KStream<String, JsonObject> replies() {
        return replies;
    }

    /**
     * Creates the reply stream and writes it to the reply topic. Every aggregate contributes two
     * messages, the aggregate itself and its short aggregate message. Commands with an error from
     * the reducer output are merged in.
     *
     * @param kStream the output of the reducer.
     */
    private void replies(KStream<String, JsonObject> kStream) {
        final KStream<String, JsonObject> answers =
            this.aggregates
                .flatMapValues(
                    aggregate ->
                        Collections.list(new JsonObject[] {aggregate, createAggregateMessage(aggregate)}))
                .merge(errors(kStream));

        this.replies = answers;
        answers.to(topic(REPLY_TOPIC));
    }

    /**
     * Restores the state for the {@code _id} of a command through {@code Mongo.restore}, using
     * the archive database.
     *
     * @param command the command.
     * @return the stage that yields the restored state.
     */
    private CompletionStage<JsonObject> restoreFromCommand(JsonObject command) {
        final String id = command.getString("_id");

        return Mongo.restore(id, fullType(), this.environment, this.databaseArchive);
    }

    /**
     * Persists the result of a reduction. An event is first stored in plain form in the event
     * collection and then handed to the given function. Anything that is not an event is
     * returned untouched.
     *
     * @param jsonObject the result of the reduction.
     * @param function the follow-up action for an event.
     * @return a stage that yields the outcome of the follow-up action or the unchanged input.
     */
    private CompletionStage<JsonObject> saveReduction(JsonObject jsonObject, Function<JsonObject, CompletionStage<JsonObject>> function) {
        return Event.isEvent(jsonObject) ? insertMongoEvent(plainEvent(jsonObject)).thenComposeAsync(bool -> {
            return (CompletionStage) function.apply(jsonObject);
        }).exceptionally(th -> {
            // Compensation: the stored event is removed again before the failure is passed on.
            // NOTE(review): the deletion is started but its completion is not awaited here.
            deleteMongoEvent(jsonObject);
            Util.rethrow(th);
            return null;
        }) : CompletableFuture.completedFuture(jsonObject);
    }

    /**
     * Expands a short topic name to the complete one: the full type, the short name and the
     * environment joined with dashes.
     *
     * @param str the short topic name.
     * @return the complete topic name.
     */
    public String topic(String str) {
        return String.join("-", fullType(), str, this.environment);
    }

    /**
     * Reroutes the commands to the unique topic when a unique expression is configured. Each
     * command is keyed by the string form of its unique value; commands for which the value is
     * JSON null are left out.
     */
    private void unique() {
        if (this.uniqueFunction == null) {
            return;
        }

        this.commands
            .mapValues(command -> Pair.pair(command, this.uniqueFunction.apply(command)))
            .filter((key, keyed) -> !keyed.second.equals(JsonValue.NULL))
            .map((key, keyed) -> new KeyValue<>(JsonUtil.string(keyed.second), keyed.first))
            .to(topic(UNIQUE_TOPIC));
    }

    /**
     * Returns the aggregate type that was set with {@link #withType(String)}.
     *
     * @return the aggregate type.
     */
    public String type() {
        return type;
    }

    /**
     * Writes an aggregate to the aggregate collection through {@code JsonClient.update} and
     * checks that the update reported {@code true}.
     *
     * @param jsonObject the aggregate to write.
     * @return a stage that yields the outcome of the update.
     */
    private CompletionStage<Boolean> updateMongoAggregate(JsonObject jsonObject) {
        // The inner lambda parameter has its own name. It used to reuse "bool", which Java rejects
        // because a lambda parameter may not redeclare a variable in scope.
        return JsonClient.update(this.aggregateCollection, jsonObject)
            .thenApply(outcome -> Util.must(outcome, updated -> updated.booleanValue()));
    }

    /**
     * Handles an event for a state that was not found in the aggregate collection by writing the
     * complete {@code _after} state.
     *
     * @param event the full event; it must have a {@code _before} field.
     * @param isNew the flag that came with the current state.
     * @return the pending update, or empty when the flag is {@code false}.
     */
    private Optional<CompletionStage<Boolean>> updateReductionNew(JsonObject event, boolean isNew) {
        final Optional<JsonObject> before = Optional.of(event.getJsonObject("_before"));

        if (!isNew) {
            return Optional.empty();
        }

        return before.map(ignored -> updateMongoAggregate(event.getJsonObject("_after")));
    }

    /**
     * Handles an event for a state that already exists in the aggregate collection by delegating
     * to {@code Mongo.updateAggregate} with the {@code _before} state and the event.
     *
     * @param event the full event; it must have a {@code _before} field.
     * @param isNew the flag that came with the current state.
     * @return the pending update, or empty when the flag is {@code true}.
     */
    private Optional<CompletionStage<Boolean>> updateReductionExisting(JsonObject event, boolean isNew) {
        final Optional<JsonObject> before = Optional.of(event.getJsonObject("_before"));

        if (isNew) {
            return Optional.empty();
        }

        return before.map(
            ignored -> Mongo.updateAggregate(this.aggregateCollection, event.getJsonObject("_before"), event));
    }

    /**
     * Sets the application name, which is the first part of the full type.
     *
     * @param str the application name.
     * @return this aggregate.
     */
    public Aggregate withApp(String str) {
        app = str;

        return this;
    }

    /**
     * Sets the topic that receives the audit records. Without it no audit records are produced.
     *
     * @param str the name of the audit topic.
     * @return this aggregate.
     */
    public Aggregate withAudit(String str) {
        auditTopic = str;

        return this;
    }

    /**
     * Switches on the breaking-the-glass flag, which is passed to {@code Command.isAllowed} when
     * commands are checked.
     *
     * @return this aggregate.
     */
    public Aggregate withBreakingTheGlass() {
        breakingTheGlass = true;

        return this;
    }

    /**
     * Sets the streams builder on which the topology is created.
     *
     * @param streamsBuilder the streams builder.
     * @return this aggregate.
     */
    public Aggregate withBuilder(StreamsBuilder streamsBuilder) {
        builder = streamsBuilder;

        return this;
    }

    /**
     * Adds a processor for the command stream. When a processor is already present the new one is
     * chained behind it, so it receives the output of the earlier processors.
     *
     * @param streamProcessor the processor to add.
     * @return this aggregate.
     */
    public Aggregate withCommandProcessor(StreamProcessor streamProcessor) {
        final StreamProcessor previous = this.commandProcessor;

        if (previous == null) {
            this.commandProcessor = streamProcessor;
        } else {
            this.commandProcessor =
                (stream, streamsBuilder) ->
                    streamProcessor.apply(previous.apply(stream, streamsBuilder), streamsBuilder);
        }

        return this;
    }

    /**
     * Sets the environment name, which is the last part of topic and collection names.
     *
     * @param str the environment name.
     * @return this aggregate.
     */
    public Aggregate withEnvironment(String str) {
        environment = str;

        return this;
    }

    /**
     * Sets the MongoDB database. It is used with linearizable read concern and majority write
     * concern. The database as given also becomes the archive database when none is set yet.
     *
     * @param mongoDatabase the database.
     * @return this aggregate.
     */
    public Aggregate withMongoDatabase(MongoDatabase mongoDatabase) {
        final MongoDatabase strict =
            mongoDatabase
                .withReadConcern(ReadConcern.LINEARIZABLE)
                .withWriteConcern(WriteConcern.MAJORITY);

        this.database = strict;

        if (this.databaseArchive == null) {
            this.databaseArchive = mongoDatabase;
        }

        return this;
    }

    /**
     * Sets the archive database. Passing {@code null} selects the main database as it is
     * configured at that moment.
     *
     * @param mongoDatabase the archive database, may be {@code null}.
     * @return this aggregate.
     */
    public Aggregate withMongoDatabaseArchive(MongoDatabase mongoDatabase) {
        if (mongoDatabase != null) {
            this.databaseArchive = mongoDatabase;
        } else {
            this.databaseArchive = this.database;
        }

        return this;
    }

    /**
     * Switches the production of monitoring messages on or off.
     *
     * @param z {@code true} to produce monitoring messages.
     * @return this aggregate.
     */
    public Aggregate withMonitoring(boolean z) {
        monitoring = z;

        return this;
    }

    /**
     * Registers the reducer for a command name, replacing any reducer registered under that name
     * before.
     *
     * @param str the command name.
     * @param reducer the reducer.
     * @return this aggregate.
     */
    public Aggregate withReducer(String str, Reducer reducer) {
        reducers.put(str, reducer);

        return this;
    }

    /**
     * Sets the catch-all reducer, which is tried before the reducers registered per command name.
     *
     * @param reducer the reducer.
     * @return this aggregate.
     */
    public Aggregate withReducer(Reducer reducer) {
        final Reducer general = reducer;

        this.reducer = general;

        return this;
    }

    /**
     * Sets the aggregate type, which is the second part of the full type.
     *
     * @param str the aggregate type.
     * @return this aggregate.
     */
    public Aggregate withType(String str) {
        type = str;

        return this;
    }

    /**
     * Sets the expression that yields the unique key of a command. Both the expression and the
     * function compiled from it with {@code Expression.function} are kept.
     *
     * @param jsonValue the unique expression.
     * @return this aggregate.
     */
    public Aggregate withUniqueExpression(JsonValue jsonValue) {
        uniqueExpression = jsonValue;
        uniqueFunction = Expression.function(jsonValue);

        return this;
    }

    // Static initialisation of the assertion flag, the cache window and the technical fields.
    static {
        // Decompiler rendering of the flag that the compiler generates for assert statements.
        $assertionsDisabled = !Aggregate.class.desiredAssertionStatus();
        DUPLICATE_WINDOW = Duration.ofSeconds(60L);
        // Fields managed by the framework; they are left out when two states are diffed.
        TECHNICAL_FIELDS = Collections.set(new String[]{"_command", "_corr", "_id", "_jwt", "_languages", "_seq", "_test", "_timestamp", "_type"});
    }
}
