package lumbermill.internal.http;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import lumbermill.Http;
import lumbermill.api.Event;
import lumbermill.api.MetaDataEvent;
import lumbermill.http.UnitOfWork;
import lumbermill.http.UnitOfWorkListener;
import lumbermill.internal.MapWrap;
import lumbermill.internal.http.PostHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lumbermill/internal/http/VertxHttpServer.class */
/**
 * Vert.x backed implementation of {@link Http.Server}.
 *
 * <p>Routes are registered with {@link #post(Map)} and {@link #get(Map)}. Events created by
 * POST routes are handed to the {@link UnitOfWorkListener}s registered with
 * {@link #onTag(String, UnitOfWorkListener)} (tag specific) and {@link #on(UnitOfWorkListener)}
 * (catch-all). The first listener registration installs a trailing 404 route and starts
 * listening on the configured port.
 *
 * <p>NOTE(review): Vert.x-Web appears to match routes in registration order, so a route added
 * after the first listener registration would sit behind the 404 catch-all route. Register
 * routes before listeners - confirm against the Vert.x-Web routing documentation.
 *
 * <p>NOTE(review): the listener map is a plain {@link HashMap}; whether it is touched from
 * more than one thread depends on the callers and on Vert.x threading - confirm.
 *
 * @param <T> event type exposed through {@link Http.Server}
 */
public class VertxHttpServer<T extends Event> extends AbstractVerticle implements Http.Server {
    private static final Logger LOGGER = LoggerFactory.getLogger(VertxHttpServer.class);

    /** Port used when the configuration has no "port" entry. */
    private static final int DEFAULT_PORT = 5678;

    /** Map key under which the catch-all listener registered via {@link #on} is stored. */
    private static final String ON_NO_TAG = "_totag";

    private final int port;
    // Not referenced anywhere in this class; kept so the class layout is unchanged.
    private final BlockingQueue<Event> queue;
    private final Router router;
    private final Vertx vertx;
    private final HttpServer httpServer;
    private final Map<String, UnitOfWorkListener> observableListenersByTag;
    private boolean setupCompleted;
    /** True once the global body handler route has been added to the router. */
    private boolean bodyHandlerInstalled;

    /**
     * Callback handed to each {@link PostHandler}; it tags the freshly created event and
     * dispatches it to the listeners registered on the enclosing server.
     */
    public class PostCreatedCallback implements PostHandler.OnPostCreatedCallback {
        private final Optional<List<String>> tags;

        /**
         * @param optional tags to add to every event created by the route, if any
         */
        public PostCreatedCallback(Optional<List<String>> optional) {
            this.tags = optional;
        }

        /**
         * Adds the route's tags (when configured) to the event and dispatches it to every
         * listener registered for a tag the event carries, plus the catch-all listener
         * registered through {@code on(..)} when there is one.
         *
         * @param event the event created from the request
         * @throws IllegalStateException if no registered listener applies to the event
         */
        @Override
        public void onCreated(Event event) {
            if (this.tags.isPresent()) {
                ((MetaDataEvent) event).addTags(this.tags.get());
            }
            int dispatched = 0;
            for (Map.Entry<String, UnitOfWorkListener> entry
                    : VertxHttpServer.this.observableListenersByTag.entrySet()) {
                String tag = entry.getKey();
                // Compare against the current key: testing whether the map merely contains
                // the catch-all key would let every tag-specific listener through as soon as
                // a catch-all listener exists.
                if (ON_NO_TAG.equals(tag) || event.hasTag(tag)) {
                    prepare(event, entry.getValue());
                    dispatched++;
                }
            }
            if (dispatched == 0) {
                throw new IllegalStateException("No UnitOfWorkListener matches the specified event. Check your tags if using server.onTag('tag'..), or add a fallback with onTag(..)");
            }
        }

        /**
         * Wraps the event in an {@link HttpUnitOfWork} and subscribes the unit of work's
         * subscriber to the pipeline the listener builds from its observable.
         *
         * @param event              the event to process
         * @param unitOfWorkListener listener that builds the processing pipeline
         * @return the unit of work created for the event
         */
        UnitOfWork<Event> prepare(Event event, UnitOfWorkListener unitOfWorkListener) {
            HttpUnitOfWork httpUnitOfWork = new HttpUnitOfWork(event);
            unitOfWorkListener.apply(httpUnitOfWork.observable()).subscribe(httpUnitOfWork.subscriber());
            return httpUnitOfWork;
        }
    }

