package org.nustaq.kontraktor.remoting.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorClientConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.net.TCPObjectSocket;
import org.nustaq.serialization.util.FSTUtil;

/* loaded from: input_file:org/nustaq/kontraktor/remoting/tcp/TCPClientConnector.class */
public class TCPClientConnector implements ActorClientConnector {
    protected static AtomicReference<RemotingHelper> singleton = new AtomicReference<>();
    protected int port;
    protected String host;
    protected MyTCPSocket socket;
    protected Callback<ActorClientConnector> disconnectCallback;

    /* loaded from: input_file:org/nustaq/kontraktor/remoting/tcp/TCPClientConnector$MyTCPSocket.class */
    static class MyTCPSocket extends TCPObjectSocket implements ObjectSocket {
        static AtomicInteger idCount = new AtomicInteger(0);
        int id;
        ArrayList objects;

        public MyTCPSocket(String str, int i) throws IOException {
            super(str, i);
            this.id = idCount.incrementAndGet();
            this.objects = new ArrayList();
            getSocket().setKeepAlive(true);
        }

        @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
        public void writeObject(Object obj) throws Exception {
            this.objects.add(obj);
            if (this.objects.size() > 100) {
                flush();
            }
        }

        @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
        public void flush() throws IOException {
            if (this.objects.size() == 0) {
                return;
            }
            this.objects.add(0);
            Object[] array = this.objects.toArray();
            this.objects.clear();
            try {
                super.writeObject(array);
            } catch (Exception e) {
                Log.Error(this, "** FAILED TO SERIALIZE:" + Arrays.toString(array));
                FSTUtil.rethrow(e);
            }
            super.flush();
        }

        @Override // org.nustaq.kontraktor.remoting.base.ObjectSocket
        public int getId() {
            return this.id;
        }
    }

    /* loaded from: input_file:org/nustaq/kontraktor/remoting/tcp/TCPClientConnector$RemotingHelper.class */
    /**
     * Actor type with no members of its own. {@link TCPClientConnector#get()} creates one
     * instance through {@code Actors.AsActor} and caches it in {@code singleton}.
     */
    public static class RemotingHelper extends Actor<RemotingHelper> {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Returns the shared {@link RemotingHelper}, creating it through {@code Actors.AsActor} on
     * first use. Creation is guarded by the monitor of the {@code singleton} reference.
     *
     * @return the cached helper actor
     */
    public static RemotingHelper get() {
        synchronized (singleton) {
            RemotingHelper helper = singleton.get();
            if (helper == null) {
                helper = Actors.AsActor(RemotingHelper.class);
                singleton.set(helper);
            }
            return helper;
        }
    }

    /**
     * Stores the connection parameters. No socket is opened here; that happens in
     * {@link #connect(Function)}.
     *
     * @param i        port to connect to
     * @param str      host to connect to
     * @param callback invoked with this connector once the receiver loop started by
     *                 {@link #connect(Function)} has ended; may be {@code null}
     */
    public TCPClientConnector(int i, String str, Callback<ActorClientConnector> callback) {
        this.disconnectCallback = callback;
        this.host = str;
        this.port = i;
    }

    /**
     * Opens a socket to the configured host/port, builds the receiving sink with
     * {@code function}, and starts a thread named "tcp client receiver" that forwards every
     * object read from the socket to that sink.
     *
     * <p>The returned promise is resolved by the receiver thread as soon as it starts. When the
     * receive loop ends, {@code disconnectCallback} (if set) is completed with this connector and
     * the sink is notified through {@code sinkClosed()}.
     *
     * @param function factory that creates the sink for the freshly opened socket
     * @return promise resolved once the receiver thread is running
     * @throws Exception if the socket cannot be opened or {@code function} fails; in the latter
     *                   case the socket is closed before the exception propagates
     */
    @Override // org.nustaq.kontraktor.remoting.base.ActorClientConnector
    public IPromise connect(Function<ObjectSocket, ObjectSink> function) throws Exception {
        Promise promise = new Promise();
        // The receiver works on this local reference rather than the mutable field, so a later
        // connect() call cannot redirect a running receiver thread to a different socket.
        final MyTCPSocket sock = new MyTCPSocket(this.host, this.port);
        this.socket = sock;
        final ObjectSink sink;
        try {
            sink = function.apply(sock);
        } catch (RuntimeException | Error ex) {
            // No receiver thread will own the socket, so release it here.
            try {
                sock.close();
            } catch (IOException closeEx) {
                ex.addSuppressed(closeEx);
            }
            throw ex;
        }
        new Thread(() -> {
            promise.resolve();
            while (!sock.isClosed()) {
                try {
                    sink.receiveObject(sock.readObject(), null);
                } catch (Exception e) {
                    // EOF / socket errors are the normal way a connection ends: message only.
                    if ((e instanceof EOFException) || (e instanceof SocketException)) {
                        Log.Warn(this, e.getMessage());
                    } else {
                        Log.Warn(this, e);
                    }
                    try {
                        sock.close();
                    } catch (IOException e2) {
                        // Report the close failure itself, not the read failure again.
                        Log.Warn(this, e2.getMessage());
                    }
                    // Leave the loop even if close() failed and isClosed() is still false;
                    // otherwise a broken socket could keep this thread spinning.
                    break;
                }
            }
            if (this.disconnectCallback != null) {
                this.disconnectCallback.complete(this, null);
            }
            sink.sinkClosed();
        }, "tcp client receiver").start();
        return promise;
    }

    /**
     * Closes the socket opened by {@code connect}, if any.
     *
     * @return a promise that is already completed: resolved when the socket was closed (or no
     *         socket had been opened yet), or completed with the {@link IOException} as its error
     *         when closing failed
     */
    @Override // org.nustaq.kontraktor.remoting.base.ActorClientConnector
    public IPromise closeClient() {
        Promise result = new Promise();
        MyTCPSocket sock = this.socket;
        try {
            // socket is only assigned in connect(); closing before connecting is a no-op.
            if (sock != null) {
                sock.close();
            }
            // Resolve explicitly so callers waiting on the promise are actually notified.
            result.resolve();
        } catch (IOException e) {
            Log.Warn(this, e);
            // Deliver the failure in the error slot rather than as a regular result value.
            result.complete(null, e);
        }
        return result;
    }
}
