package org.nustaq.kontraktor.remoting.http.undertow;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.remoting.http.AbstractHttpServerConnector;
import org.nustaq.kontraktor.remoting.http.HttpObjectSocket;
import org.nustaq.kontraktor.remoting.http.KHttpExchange;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;

/* loaded from: input_file:org/nustaq/kontraktor/remoting/http/undertow/UndertowHttpServerConnector.class */
public class UndertowHttpServerConnector extends AbstractHttpServerConnector implements HttpHandler {
    public UndertowHttpServerConnector(Actor actor) {
        super(actor);
    }

    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        if (httpServerExchange.getRequestMethod() != Methods.POST) {
            httpServerExchange.setResponseCode(404);
            httpServerExchange.endExchange();
            return;
        }
        String relativePath = httpServerExchange.getRelativePath();
        StreamSourceChannel requestChannel = httpServerExchange.getRequestChannel();
        ByteBuffer allocate = ByteBuffer.allocate(Integer.parseInt(httpServerExchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH)));
        requestChannel.getReadSetter().set(streamSourceChannel -> {
            try {
                streamSourceChannel.read(allocate);
            } catch (IOException e) {
                Log.Warn(this, e);
            }
            if (allocate.remaining() == 0) {
                try {
                    requestChannel.shutdownReads();
                } catch (IOException e2) {
                    Log.Warn(this, e2);
                }
                this.facade.execute(() -> {
                    requestReceived(httpServerExchange, allocate.array(), relativePath);
                });
            }
        });
        requestChannel.resumeReads();
    }

    protected void requestReceived(HttpServerExchange httpServerExchange, byte[] bArr, String str) {
        String first = httpServerExchange.getRequestHeaders().getFirst("sid");
        String[] strArr = new String[0];
        if (first == null) {
            while (str.startsWith("/")) {
                str = str.substring(1);
            }
            if (str.trim().length() > 0) {
                strArr = str.split("/");
                first = strArr[0];
            }
        }
        if (first == null || first.length() <= 0) {
            if (bArr != null && bArr.length > 0) {
                this.conf.asObject(bArr);
            }
            handleNewSession(new UndertowKHttpExchangeImpl(httpServerExchange));
            return;
        }
        HttpObjectSocket httpObjectSocket = this.sessions.get(first);
        if (httpObjectSocket != null) {
            handleClientRequest(httpServerExchange, httpObjectSocket, bArr, strArr.length > 1 ? strArr[1] : null);
            return;
        }
        HttpObjectSocket restoreSessionFromId = restoreSessionFromId(first);
        if (restoreSessionFromId != null) {
            handleClientRequest(httpServerExchange, restoreSessionFromId, bArr, strArr.length > 1 ? strArr[1] : null);
        } else {
            httpServerExchange.setResponseCode(401);
            httpServerExchange.endExchange();
        }
    }

    public void handleClientRequest(HttpServerExchange httpServerExchange, HttpObjectSocket httpObjectSocket, byte[] bArr, String str) {
        byte[] bArr2;
        StreamSinkChannel responseChannel = httpServerExchange.getResponseChannel();
        if (responseChannel == null) {
            Log.Error(this, "could not aquire response channel. rejecting request.");
            httpServerExchange.endExchange();
            return;
        }
        httpObjectSocket.updateTimeStamp();
        Object[] objArr = (Object[]) httpObjectSocket.getConf().asObject(bArr);
        if (!(objArr.length == 1 && (objArr[0] instanceof Number))) {
            handleRegularRequest(httpServerExchange, httpObjectSocket, objArr, responseChannel);
            return;
        }
        int i = -1;
        if (str != null) {
            try {
                i = Integer.parseInt(str);
            } catch (Throwable th) {
                Log.Warn(this, th);
            }
        }
        if (i > 0 && (bArr2 = (byte[]) httpObjectSocket.takeStoredLPMessage(i + 1)) != null) {
            replyFromHistory(httpServerExchange, responseChannel, bArr2);
            return;
        }
        responseChannel.resumeWrites();
        Pair<Runnable, KHttpExchange> createLongPollTask = createLongPollTask(new UndertowKHttpExchangeImpl(httpServerExchange), httpObjectSocket, responseChannel);
        httpObjectSocket.cancelLongPoll();
        httpObjectSocket.setLongPollTask(createLongPollTask);
    }

    protected Pair<Runnable, KHttpExchange> createLongPollTask(KHttpExchange kHttpExchange, HttpObjectSocket httpObjectSocket, StreamSinkChannel streamSinkChannel) {
        return new Pair<>(() -> {
            if (streamSinkChannel.isOpen()) {
                Pair<byte[], Integer> nextQueuedMessage = httpObjectSocket.getNextQueuedMessage();
                byte[] bArr = (byte[]) nextQueuedMessage.getFirst();
                kHttpExchange.setResponseContentLength(bArr.length);
                if (bArr.length == 0) {
                    kHttpExchange.endExchange();
                    return;
                }
                httpObjectSocket.storeLPMessage(((Integer) nextQueuedMessage.getSecond()).intValue(), bArr);
                ByteBuffer wrap = ByteBuffer.wrap(bArr);
                while (wrap.remaining() > 0) {
                    try {
                        streamSinkChannel.write(wrap);
                    } catch (Throwable th) {
                        System.out.println("buffer size:" + bArr.length);
                        try {
                            streamSinkChannel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        th.printStackTrace();
                    }
                }
                kHttpExchange.endExchange();
            }
        }, kHttpExchange);
    }

    protected void replyFromHistory(HttpServerExchange httpServerExchange, StreamSinkChannel streamSinkChannel, byte[] bArr) {
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        httpServerExchange.setResponseContentLength(bArr.length);
        streamSinkChannel.getWriteSetter().set(streamSinkChannel2 -> {
            if (wrap.remaining() <= 0) {
                httpServerExchange.endExchange();
                return;
            }
            try {
                streamSinkChannel.write(wrap);
                if (wrap.remaining() == 0) {
                    httpServerExchange.endExchange();
                } else {
                    streamSinkChannel.resumeWrites();
                }
            } catch (Exception e) {
                e.printStackTrace();
                httpServerExchange.endExchange();
            }
        });
        streamSinkChannel.resumeWrites();
    }

    protected void handleRegularRequest(HttpServerExchange httpServerExchange, HttpObjectSocket httpObjectSocket, Object[] objArr, StreamSinkChannel streamSinkChannel) {
        ArrayList arrayList = new ArrayList();
        httpObjectSocket.getSink().receiveObject(objArr, arrayList, httpServerExchange.getRequestHeaders().getFirst("JWT"));
        Runnable runnable = () -> {
            Pair<byte[], Integer> nextQueuedMessage = httpObjectSocket.getNextQueuedMessage();
            byte[] bArr = (byte[]) nextQueuedMessage.getFirst();
            httpServerExchange.setResponseContentLength(bArr.length);
            if (bArr.length == 0) {
                httpServerExchange.endExchange();
                return;
            }
            httpObjectSocket.storeLPMessage(((Integer) nextQueuedMessage.cdr()).intValue(), bArr);
            System.nanoTime();
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            while (wrap.remaining() > 0) {
                try {
                    streamSinkChannel.write(wrap);
                } catch (IOException e) {
                    Log.Warn(this, e);
                }
            }
            httpServerExchange.endExchange();
        };
        if (arrayList == null || arrayList.size() == 0) {
            runnable.run();
        } else {
            Actors.all(arrayList).timeoutIn(REQUEST_RESULTING_FUTURE_TIMEOUT).then(() -> {
                runnable.run();
            }).onTimeout(() -> {
                runnable.run();
            });
            streamSinkChannel.resumeWrites();
        }
    }
}
