package org.nustaq.kontraktor.remoting.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ActorServerConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.encoding.Coding;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.net.TCPObjectSocket;
import org.nustaq.serialization.FSTConfiguration;

/* loaded from: input_file:org/nustaq/kontraktor/remoting/tcp/TCPServerConnector.class */
/**
 * {@link ActorServerConnector} implementation based on blocking {@link ServerSocket} I/O.
 *
 * <p>{@link #connect} starts one "acceptor thread" that binds {@link #port} and accepts
 * connections. For each accepted connection a dedicated "tcp receiver" thread reads objects
 * from the socket and forwards them to the {@link ObjectSink} produced by the factory
 * function passed to {@link #connect}.
 */
public class TCPServerConnector implements ActorServerConnector {
    /** Port the accept socket is bound to by {@link #connect}. */
    int port;

    /**
     * Listening socket. Stays {@code null} until the acceptor thread has successfully
     * bound the port, so every user of this field must tolerate {@code null}.
     */
    protected ServerSocket acceptSocket;

    /**
     * {@link TCPObjectSocket} that is additionally typed as an {@link ObjectSocket} so it can
     * be passed to the sink factory. The superclass is constructed with a {@code null}
     * {@link FSTConfiguration}.
     */
    public static class MyTCPSocket extends TCPObjectSocket implements ObjectSocket {
        /**
         * @param socket the accepted client socket to wrap
         * @throws IOException if the superclass constructor fails
         */
        public MyTCPSocket(Socket socket) throws IOException {
            super(socket, (FSTConfiguration) null);
        }
    }

    /**
     * Creates an {@link ActorServer} backed by a new {@code TCPServerConnector} and starts it
     * from a task handed to {@code actor.execute(..)}.
     *
     * @param actor  the actor to publish; also used to execute the start task
     * @param i      the TCP port to listen on
     * @param coding the coding passed through to the {@link ActorServer}
     * @return a promise resolved with the started server, or rejected with the exception
     *         thrown by {@code start()}; if creating the server or scheduling the task
     *         fails, a promise constructed with a {@code null} result and that exception
     */
    public static Promise<ActorServer> Publish(Actor actor, int i, Coding coding) {
        Promise<ActorServer> promise = new Promise<>();
        try {
            ActorServer actorServer = new ActorServer(new TCPServerConnector(i), actor, coding);
            actor.execute(() -> {
                try {
                    actorServer.start();
                    promise.resolve(actorServer);
                } catch (Exception e) {
                    promise.reject(e);
                }
            });
            return promise;
        } catch (Exception e) {
            e.printStackTrace();
            return new Promise<>(null, e);
        }
    }

    /**
     * @param i the TCP port this connector will bind when {@link #connect} is called
     */
    public TCPServerConnector(int i) {
        this.port = i;
    }

    /**
     * Starts the acceptor thread and then calls {@code await()} on the promise that
     * {@link #acceptLoop} settles once the bind has succeeded or failed.
     *
     * @param actor    the actor used to execute the per-connection setup
     * @param function factory producing the {@link ObjectSink} for each accepted socket
     * @throws Exception declared by the implemented interface method
     */
    @Override // org.nustaq.kontraktor.remoting.base.ActorServerConnector
    public void connect(Actor actor, Function<ObjectSocket, ObjectSink> function) throws Exception {
        Promise promise = new Promise();
        new Thread(() -> {
            acceptLoop(actor, this.port, function, promise);
        }, "acceptor thread " + this.port).start();
        promise.await();
    }

    /**
     * Binds the server socket and accepts connections until the socket is closed or an
     * exception occurs.
     *
     * <p>The promise is resolved right after a successful bind. If an exception occurs
     * before that, the promise is rejected with it. In every case the accept socket, if it
     * was created, is closed before this method returns.
     *
     * @param actor    the actor used to execute the per-connection setup
     * @param i        the port to bind
     * @param function factory producing the {@link ObjectSink} for each accepted socket
     * @param promise  promise to settle with the outcome of the bind
     * @return the {@code promise} argument
     */
    protected Promise acceptLoop(Actor actor, int i, Function<ObjectSocket, ObjectSink> function, Promise promise) {
        try {
            this.acceptSocket = new ServerSocket(i);
            promise.resolve();
            while (!this.acceptSocket.isClosed()) {
                Socket clientSocket = this.acceptSocket.accept();
                MyTCPSocket objectSocket = new MyTCPSocket(clientSocket);
                actor.execute(() -> {
                    ObjectSink sink = function.apply(objectSocket);
                    new Thread(() -> {
                        receiveLoop(clientSocket, objectSocket, sink);
                    }, "tcp receiver").start();
                });
            }
        } catch (Exception e) {
            // Closing the accept socket from closeServer() also ends up here, because
            // a blocked accept() then throws.
            Log.Info(this, e.getMessage());
            if (!promise.isSettled()) {
                promise.reject(e);
            }
        } finally {
            if (!promise.isSettled()) {
                promise.reject("conneciton failed");
            }
            closeAcceptSocket();
        }
        return promise;
    }

    /**
     * Reads objects from one client connection and forwards them to the sink until the
     * client socket is closed, then notifies the sink via {@code sinkClosed()}.
     */
    private void receiveLoop(Socket clientSocket, MyTCPSocket objectSocket, ObjectSink sink) {
        while (!clientSocket.isClosed()) {
            try {
                sink.receiveObject(objectSocket.readObject(), null);
            } catch (Exception e) {
                if (!(e instanceof EOFException)) {
                    Log.Warn(this, e);
                }
                if (e instanceof IOException) {
                    // Socket.isClosed() only turns true after a local close(). Without
                    // closing here, a stream-level failure (EOFException included) would
                    // leave the loop condition true, so the loop would retry the read and
                    // sinkClosed() would not be reached.
                    closeClientSocket(clientSocket);
                }
            }
        }
        sink.sinkClosed();
    }

    /** Closes a client socket, logging (not propagating) any failure. */
    private void closeClientSocket(Socket clientSocket) {
        try {
            clientSocket.close();
        } catch (IOException e) {
            Log.Warn(this, e);
        }
    }

    /** Closes the accept socket if it was ever created, logging any failure. */
    private void closeAcceptSocket() {
        ServerSocket socket = this.acceptSocket;
        if (socket == null) {
            // The bind failed before the field was assigned; nothing to close.
            return;
        }
        try {
            socket.close();
        } catch (IOException e) {
            Log.Warn(this, e);
        }
    }

    /**
     * Closes the accept socket, if one was created.
     *
     * @return a promise constructed with a {@code null} result; if closing throws an
     *         {@link IOException}, a promise constructed with a {@code null} result and
     *         that exception
     */
    @Override // org.nustaq.kontraktor.remoting.base.ActorServerConnector
    public IPromise closeServer() {
        try {
            ServerSocket socket = this.acceptSocket;
            // May be null if connect() was never called or the bind failed.
            if (socket != null) {
                socket.close();
            }
            return new Promise(null);
        } catch (IOException e) {
            return new Promise(null, e);
        }
    }
}
