package org.nustaq.kontraktor.asyncio;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.offheap.BinaryQueue;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.util.FSTUtil;

/* loaded from: input_file:org/nustaq/kontraktor/asyncio/ObjectAsyncSocketConnection.class */
/**
 * Async socket connection that exchanges serialized objects instead of raw bytes.
 *
 * <p>Outgoing objects are collected in {@link #objects} and sent as one batch by
 * {@link #flush()}: the batch is serialized with the configured {@link FSTConfiguration}
 * and written as a 4-byte length followed by the serialized bytes. Incoming data is
 * decoded from the same length-prefixed framing in {@link #dataReceived(BinaryQueue)}
 * and handed to {@link #receivedObject(Object)}.
 */
public abstract class ObjectAsyncSocketConnection extends QueuingAsyncSocketConnection implements ObjectSocket {
    /** Number of queued objects above which {@link #writeObject(Object)} flushes by itself. */
    private static final int AUTO_FLUSH_THRESHOLD = 100;

    /** Serialization configuration used for both encoding and decoding. */
    FSTConfiguration conf;
    /** Last error recorded via {@link #setLastError(Throwable)}; not modified by this class otherwise. */
    Throwable lastError;
    /** Objects queued by {@link #writeObject(Object)} and not yet flushed. */
    ArrayList<Object> objects = new ArrayList<>();

    /**
     * Creates a connection without a serialization configuration; {@link #setConf(FSTConfiguration)}
     * must be called before objects are read or flushed, since both paths dereference {@code conf}.
     *
     * @param selectionKey  passed through to the superclass
     * @param socketChannel passed through to the superclass
     */
    public ObjectAsyncSocketConnection(SelectionKey selectionKey, SocketChannel socketChannel) {
        super(selectionKey, socketChannel);
    }

    /**
     * Creates a connection that uses the given serialization configuration.
     *
     * @param fSTConfiguration configuration used to serialize and deserialize objects
     * @param selectionKey     passed through to the superclass
     * @param socketChannel    passed through to the superclass
     */
    public ObjectAsyncSocketConnection(FSTConfiguration fSTConfiguration, SelectionKey selectionKey, SocketChannel socketChannel) {
        super(selectionKey, socketChannel);
        setConf(fSTConfiguration);
    }

    /**
     * Sets the serialization configuration.
     *
     * @param fSTConfiguration configuration used for subsequent reads and flushes
     */
    @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
    public void setConf(FSTConfiguration fSTConfiguration) {
        this.conf = fSTConfiguration;
    }

    /**
     * @return the serialization configuration currently in use, or {@code null} if none was set
     */
    @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
    public FSTConfiguration getConf() {
        return this.conf;
    }

    /**
     * Decodes as many complete length-prefixed frames as the queue currently holds and
     * passes each deserialized object to {@link #receivedObject(Object)}.
     *
     * <p>Decoding stops when the queue holds 4 bytes or fewer, when a frame's payload has
     * not fully arrived yet, or when a non-positive length is read.
     *
     * @param binaryQueue queue holding the bytes received so far
     */
    @Override // org.nustaq.kontraktor.asyncio.QueuingAsyncSocketConnection
    public void dataReceived(BinaryQueue binaryQueue) {
        checkThread();
        // A frame is a 4-byte length plus at least one payload byte, hence "> 4".
        while (binaryQueue.available() > 4) {
            int frameLength = binaryQueue.readInt();
            if (frameLength <= 0) {
                // NOTE(review): the 4 length bytes stay consumed here, so later reads start
                // mid-stream — confirm whether the connection should be closed instead.
                System.out.println("object len ?? " + frameLength);
                return;
            }
            if (binaryQueue.available() < frameLength) {
                // Payload incomplete: step back over the length prefix (presumably so it is
                // read again on the next call) and wait for more data.
                binaryQueue.back(4);
                return;
            }
            receivedObject(this.conf.asObject(binaryQueue.readByteArray(frameLength)));
        }
    }

    /**
     * Called once for every object decoded by {@link #dataReceived(BinaryQueue)}.
     *
     * @param obj the deserialized object
     */
    public abstract void receivedObject(Object obj);

    /**
     * Queues an object for sending; the queue is flushed automatically once it holds more
     * than {@value #AUTO_FLUSH_THRESHOLD} objects.
     *
     * <p>Any exception thrown by the automatic flush is propagated via {@code FSTUtil.rethrow}.
     *
     * @param obj object to send with the next flush
     */
    @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
    public void writeObject(Object obj) {
        // Remember the owning actor on first use only. flush() needs it to reschedule
        // itself when called from a foreign thread; it must not be replaced afterwards.
        if (this.myActor == null) {
            this.myActor = Actor.current();
        }
        checkThread();
        this.objects.add(obj);
        if (this.objects.size() > AUTO_FLUSH_THRESHOLD) {
            try {
                flush();
            } catch (Exception e) {
                FSTUtil.rethrow(e);
            }
        }
    }

    /**
     * Serializes all queued objects as one batch and writes it as a 4-byte length followed
     * by the serialized bytes.
     *
     * <p>When called from a thread other than {@code theExecutingThread}, the flush is
     * re-submitted to the remembered actor via {@code execute}; exceptions from that
     * deferred flush are logged as warnings rather than thrown. If no actor has been
     * remembered yet, the call returns without doing anything.
     *
     * @throws IOException declared by the interface; presumably raised by the underlying writes
     * @throws Exception   declared by the interface
     */
    @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
    public void flush() throws IOException, Exception {
        if (this.theExecutingThread != Thread.currentThread()) {
            if (this.myActor == null) {
                return;
            }
            this.myActor.execute(() -> {
                try {
                    flush();
                } catch (Exception e) {
                    Log.Warn(this, e);
                }
            });
            return;
        }
        checkThread();
        if (this.objects.isEmpty()) {
            return;
        }
        // Every batch ends with an extra Integer 0.
        // NOTE(review): presumably a sequence-number slot expected by the receiver — confirm.
        this.objects.add(0);
        Object[] batch = this.objects.toArray();
        this.objects.clear();
        byte[] payload = this.conf.asByteArray(batch);
        write(payload.length);
        write(payload);
        tryFlush();
    }

    /**
     * @return the error last stored with {@link #setLastError(Throwable)}, or {@code null}
     */
    @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
    public Throwable getLastError() {
        return this.lastError;
    }

    /**
     * Stores an error so it can be retrieved with {@link #getLastError()}.
     *
     * @param th the error to remember
     */
    @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
    public void setLastError(Throwable th) {
        this.lastError = th;
    }
}
