package org.nustaq.kontraktor.remoting.http;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ActorServerConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.base.SessionResurrector;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.serialization.FSTConfiguration;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;

/* loaded from: input_file:org/nustaq/kontraktor/remoting/http/UndertowHttpServerConnector.class */
/**
 * Undertow {@link HttpHandler} that connects HTTP POST requests to an actor facade.
 *
 * <p>Requests are dispatched by their relative path (see {@link #requestReceived}):
 * an empty path creates a new session and answers with the serialized session id;
 * {@code <sessionId>[/<sequence>]} addresses an existing (or resurrected) session.
 * A request whose decoded body is a one-element array holding a {@link Number} is
 * treated as a long poll, everything else is forwarded to the session's sink.
 *
 * <p>Session bookkeeping ({@link #sessions}) is only touched from tasks handed to the
 * facade actor via {@code execute}/{@code delayed} in this class.
 */
public class UndertowHttpServerConnector implements ActorServerConnector, HttpHandler {
    /** Timeout handed to {@code timeoutIn} while waiting for futures produced by a regular request (presumably milliseconds). */
    public static int REQUEST_RESULTING_FUTURE_TIMEOUT = 3000;
    /** Default value for {@link #sessionTimeout}: 30 minutes expressed in milliseconds. */
    public static long SESSION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(30);
    /** Actor used to run request processing and housekeeping. */
    Actor facade;
    /** Creates the sink for each newly created session socket; set by {@link #connect}. */
    Function<ObjectSocket, ObjectSink> factory;
    private ActorServer actorServer;
    /** Live sessions keyed by session id. */
    HashMap<String, HttpObjectSocket> sessions = new HashMap<>();
    /** Serializer used for the session handshake. */
    FSTConfiguration conf = FSTConfiguration.createJsonConfiguration();
    /** Idle time in milliseconds after which {@link #houseKeeping()} closes a session. */
    long sessionTimeout = SESSION_TIMEOUT_MS;
    /** Set by {@link #closeServer()}; stops housekeeping from rescheduling itself. */
    volatile boolean isClosed = false;

    /**
     * Creates the connector and schedules the first {@link #houseKeeping()} run on the
     * given actor after half of {@code HttpObjectSocket.LP_TIMEOUT}.
     *
     * @param actor the facade actor
     */
    public UndertowHttpServerConnector(Actor actor) {
        this.facade = actor;
        actor.delayed(HttpObjectSocket.LP_TIMEOUT / 2, () -> {
            houseKeeping();
        });
    }

    /**
     * Periodic maintenance: triggers the long poll of every session whose long poll task
     * is at least {@code LP_TIMEOUT / 2} old, closes sessions that have not been used for
     * longer than {@link #getSessionTimeout()}, and reschedules itself every
     * {@code LP_TIMEOUT / 4} unless the server has been closed.
     */
    public void houseKeeping() {
        long now = System.currentTimeMillis();
        long timeout = getSessionTimeout();
        // Collect expired ids first; closing sessions while iterating would modify the map.
        ArrayList<String> expired = new ArrayList<>();
        this.sessions.forEach((sessionId, socket) -> {
            if (now - socket.getLongPollTaskTime() >= HttpObjectSocket.LP_TIMEOUT / 2) {
                socket.triggerLongPoll();
            }
            if (now - socket.getLastUse() > timeout) {
                expired.add(sessionId);
            }
        });
        for (String sessionId : expired) {
            closeSession(sessionId);
        }
        if (!this.isClosed) {
            this.facade.delayed(HttpObjectSocket.LP_TIMEOUT / 4, () -> {
                houseKeeping();
            });
        }
    }

    /**
     * @param j idle time in milliseconds after which a session is closed
     */
    public void setSessionTimeout(long j) {
        this.sessionTimeout = j;
    }

    /**
     * @return idle time in milliseconds after which a session is closed
     */
    public long getSessionTimeout() {
        return this.sessionTimeout;
    }