    /**
     * Creates the server from a configuration map. Reads "port" (defaults to
     * {@value #DEFAULT_PORT}); when "keyStorePath" exists the server is created with SSL
     * enabled, using a JKS key store at that path and the "keyStorePassword" entry.
     *
     * @param mapWrap server configuration
     */
    public VertxHttpServer(MapWrap mapWrap) {
        this.port = ((Integer) mapWrap.get("port", Integer.valueOf(DEFAULT_PORT))).intValue();
        this.queue = new LinkedBlockingQueue<>();
        this.observableListenersByTag = new HashMap<>();
        this.vertx = Vertx.vertx();
        this.vertx.deployVerticle(this);
        if (mapWrap.exists("keyStorePath")) {
            JksOptions keyStore = new JksOptions()
                    .setPath(mapWrap.asString("keyStorePath"))
                    .setPassword(mapWrap.asString("keyStorePassword"));
            this.httpServer = this.vertx.createHttpServer(
                    new HttpServerOptions().setSsl(true).setKeyStoreOptions(keyStore));
        } else {
            this.httpServer = this.vertx.createHttpServer();
        }
        this.router = Router.router(this.vertx);
    }

    /**
     * Creates a server without SSL on the given port.
     *
     * @param i port to listen on
     */
    public VertxHttpServer(int i) {
        this(MapWrap.of("port", Integer.valueOf(i)));
    }

    /**
     * Registers a listener for events carrying the given tag, replacing any listener
     * previously registered for that tag. The first listener registration starts the server.
     *
     * @param str                tag the listener is interested in
     * @param unitOfWorkListener listener to invoke for matching events
     * @return this server
     */
    @Override
    public Http.Server onTag(String str, UnitOfWorkListener unitOfWorkListener) {
        this.observableListenersByTag.put(str, unitOfWorkListener);
        if (!this.setupCompleted) {
            finalizeHttpServer();
        }
        return this;
    }

    /**
     * Registers the catch-all listener, replacing any previous one. The first listener
     * registration starts the server.
     *
     * @param unitOfWorkListener listener to invoke for events
     * @return this server
     */
    @Override
    public Http.Server<T> on(UnitOfWorkListener unitOfWorkListener) {
        this.observableListenersByTag.put(ON_NO_TAG, unitOfWorkListener);
        if (!this.setupCompleted) {
            finalizeHttpServer();
        }
        return this;
    }

    /**
     * Adds a POST route. The configuration is read for "path" and the optional "codec" and
     * "tags" entries.
     *
     * @param map route configuration
     * @return this server
     */
    @Override
    public Http.Server post(Map map) {
        return setupRoute(MapWrap.of(map).put("method", "POST"));
    }

    /**
     * Adds a GET route for the "path" entry of the configuration.
     *
     * @param map route configuration
     * @return this server
     */
    @Override
    public Http.Server get(Map map) {
        return setupRoute(MapWrap.of(map).put("method", "GET"));
    }

    /** Closes the HTTP server and then the Vert.x instance created by this server. */
    public void shutdown() {
        this.httpServer.close();
        this.vertx.close();
    }

    /**
     * Registers a route on the router according to the "method" and "path" entries.
     * Methods other than POST and GET are ignored.
     */
    private Http.Server setupRoute(MapWrap mapWrap) {
        String path = mapWrap.asString("path");
        String method = mapWrap.asString("method");
        // Raw on purpose: the element types are dictated by MapWrap and PostHandler, which
        // are declared outside this file.
        Optional codec = mapWrap.getIfExists("codec");
        Optional routeTags = mapWrap.getIfExists("tags");
        LOGGER.debug("Setting up route, path: {}, method: {}", path, method);
        if (method.equalsIgnoreCase("POST")) {
            // The body handler is a global route; one instance serves every POST route, so
            // it is only installed ahead of the first one.
            if (!this.bodyHandlerInstalled) {
                this.router.route().handler(BodyHandler.create());
                this.bodyHandlerInstalled = true;
            }
            this.router.post(path).handler(new PostHandler(codec, new PostCreatedCallback(routeTags)));
        } else if (method.equalsIgnoreCase("GET")) {
            this.router.get().path(path).handler(new GetHandler());
        }
        return this;
    }

    /**
     * Installs the trailing 404 route, attaches the router to the HTTP server and starts
     * listening on the configured port.
     *
     * @throws IllegalStateException if the server has already been finalized
     */
    private void finalizeHttpServer() {
        if (this.setupCompleted) {
            throw new IllegalStateException("You can only invoke stream() or forEach() once and either of them, not both");
        }
        this.router.route().handler(routingContext -> routingContext.response().setStatusCode(404).end());
        this.httpServer.requestHandler(this.router::accept).listen(this.port, result -> {
            // listen() completes asynchronously; without a handler a bind failure is silent.
            if (result.failed()) {
                LOGGER.error("Failed to start HTTP server on port {}", this.port, result.cause());
            }
        });
        this.setupCompleted = true;
    }
}
