package io.scalecube.services.gateway.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.gateway.GatewayMetrics;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import java.util.Optional;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufMono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

/* loaded from: input_file:io/scalecube/services/gateway/http/HttpGatewayAcceptor.class */
/**
 * HTTP gateway request handler.
 *
 * <p>Accepts {@code POST} requests only. The aggregated request body is forwarded to a service
 * through {@link ServiceCall#requestOne}, using the request URI as the message qualifier, and the
 * service reply is written back as the HTTP response. Any other HTTP method is answered with
 * {@code 405 Method Not Allowed}.
 */
public class HttpGatewayAcceptor implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpGatewayAcceptor.class);
    private static final String SERVICE_RECV_TIME = "service-recv-time";
    private static final String SERVICE_SEND_TIME = "service-send-time";
    private static final String CLIENT_RECV_TIME = "client-recv-time";
    private static final String CLIENT_SEND_TIME = "client-send-time";

    /** Headers copied verbatim from the HTTP request to the service message and back, in copy order. */
    private static final String[] TIMING_HEADERS = {
        CLIENT_SEND_TIME, CLIENT_RECV_TIME, SERVICE_RECV_TIME, SERVICE_SEND_TIME
    };

    private final ServiceCall serviceCall;
    private final GatewayMetrics metrics;

    /**
     * Creates an acceptor.
     *
     * @param serviceCall service call used to dispatch each request
     * @param gatewayMetrics metrics sink marked on request, service response and response
     */
    public HttpGatewayAcceptor(ServiceCall serviceCall, GatewayMetrics gatewayMetrics) {
        this.serviceCall = serviceCall;
        this.metrics = gatewayMetrics;
    }

    /**
     * Handles one HTTP exchange.
     *
     * @param httpServerRequest incoming request
     * @param httpServerResponse response to write to
     * @return publisher that completes once the response has been handed to the transport
     */
    @Override // java.util.function.BiFunction
    public Publisher<Void> apply(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        LOGGER.debug("Accepted request: {}, headers: {}, params: {}",
            httpServerRequest, httpServerRequest.requestHeaders(), httpServerRequest.params());

        // equals() rather than ==, so the check does not depend on the request carrying
        // the very same HttpMethod instance as the HttpMethod.POST constant.
        if (!HttpMethod.POST.equals(httpServerRequest.method())) {
            LOGGER.error("Unsupported HTTP method. Expected POST, actual {}", httpServerRequest.method());
            return methodNotAllowed(httpServerResponse);
        }

        return httpServerRequest.receive()
            .aggregate()
            // A request without a body is dispatched with an empty buffer as its data.
            .switchIfEmpty(Mono.defer(() -> ByteBufMono.just(Unpooled.EMPTY_BUFFER)))
            // NOTE(review): presumably retained so the buffer outlives the aggregate emission
            // while the service call is in flight; ownership then passes to the service call - confirm.
            .map(ByteBuf::retain)
            .doOnNext(content -> this.metrics.markRequest())
            .flatMap(content -> handleRequest(content, httpServerRequest, httpServerResponse))
            .doOnSuccess(ignored -> this.metrics.markResponse())
            // Any failure in the pipeline is mapped to an error message and sent as the response.
            .onErrorResume(th -> error(httpServerResponse, DefaultErrorMapper.INSTANCE.toMessage(th)));
    }

    /**
     * Dispatches the request body to the service addressed by the request URI and writes the reply.
     *
     * @param byteBuf aggregated request body, used as the message data
     * @param httpServerRequest request supplying the URI (qualifier) and timing headers
     * @param httpServerResponse response to write to
     * @return completion of the response write
     */
    private Mono<Void> handleRequest(ByteBuf byteBuf, HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        String qualifier = httpServerRequest.uri();
        ServiceMessage.Builder requestBuilder = ServiceMessage.builder().qualifier(qualifier).data(byteBuf);
        enrichRequest(httpServerRequest.requestHeaders(), requestBuilder);

        return this.serviceCall.requestOne(requestBuilder.build())
            .doOnNext(reply -> this.metrics.markServiceResponse())
            // An empty service result is represented by a data-less message with the same qualifier.
            .switchIfEmpty(Mono.defer(() -> Mono.just(ServiceMessage.builder().qualifier(qualifier).build())))
            .flatMap(reply -> {
                enrichResponse(httpServerResponse, reply);
                return Mono.defer(() -> {
                    if (reply.isError()) {
                        return error(httpServerResponse, reply);
                    }
                    return reply.hasData() ? ok(httpServerResponse, reply) : noContent(httpServerResponse);
                });
            });
    }

    /** Replies {@code 405 Method Not Allowed} with an {@code Allow: POST} header and no body. */
    private Publisher<Void> methodNotAllowed(HttpServerResponse httpServerResponse) {
        return httpServerResponse
            .addHeader(HttpHeaderNames.ALLOW, HttpMethod.POST.name())
            .status(HttpResponseStatus.METHOD_NOT_ALLOWED)
            .send();
    }

    /**
     * Replies with the HTTP status taken from the message's error type.
     *
     * <p>A {@link ByteBuf} payload is sent as is, any other payload (such as {@link ErrorData}) is
     * encoded with the message's data format, and a message without data is answered with the
     * status only. The last two cases previously failed with a {@code ClassCastException} or
     * {@code NullPointerException}, which replaced the intended status with a fallback error.
     *
     * @param httpServerResponse response to write to
     * @param serviceMessage error message to translate
     * @return completion of the response write
     */
    private Mono<Void> error(HttpServerResponse httpServerResponse, ServiceMessage serviceMessage) {
        HttpServerResponse response =
            httpServerResponse.status(HttpResponseStatus.valueOf(serviceMessage.errorType()));

        if (serviceMessage.hasData(ByteBuf.class)) {
            return response.sendObject(((ByteBuf) serviceMessage.data()).retain()).then();
        }
        if (serviceMessage.hasData()) {
            return response.sendObject(encodeData(serviceMessage.data(), serviceMessage.dataFormatOrDefault())).then();
        }
        return response.send();
    }

    /** Replies {@code 204 No Content} with no body. */
    private Mono<Void> noContent(HttpServerResponse httpServerResponse) {
        return httpServerResponse.status(HttpResponseStatus.NO_CONTENT).send();
    }

    /**
     * Replies {@code 200 OK} with the message data as body; a {@link ByteBuf} payload is sent as
     * is, anything else is encoded with the message's data format.
     */
    private Mono<Void> ok(HttpServerResponse httpServerResponse, ServiceMessage serviceMessage) {
        HttpServerResponse response = httpServerResponse.status(HttpResponseStatus.OK);
        ByteBuf body = serviceMessage.hasData(ByteBuf.class)
            ? ((ByteBuf) serviceMessage.data()).retain()
            : encodeData(serviceMessage.data(), serviceMessage.dataFormatOrDefault());
        return response.sendObject(body).then();
    }

    /**
     * Encodes {@code obj} with the codec registered for the given content type.
     *
     * @param obj value to encode
     * @param str content type used to look up the {@link DataCodec}
     * @return buffer holding the encoded bytes, or an empty buffer if encoding failed
     */
    private ByteBuf encodeData(Object obj, String str) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        try {
            DataCodec.getInstance(str).encode(new ByteBufOutputStream(buffer), obj);
            return buffer;
        } catch (Throwable th) {
            // Best effort: give the allocated buffer back, log, and fall back to an empty body.
            ReferenceCountUtil.safestRelease(buffer);
            LOGGER.error("Failed to encode data: {}", obj, th);
            return Unpooled.EMPTY_BUFFER;
        }
    }

    /** Copies every timing header present on the HTTP request into the service message builder. */
    private void enrichRequest(HttpHeaders httpHeaders, ServiceMessage.Builder builder) {
        for (String name : TIMING_HEADERS) {
            String value = httpHeaders.get(name);
            if (value != null) {
                builder.header(name, value);
            }
        }
    }

    /** Copies every timing header present on the service message onto the HTTP response. */
    private void enrichResponse(HttpServerResponse httpServerResponse, ServiceMessage serviceMessage) {
        for (String name : TIMING_HEADERS) {
            String value = serviceMessage.header(name);
            if (value != null) {
                httpServerResponse.header(name, value);
            }
        }
    }
}
