package org.nustaq.kontraktor.remoting.http;

import io.undertow.server.HttpServerExchange;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.messagestore.HeapMessageStore;
import org.nustaq.kontraktor.remoting.base.messagestore.MessageStore;
import org.nustaq.kontraktor.remoting.websockets.WebObjectSocket;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.offheap.BinaryQueue;
import org.nustaq.offheap.bytez.onheap.HeapBytez;
import org.nustaq.serialization.util.FSTUtil;

/* loaded from: input_file:org/nustaq/kontraktor/remoting/http/HttpObjectSocket.class */
/**
 * Object socket variant that buffers outgoing serialized messages in a {@link BinaryQueue}
 * and hands them out one at a time via {@link #getNextQueuedMessage()}.
 *
 * <p>Each queued message is framed as {@code [int sequence][int length][payload bytes]}
 * (see {@link #sendBinary(byte[])}). A pending "long poll" can be registered as a
 * {@code Pair<Runnable, HttpServerExchange>}; it is run when new data is queued
 * ({@link #triggerLongPoll()}) or ended via {@link #cancelLongPoll()}.
 *
 * <p>The instance also implements {@link ObjectSink} and forwards sink callbacks to the
 * delegate installed with {@link #setSink(ObjectSink)}.
 *
 * <p>Threading: the write/read path ({@code sendBinary}, {@code writeObject},
 * {@code getNextQueuedMessage}) is checked by {@link #checkThread()}, which only reports
 * (does not prevent) use from more than one thread. The long-poll task state is mutated
 * under {@code synchronized (this)}.
 */
public class HttpObjectSocket extends WebObjectSocket implements ObjectSink {
    /** Action run by {@link #close()}; may be {@code null}, in which case close does nothing. */
    final Runnable closeAction;
    /** Session identifier supplied at construction time. */
    String sessionId;
    /** Delegate receiving {@link #receiveObject} / {@link #sinkClosed} calls; set via {@link #setSink}. */
    ObjectSink sink;
    /**
     * Currently pending long-poll task (runnable + its exchange), or {@code null} if none.
     * Written only while holding the monitor of {@code this}; declared {@code volatile} because
     * {@link #getLongPollTask()} reads it without taking that monitor.
     */
    volatile Pair<Runnable, HttpServerExchange> longPollTask;
    /** First thread seen by {@link #checkThread()}; used purely for the diagnostic check. */
    Thread myThread;
    /**
     * {@code System.currentTimeMillis()} at the last {@link #setLongPollTask} call.
     * {@code volatile} for the same reason as {@link #longPollTask}; it also guarantees the
     * 64-bit value is never observed half-written by the unsynchronized getter.
     */
    volatile long longPollTaskTime;
    /** Source of per-instance {@link #id} values. */
    static AtomicInteger idCount = new AtomicInteger(0);
    /** Not referenced in this class; presumably the long-poll timeout in milliseconds - confirm at the use site. */
    public static int LP_TIMEOUT = 15000;
    /** Capacity passed to the {@link HeapMessageStore} backing {@link #store}. */
    public static int HISTORY_SIZE = 3;
    /** Not referenced in this class; presumably a batching limit used by the HTTP connector - confirm at the use site. */
    public static int HTTP_BATCH_SIZE = 500;
    /** Unique (per JVM) instance number taken from {@link #idCount}. */
    int id = idCount.incrementAndGet();
    /** Timestamp (ms) refreshed by {@link #updateTimeStamp()}; initialised at construction. */
    long lastUse = System.currentTimeMillis();
    /** Construction timestamp (ms); same value {@link #lastUse} starts with. */
    long creation = this.lastUse;
    /** Outgoing message buffer holding {@code [sequence][length][payload]} frames. */
    BinaryQueue queue = new BinaryQueue(4096);
    /** Store for long-poll messages, keyed by sequence number. */
    MessageStore store = new HeapMessageStore(HISTORY_SIZE);

    /**
     * @param str      session id reported by {@link #getSessionId()}
     * @param runnable action executed on {@link #close()}; may be {@code null}
     */
    public HttpObjectSocket(String str, Runnable runnable) {
        this.sessionId = str;
        this.closeAction = runnable;
    }

    /** @return the session id given to the constructor */
    public String getSessionId() {
        return this.sessionId;
    }

    /** Sets {@link #lastUse} to the current time. */
    public void updateTimeStamp() {
        this.lastUse = System.currentTimeMillis();
    }

    /** @return the time in ms of construction or of the most recent {@link #updateTimeStamp()} call */
    public long getLastUse() {
        return this.lastUse;
    }

    /**
     * Appends one frame to the outgoing queue - current send sequence, payload length,
     * payload - and then runs a pending long-poll task, if any.
     *
     * @param bArr serialized message bytes
     */
    @Override // org.nustaq.kontraktor.remoting.websockets.WebObjectSocket
    public void sendBinary(byte[] bArr) {
        checkThread();
        this.queue.addInt(this.sendSequence.get());
        this.queue.addInt(bArr.length);
        this.queue.add(new HeapBytez(bArr));
        triggerLongPoll();
    }

    /** Delegates to the superclass after running the thread diagnostic. */
    @Override // org.nustaq.kontraktor.remoting.websockets.WebObjectSocket
    public void writeObject(Object obj) throws Exception {
        checkThread();
        super.writeObject(obj);
    }

