package eu.elg.ltservice;

import com.fasterxml.jackson.core.JsonProcessingException;
import eu.elg.ltservice.LTService.Context;
import eu.elg.model.Failure;
import eu.elg.model.Progress;
import eu.elg.model.Request;
import eu.elg.model.Response;
import eu.elg.model.ResponseMessage;
import eu.elg.model.StandardMessages;
import eu.elg.model.StatusMessage;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.ReflectiveAccess;
import io.micronaut.core.async.annotation.SingleResult;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Consumes;
import io.micronaut.http.annotation.Error;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.annotation.RequestBean;
import io.micronaut.http.sse.Event;
import java.time.Duration;
import java.util.Collections;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:eu/elg/ltservice/LTService.class */
public class LTService<T extends Request<T>, C extends Context> {

    @Value("${elg.progress-interval:15s}")
    @ReflectiveAccess
    protected Duration progressInterval;

    @Introspected
    /* loaded from: input_file:eu/elg/ltservice/LTService$Context.class */
    public static class Context {
        private volatile FluxSink<Progress> sink;
        boolean progressEnabled = false;

        Flux<ResponseMessage> progressFlux(Duration duration) {
            return Flux.concat(new Publisher[]{Mono.just(new Progress().withPercent(Double.valueOf(0.0d)).asMessage()), Flux.create(fluxSink -> {
                this.sink = fluxSink;
            }, FluxSink.OverflowStrategy.LATEST).map((v0) -> {
                return v0.asMessage();
            })}).switchMap(responseMessage -> {
                return Flux.interval(Duration.ZERO, duration).map(l -> {
                    return responseMessage;
                });
            });
        }

        public boolean sendingProgress() {
            return this.progressEnabled;
        }

        public void reportProgress(Double d, StatusMessage statusMessage) {
            if (this.sink == null || this.sink.isCancelled()) {
                return;
            }
            this.sink.next(new Progress().withPercent(d).withMessage(statusMessage));
        }

        public void reportProgress(Double d) {
            reportProgress(d, null);
        }
    }

    @Consumes({"application/json"})
    @Post
    @Produces({"application/json", "*/*"})
    public Mono<HttpResponse<ResponseMessage>> jsonRequestSingleResponse(@Body T t, @RequestBean C c) {
        return sendSingleResponse(jsonRequest(t, c));
    }

    @Consumes({"application/json"})
    @Post
    @Produces({"text/event-stream"})
    public Flux<Event<ResponseMessage>> jsonRequestStreamingResponse(@Body T t, @RequestBean C c) {
        return sendProgress(() -> {
            return jsonRequest(t, c);
        }, c);
    }

    private Mono<ResponseMessage> jsonRequest(T t, C c) {
        return wrap(t, handle(t, c));
    }

    @Error(status = HttpStatus.NOT_FOUND)
    public HttpResponse<ResponseMessage> notFound(HttpRequest<?> httpRequest) {
        return HttpResponse.notFound(new Failure().withErrors(new StatusMessage[]{StandardMessages.elgServiceNotFound(httpRequest.getPath())}).asMessage());
    }

    @Error
    public HttpResponse<ResponseMessage> errorHandler(HttpRequest<?> httpRequest, JsonProcessingException jsonProcessingException) {
        return HttpResponse.badRequest(new Failure().withErrors(new StatusMessage[]{StandardMessages.elgRequestInvalid().withDetail(Collections.singletonMap("reason", jsonProcessingException.getMessage()))}).asMessage());
    }

    @SingleResult
    protected Publisher<? extends Response<?>> handle(T t, C c) {
        return Mono.fromCallable(() -> {
            return handleSync(t, c);
        }).subscribeOn(syncScheduler());
    }

    protected Response<?> handleSync(T t, C c) throws Exception {
        throw new UnsupportedOperationException("not implemented");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<HttpResponse<ResponseMessage>> sendSingleResponse(Mono<ResponseMessage> mono) {
        return mono.map((v0) -> {
            return HttpResponse.ok(v0);
        }).onErrorResume(ELGException.class, eLGException -> {
            return Mono.just(HttpResponse.status(eLGException.getStatus()).body((ResponseMessage) eLGException.getBody().get()));
        }).onErrorResume(th -> {
            return Mono.just(HttpResponse.serverError(ELGErrorResponseProcessor.errorResponseForException(th, LoggerFactory.getLogger(getClass()).isTraceEnabled())));
        }).map(mutableHttpResponse -> {
            return mutableHttpResponse.contentType(MediaType.APPLICATION_JSON_TYPE);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Flux<Event<ResponseMessage>> sendProgress(Supplier<Mono<ResponseMessage>> supplier, C c) {
        c.progressEnabled = true;
        return c.progressFlux(this.progressInterval).mergeWith(Mono.defer(supplier).onErrorResume(ELGException.class, eLGException -> {
            return Mono.just((ResponseMessage) eLGException.getBody().get());
        }).onErrorResume(th -> {
            return Mono.just(ELGErrorResponseProcessor.errorResponseForException(th, LoggerFactory.getLogger(getClass()).isTraceEnabled()));
        })).handle((responseMessage, synchronousSink) -> {
            synchronousSink.next(Event.of(responseMessage));
            if (responseMessage.getFailure() == null && responseMessage.getResponse() == null) {
                return;
            }
            synchronousSink.complete();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Scheduler syncScheduler() {
        return Schedulers.boundedElastic();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<ResponseMessage> wrap(T t, Publisher<? extends Response<?>> publisher) {
        return Mono.from(publisher).map(response -> {
            ResponseMessage asMessage = response.asMessage();
            asMessage.addWarnings(t.unknownPropertyWarnings());
            return asMessage;
        });
    }
}