    /**
     * Undertow entry point. Only POST is accepted (anything else is answered with 404).
     * The request body is read completely according to its {@code Content-Length}
     * header and then handed to {@link #requestReceived} on the facade actor.
     *
     * <p>Requests without a usable {@code Content-Length} are answered with 400; a read
     * failure or a body shorter than announced ends the exchange.
     *
     * @param httpServerExchange the exchange to serve
     */
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        // HttpString is an object type: compare by value rather than by identity.
        if (!Methods.POST.equals(httpServerExchange.getRequestMethod())) {
            httpServerExchange.setResponseCode(404);
            httpServerExchange.endExchange();
            return;
        }
        int contentLength = parseContentLength(httpServerExchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH));
        if (contentLength < 0) {
            Log.Error(this, "missing or invalid Content-Length header. rejecting request.");
            httpServerExchange.setResponseCode(400);
            httpServerExchange.endExchange();
            return;
        }
        String relativePath = httpServerExchange.getRelativePath();
        StreamSourceChannel requestChannel = httpServerExchange.getRequestChannel();
        // NOTE(review): the buffer size is taken from a client-supplied header without an upper bound.
        ByteBuffer body = ByteBuffer.allocate(contentLength);
        requestChannel.getReadSetter().set(streamSourceChannel -> {
            int read;
            try {
                read = streamSourceChannel.read(body);
            } catch (IOException e) {
                // The body can no longer be completed; finish the exchange instead of leaving it dangling.
                Log.Warn(this, e);
                httpServerExchange.endExchange();
                return;
            }
            if (!body.hasRemaining()) {
                try {
                    requestChannel.shutdownReads();
                } catch (IOException e) {
                    Log.Warn(this, e);
                }
                this.facade.execute(() -> {
                    requestReceived(httpServerExchange, body.array(), relativePath);
                });
            } else if (read < 0) {
                // End of stream before the announced number of bytes arrived.
                Log.Error(this, "request body shorter than Content-Length. rejecting request.");
                httpServerExchange.endExchange();
            }
        });
        requestChannel.resumeReads();
    }

    /**
     * Parses a {@code Content-Length} header value.
     *
     * @return the parsed value, or a negative number if the header is absent, not an
     *         integer, or negative
     */
    private static int parseContentLength(String headerValue) {
        if (headerValue == null) {
            return -1;
        }
        try {
            return Integer.parseInt(headerValue.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /**
     * Dispatches a fully read request by its path.
     *
     * <ul>
     *   <li>empty path (after stripping leading slashes): a new session is created;</li>
     *   <li>{@code <sessionId>[/<sequence>]}: the request is passed to
     *       {@link #handleClientRequest} for the known or restored session;</li>
     *   <li>unknown session that cannot be restored: 404.</li>
     * </ul>
     *
     * @param httpServerExchange the exchange to answer
     * @param bArr               the request body
     * @param str                the request's relative path
     */
    protected void requestReceived(HttpServerExchange httpServerExchange, byte[] bArr, String str) {
        while (str.startsWith("/")) {
            str = str.substring(1);
        }
        if (str.trim().length() <= 0) {
            if (bArr != null && bArr.length > 0) {
                // NOTE(review): the decoded value is discarded; this call only matters if decoding
                // throws. Confirm whether validating the handshake body is intended.
                this.conf.asObject(bArr);
            }
            handleNewSession(httpServerExchange, UUID.randomUUID().toString());
            return;
        }
        String[] pathElements = str.split("/");
        String sessionId = pathElements[0];
        String sequence = pathElements.length > 1 ? pathElements[1] : null;
        HttpObjectSocket socket = this.sessions.get(sessionId);
        if (socket == null) {
            socket = restoreSessionFromId(sessionId);
        }
        if (socket != null) {
            handleClientRequest(httpServerExchange, socket, bArr, sequence);
        } else {
            httpServerExchange.setResponseCode(404);
            httpServerExchange.endExchange();
        }
    }

    /**
     * Re-creates a session for an unknown session id if the facade is a
     * {@link SessionResurrector}.
     *
     * @param str the session id sent by the client
     * @return the registered socket, or {@code null} if the facade cannot resurrect sessions
     */
    protected HttpObjectSocket restoreSessionFromId(String str) {
        if (!(this.facade instanceof SessionResurrector)) {
            return null;
        }
        // NOTE(review): relies on the actor ref exposing restoreRemoteRefConnection; confirm
        // against Actor/SessionResurrector (a cast may have been lost here).
        this.facade.getActorRef().restoreRemoteRefConnection(str);
        return createSocket(str);
    }

    /**
     * Creates and registers a session and answers the exchange with the serialized
     * session id.
     *
     * @param httpServerExchange the exchange to answer
     * @param str                the id of the new session
     */
    protected void handleNewSession(HttpServerExchange httpServerExchange, String str) {
        HttpObjectSocket socket = createSocket(str);
        byte[] serializedId = this.conf.asByteArray(socket.getSessionId());
        httpServerExchange.setResponseCode(200);
        httpServerExchange.setResponseContentLength(serializedId.length);
        StreamSinkChannel responseChannel = httpServerExchange.getResponseChannel();
        writeAndEndExchange(httpServerExchange, responseChannel, ByteBuffer.wrap(serializedId), () -> {
            Log.Info(this, "client connected " + str);
        });
    }

    /**
     * Creates the socket for a session, registers it in {@link #sessions} and attaches
     * the sink produced by {@link #factory}. The socket's close callback schedules
     * {@link #closeSession} on the facade actor.
     */
    private HttpObjectSocket createSocket(String sessionId) {
        HttpObjectSocket socket = new HttpObjectSocket(sessionId, () -> {
            this.facade.execute(() -> {
                closeSession(sessionId);
            });
        }) {
            @Override
            protected int getObjectMaxBatchSize() {
                return HttpObjectSocket.HTTP_BATCH_SIZE;
            }

            public String getConnectionIdentifier() {
                return this.sessionId;
            }
        };
        this.sessions.put(socket.getSessionId(), socket);
        socket.setSink(this.factory.apply(socket));
        return socket;
    }

    /**
     * Notifies the session's socket that its sink is closed and removes the session.
     *
     * @param str the session id
     * @return the removed socket, or {@code null} if no such session was registered
     */
    protected HttpObjectSocket closeSession(String str) {
        Log.Info(this, str + " closed");
        HttpObjectSocket socket = this.sessions.get(str);
        if (socket != null) {
            socket.sinkClosed();
        }
        return this.sessions.remove(str);
    }

    /**
     * Handles a request addressed to an existing session.
     *
     * <p>A decoded body consisting of a single {@link Number} is a long poll request: it
     * is answered from the stored message history if the client's last seen sequence
     * allows it, otherwise a new long poll task replaces the session's current one.
     * Any other array is forwarded via {@link #handleRegularRequest}.
     *
     * @param httpServerExchange the exchange to answer
     * @param httpObjectSocket   the session's socket
     * @param bArr               the request body
     * @param str                last sequence seen by the client as sent in the path, may be {@code null}
     */
    public void handleClientRequest(HttpServerExchange httpServerExchange, HttpObjectSocket httpObjectSocket, byte[] bArr, String str) {
        StreamSinkChannel responseChannel = httpServerExchange.getResponseChannel();
        if (responseChannel == null) {
            Log.Error(this, "could not aquire response channel. rejecting request.");
            httpServerExchange.endExchange();
            return;
        }
        httpObjectSocket.updateTimeStamp();
        Object decoded = httpObjectSocket.getConf().asObject(bArr);
        if (!(decoded instanceof Object[])) {
            // Check before casting: a blind cast would throw and leave the exchange unanswered.
            Log.Error(this, "unexpected request payload. rejecting request.");
            httpServerExchange.endExchange();
            return;
        }
        Object[] received = (Object[]) decoded;
        boolean isLongPoll = received.length == 1 && received[0] instanceof Number;
        if (!isLongPoll) {
            handleRegularRequest(httpServerExchange, httpObjectSocket, received, responseChannel);
            return;
        }
        int lastClientSequence = -1;
        if (str != null) {
            try {
                lastClientSequence = Integer.parseInt(str);
            } catch (Throwable th) {
                Log.Warn(this, th);
            }
        }
        if (lastClientSequence > 0) {
            // The message following the client's last seen sequence may still be stored.
            byte[] stored = (byte[]) httpObjectSocket.takeStoredLPMessage(lastClientSequence + 1);
            if (stored != null) {
                replyFromHistory(httpServerExchange, responseChannel, stored);
                return;
            }
        }
        responseChannel.resumeWrites();
        Pair<Runnable, HttpServerExchange> longPollTask = createLongPollTask(httpServerExchange, httpObjectSocket, responseChannel);
        // Release the previous long poll before installing the new one.
        httpObjectSocket.cancelLongPoll();
        httpObjectSocket.setLongPollTask(longPollTask);
    }

    /**
     * Builds the long poll task for an exchange: when run and the channel is still open,
     * it sends the socket's next queued message and ends the exchange.
     *
     * @return the task paired with the exchange it answers
     */
    protected Pair<Runnable, HttpServerExchange> createLongPollTask(HttpServerExchange httpServerExchange, HttpObjectSocket httpObjectSocket, StreamSinkChannel streamSinkChannel) {
        return new Pair<>(() -> {
            if (streamSinkChannel.isOpen()) {
                sendNextQueuedMessage(httpServerExchange, httpObjectSocket, streamSinkChannel);
            }
        }, httpServerExchange);
    }

    /**
     * Answers the exchange with a previously stored message.
     *
     * @param httpServerExchange the exchange to answer
     * @param streamSinkChannel  the exchange's response channel
     * @param bArr               the stored message bytes
     */
    protected void replyFromHistory(HttpServerExchange httpServerExchange, StreamSinkChannel streamSinkChannel, byte[] bArr) {
        httpServerExchange.setResponseContentLength(bArr.length);
        writeAndEndExchange(httpServerExchange, streamSinkChannel, ByteBuffer.wrap(bArr), null);
    }

    /**
     * Forwards the decoded request to the session's sink and answers with the socket's
     * next queued message, either immediately or, if the sink registered futures, once
     * they complete or {@link #REQUEST_RESULTING_FUTURE_TIMEOUT} elapses.
     *
     * @param httpServerExchange the exchange to answer
     * @param httpObjectSocket   the session's socket
     * @param objArr             the decoded request
     * @param streamSinkChannel  the exchange's response channel
     */
    protected void handleRegularRequest(HttpServerExchange httpServerExchange, HttpObjectSocket httpObjectSocket, Object[] objArr, StreamSinkChannel streamSinkChannel) {
        // Raw list kept as in the original: it is filled by the sink and consumed by Actors.all.
        ArrayList futures = new ArrayList();
        httpObjectSocket.getSink().receiveObject(objArr, futures);
        Runnable reply = () -> {
            sendNextQueuedMessage(httpServerExchange, httpObjectSocket, streamSinkChannel);
        };
        if (futures.isEmpty()) {
            reply.run();
        } else {
            Actors.all(futures).timeoutIn(REQUEST_RESULTING_FUTURE_TIMEOUT).then(() -> {
                reply.run();
            }).onTimeout(() -> {
                reply.run();
            });
            streamSinkChannel.resumeWrites();
        }
    }

    /**
     * Sends the socket's next queued message as the response body and ends the exchange.
     * An empty message ends the exchange without a body; a non-empty one is stored under
     * its sequence number before being written. A failed write aborts immediately
     * (retrying cannot drain the buffer) and closes the channel.
     */
    private void sendNextQueuedMessage(HttpServerExchange exchange, HttpObjectSocket socket, StreamSinkChannel channel) {
        Pair<byte[], Integer> next = socket.getNextQueuedMessage();
        byte[] payload = next.getFirst();
        exchange.setResponseContentLength(payload.length);
        if (payload.length == 0) {
            exchange.endExchange();
            return;
        }
        socket.storeLPMessage(next.getSecond().intValue(), payload);
        try {
            writeFully(channel, ByteBuffer.wrap(payload));
        } catch (IOException | RuntimeException e) {
            Log.Error(this, "failed to write response, buffer size:" + payload.length);
            Log.Warn(this, e);
            try {
                channel.close();
            } catch (IOException closeFailure) {
                Log.Warn(this, closeFailure);
            }
        }
        exchange.endExchange();
    }

    /**
     * Writes the buffer to the channel until it is drained.
     *
     * <p>NOTE(review): if {@code write} can return 0 (channel not ready), this loop spins
     * until data is accepted; confirm this is acceptable on the calling thread.
     *
     * @throws IOException if a write fails; the buffer is then left partially written
     */
    private static void writeFully(StreamSinkChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    /**
     * Writes the buffer through the channel's write listener and ends the exchange once
     * the buffer is drained or a write fails.
     *
     * @param onComplete run right before the exchange is ended after a complete write; may be {@code null}
     */
    private void writeAndEndExchange(HttpServerExchange exchange, StreamSinkChannel channel, ByteBuffer buffer, Runnable onComplete) {
        channel.getWriteSetter().set(readyChannel -> {
            try {
                if (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
                if (buffer.hasRemaining()) {
                    // Partial write: ask to be called again when the channel accepts more data.
                    channel.resumeWrites();
                    return;
                }
                if (onComplete != null) {
                    onComplete.run();
                }
                exchange.endExchange();
            } catch (Exception e) {
                Log.Warn(this, e);
                exchange.endExchange();
            }
        });
        channel.resumeWrites();
    }

    /**
     * Stores the facade actor and the sink factory used for subsequently created sessions.
     *
     * @param actor    the facade actor
     * @param function creates the sink for a session socket
     */
    public void connect(Actor actor, Function<ObjectSocket, ObjectSink> function) throws Exception {
        this.facade = actor;
        this.factory = function;
    }

    /**
     * Marks the connector as closed so that housekeeping stops rescheduling itself.
     *
     * @return a promise constructed with a {@code null} result
     */
    public IPromise closeServer() {
        this.isClosed = true;
        return new Promise((Object) null);
    }

    public void setActorServer(ActorServer actorServer) {
        this.actorServer = actorServer;
    }

    public ActorServer getActorServer() {
        return this.actorServer;
    }
}