    /** Runs the close action supplied at construction, if one was given. */
    public void close() throws IOException {
        if (this.closeAction != null) {
            this.closeAction.run();
        }
    }

    /** @return this instance's number, assigned from {@link #idCount} at construction */
    public int getId() {
        return this.id;
    }

    /** Installs the delegate that {@link #receiveObject} and {@link #sinkClosed} forward to. */
    public void setSink(ObjectSink objectSink) {
        this.sink = objectSink;
    }

    /**
     * @return {@code this} - the socket itself acts as the sink and forwards to the delegate
     *         set via {@link #setSink(ObjectSink)}; the delegate is not returned directly
     */
    public ObjectSink getSink() {
        return this;
    }

    /**
     * Flushes pending objects, then removes and returns the next queued frame.
     *
     * <p>NOTE(review): once the 8 header bytes have been read they are not put back, so if the
     * declared length is non-positive or exceeds the bytes available, the header is consumed
     * and an empty result is returned. Whether that situation can arise depends on
     * {@code BinaryQueue} semantics not visible here.
     *
     * @return pair of (payload, sequence); or (empty array, 0) when the queue holds no more
     *         than a header's worth of bytes or the frame cannot be read in full
     */
    public Pair<byte[], Integer> getNextQueuedMessage() {
        checkThread();
        try {
            flush();
        } catch (Exception e) {
            FSTUtil.rethrow(e);
        }
        // A frame needs the 8-byte header (two ints) plus at least one payload byte.
        if (this.queue.available() <= 8) {
            return new Pair<>(new byte[0], 0);
        }
        int sequence = this.queue.readInt();
        int length = this.queue.readInt();
        if (length <= 0 || this.queue.available() < ((long) length)) {
            return new Pair<>(new byte[0], 0);
        }
        return new Pair<>(this.queue.readByteArray(length), Integer.valueOf(sequence));
    }

    /**
     * Diagnostic only: remembers the first calling thread and, when later called from a
     * different thread, prints a message and a stack dump. It never throws or blocks.
     */
    protected void checkThread() {
        if (this.myThread == null) {
            this.myThread = Thread.currentThread();
        } else if (this.myThread != Thread.currentThread()) {
            System.out.println("unexpected multithreading detected:" + this.myThread.getName() + " curr:" + Thread.currentThread().getName());
            Thread.dumpStack();
        }
    }

    /** @return the pending long-poll task, or {@code null} if none is registered */
    public Pair<Runnable, HttpServerExchange> getLongPollTask() {
        return this.longPollTask;
    }

    /** Ends the exchange of the pending long-poll task (if any) and clears it. */
    public void cancelLongPoll() {
        synchronized (this) {
            if (this.longPollTask != null) {
                ((HttpServerExchange) this.longPollTask.cdr()).endExchange();
                this.longPollTask = null;
            }
        }
    }

    /**
     * Runs the pending long-poll runnable (if any). The task is cleared before it runs, and
     * the runnable executes while the monitor of {@code this} is held.
     */
    public void triggerLongPoll() {
        synchronized (this) {
            if (this.longPollTask != null) {
                Runnable runnable = (Runnable) this.longPollTask.car();
                this.longPollTask = null;
                runnable.run();
            }
        }
    }

    /**
     * Registers (or, with {@code null}, clears) the pending long-poll task and records the
     * current time as {@link #getLongPollTaskTime()}.
     */
    public void setLongPollTask(Pair<Runnable, HttpServerExchange> pair) {
        synchronized (this) {
            this.longPollTask = pair;
            this.longPollTaskTime = System.currentTimeMillis();
        }
    }

    /** @return time in ms of the last {@link #setLongPollTask} call, or 0 if never called */
    public long getLongPollTaskTime() {
        return this.longPollTaskTime;
    }

    /** Forwards to the delegate sink; a delegate must have been set via {@link #setSink} beforehand. */
    public void receiveObject(ObjectSink objectSink, Object obj, List<IPromise> list) {
        this.sink.receiveObject(objectSink, obj, list);
    }

    /** Forwards to the delegate sink; a delegate must have been set via {@link #setSink} beforehand. */
    public void sinkClosed() {
        this.sink.sinkClosed();
    }

    /**
     * Looks up a stored long-poll message.
     *
     * @param i sequence number the message was stored under
     * @return whatever the message store returns for that sequence
     */
    public Object takeStoredLPMessage(int i) {
        return this.store.getMessage("sen", i);
    }

    /**
     * Stores a long-poll message under the given sequence number.
     *
     * @param i   sequence number
     * @param obj message to keep
     */
    public void storeLPMessage(int i, Object obj) {
        this.store.putMessage("sen", i, obj);
    }

    /**
     * Sends all pending objects as a single binary message. Does nothing when there are no
     * pending objects. Otherwise the incremented send sequence is appended as the last
     * element, the pending list is cleared, and the serialized array is passed to
     * {@link #sendBinary(byte[])}.
     */
    @Override // org.nustaq.kontraktor.remoting.websockets.WebObjectSocket
    public void flush() throws Exception {
        if (this.objects.size() == 0) {
            return;
        }
        this.objects.add(Integer.valueOf(this.sendSequence.incrementAndGet()));
        Object[] array = this.objects.toArray();
        this.objects.clear();
        sendBinary(this.conf.asByteArray(array));
    }
}
