package net.pincette.jes.util;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import net.pincette.function.SideEffect;
import net.pincette.json.JsonUtil;
import net.pincette.util.Collections;
import org.apache.kafka.streams.kstream.KStream;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.RequestBuilder;

/* loaded from: input_file:net/pincette/jes/util/Fanout.class */
public class Fanout {
    private static final AsyncHttpClient client = Dsl.asyncHttpClient();

    private Fanout() {
    }

    public static void connect(KStream<String, JsonObject> kStream, String str, String str2) {
        connect(kStream, (Set<String>) null, str, str2);
    }

    public static void connect(KStream<String, JsonObject> kStream, Set<String> set, String str, String str2) {
        kStream.mapValues(jsonObject -> {
            return (Boolean) net.pincette.util.Util.tryToGetRethrow(() -> {
                return send(jsonObject, set, str, str2).toCompletableFuture().get();
            }).orElse(false);
        });
    }

    public static void connect(KStream<String, JsonObject> kStream, String str, String str2, Logger logger) {
        connect(kStream, null, str, str2, logger);
    }

    public static void connect(KStream<String, JsonObject> kStream, Set<String> set, String str, String str2, Logger logger) {
        kStream.mapValues(jsonObject -> {
            return (Boolean) net.pincette.util.Util.tryToGet(() -> {
                return send(jsonObject, set, str, str2).toCompletableFuture().get();
            }, exc -> {
                return (Boolean) SideEffect.run(() -> {
                    logger.log(Level.SEVERE, exc.getMessage(), (Throwable) exc);
                }).andThenGet(() -> {
                    return false;
                });
            }).orElse(false);
        });
    }

    private static JsonObject createMessage(JsonObject jsonObject, Set<String> set) {
        String string = JsonUtil.string(jsonObject);
        return JsonUtil.createObjectBuilder().add("items", (JsonArrayBuilder) set.stream().reduce(JsonUtil.createArrayBuilder(), (jsonArrayBuilder, str) -> {
            return jsonArrayBuilder.add(JsonUtil.createObjectBuilder().add("channel", str).add("formats", JsonUtil.createObjectBuilder().add("http-stream", JsonUtil.createObjectBuilder().add("content", "event: message\ndata:" + string + "\n\n"))));
        }, (jsonArrayBuilder2, jsonArrayBuilder3) -> {
            return jsonArrayBuilder2;
        })).build();
    }

    private static String password(String str, String str2) {
        return (String) net.pincette.util.Util.tryToGetRethrow(() -> {
            return Base64.getEncoder().encodeToString((str + ":" + str2).getBytes(StandardCharsets.UTF_8));
        }).orElse(null);
    }

    public static CompletionStage<Boolean> send(JsonObject jsonObject, String str, String str2) {
        return send(jsonObject, null, str, str2);
    }

    public static CompletionStage<Boolean> send(JsonObject jsonObject, Set<String> set, String str, String str2) {
        return (CompletionStage) usernames(jsonObject, set != null ? set : new HashSet<>()).map(set2 -> {
            return client.executeRequest(new RequestBuilder().setUrl(url(str)).setMethod("POST").addHeader("Content-Type", "application/json").addHeader("Authorization", "Basic " + password(str, str2)).setBody(JsonUtil.string(createMessage(jsonObject, set2))).build()).toCompletableFuture().thenApply(response -> {
                return Boolean.valueOf(response.getStatusCode() == 200);
            });
        }).orElseGet(() -> {
            return CompletableFuture.completedFuture(true);
        });
    }

    private static Set<String> subscribers(JsonObject jsonObject) {
        return (Set) JsonUtil.getObjects(jsonObject, JsonFields.SUBSCRIPTIONS).map(jsonObject2 -> {
            return jsonObject2.getString(JsonFields.SUB, (String) null);
        }).collect(Collectors.toSet());
    }

    private static String url(String str) {
        return "https://api.fanout.io/realm/" + str + "/publish/";
    }

    private static Optional<Set<String>> usernames(JsonObject jsonObject, Set<String> set) {
        return Optional.of(Collections.difference(Collections.union(new Collection[]{(Collection) Util.getUsername(jsonObject).map(str -> {
            return Collections.set(new String[]{str});
        }).orElseGet(HashSet::new), subscribers(jsonObject)}), set)).filter(set2 -> {
            return !set2.isEmpty();
        });
    }
}
