package xyz.cofe.trambda.tcp;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.cofe.ecolls.ListenersHelper;
import xyz.cofe.fn.Tuple2;
import xyz.cofe.trambda.LambdaDump;
import xyz.cofe.trambda.LambdaNode;
import xyz.cofe.trambda.sec.SecurityFilter;
import xyz.cofe.trambda.tcp.TcpSession;

/* loaded from: input_file:xyz/cofe/trambda/tcp/TcpServer.class */
public class TcpServer<ENV> extends Thread implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TcpServer.class);
    protected final ServerSocket socket;
    protected final Set<TcpSession<ENV>> sessions;
    protected final Map<Integer, Long> fireClosed;
    protected final Function<TcpSession<ENV>, ENV> envBuilder;
    protected final SecurityFilter<String, Tuple2<LambdaDump, LambdaNode>> securityFilter;
    private final TrListener sesListener;
    protected final ListenersHelper<TrListener, TrEvent> listeners;
    protected final Map<String, Publisher<?>> publishers;
    private final Map<Class<?>, Object> proxyPublishers;

    /* loaded from: input_file:xyz/cofe/trambda/tcp/TcpServer$SessionClosed.class */
    public static class SessionClosed<ENV> implements TrEvent {
        private final TcpServer<ENV> server;
        private final TcpSession<ENV> session;

        public SessionClosed(TcpServer<ENV> tcpServer, TcpSession<ENV> tcpSession) {
            if (tcpServer == null) {
                throw new IllegalArgumentException("server==null");
            }
            if (tcpSession == null) {
                throw new IllegalArgumentException("session==null");
            }
            this.server = tcpServer;
            this.session = tcpSession;
        }

        public TcpServer<ENV> getServer() {
            return this.server;
        }

        public TcpSession<ENV> getSession() {
            return this.session;
        }

        public String toString() {
            return SessionClosed.class.getSimpleName() + " session.id=" + this.session.sid;
        }
    }

    /* loaded from: input_file:xyz/cofe/trambda/tcp/TcpServer$SessionCreated.class */
    public static class SessionCreated<ENV> implements TrEvent {
        private final TcpServer<ENV> server;
        private final TcpSession<ENV> session;

        public SessionCreated(TcpServer<ENV> tcpServer, TcpSession<ENV> tcpSession) {
            if (tcpServer == null) {
                throw new IllegalArgumentException("server==null");
            }
            if (tcpSession == null) {
                throw new IllegalArgumentException("session==null");
            }
            this.server = tcpServer;
            this.session = tcpSession;
        }

        public TcpServer<ENV> getServer() {
            return this.server;
        }

        public TcpSession<ENV> getSession() {
            return this.session;
        }

        public String toString() {
            return SessionCreated.class.getSimpleName() + " session.id=" + this.session.sid;
        }
    }

    public Set<TcpSession<ENV>> getSessions() {
        return this.sessions;
    }

    public TcpServer(ServerSocket serverSocket, Function<TcpSession<ENV>, ENV> function, SecurityFilter<String, Tuple2<LambdaDump, LambdaNode>> securityFilter) {
        this.fireClosed = new ConcurrentHashMap();
        this.sesListener = trEvent -> {
            withQueue(() -> {
                TcpSession<?> session;
                if (!(trEvent instanceof TcpSession.SessionFinished) || (session = ((TcpSession.SessionFinished) trEvent).getSession()) == null) {
                    return;
                }
                synchronized (session) {
                    if (!this.fireClosed.containsKey(Integer.valueOf(session.sid))) {
                        fireEvent(new SessionClosed(this, session));
                        this.fireClosed.put(Integer.valueOf(session.sid), Long.valueOf(System.currentTimeMillis()));
                    }
                }
            });
            cleanup_fireClosed();
        };
        this.listeners = new ListenersHelper<>((v0, v1) -> {
            v0.trEvent(v1);
        });
        this.publishers = new ConcurrentHashMap();
        this.proxyPublishers = new ConcurrentHashMap();
        if (serverSocket == null) {
            throw new IllegalArgumentException("socket==null");
        }
        if (function == null) {
            throw new IllegalArgumentException("envBuilder==null");
        }
        this.envBuilder = function;
        this.socket = serverSocket;
        this.sessions = new ConcurrentSkipListSet();
        if (securityFilter != null) {
            this.securityFilter = securityFilter;
        } else {
            this.securityFilter = list -> {
                return List.of();
            };
        }
    }

    public TcpServer(ServerSocket serverSocket, Function<TcpSession<ENV>, ENV> function) {
        this(serverSocket, function, null);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                log.info("interrupted");
                break;
            }
            if (this.socket.isClosed()) {
                log.info("socket closed");
                break;
            }
            try {
                this.sessions.add(create(this.socket.accept()));
            } catch (SocketException e) {
                if (e.getMessage() == null || !e.getMessage().matches("(?i).*socket\\s+closed.*")) {
                    log.error("socket err", e);
                } else {
                    log.info("socket closed");
                }
            } catch (SocketTimeoutException e2) {
                log.trace("SocketTimeoutException");
                if (Thread.currentThread().isInterrupted()) {
                    log.info("interrupted");
                    break;
                }
                checkTerminatedSessions();
            } catch (IOException e3) {
                log.error("accept", e3);
            }
        }
        closeSocket();
        closeSessions();
    }

    protected int sessionSoTimeout() {
        return 3000;
    }

    protected TcpSession<ENV> create(Socket socket) {
        TcpSession<ENV> tcpSession = new TcpSession<>(this, socket, this.envBuilder, this.securityFilter);
        try {
            socket.setSoTimeout(sessionSoTimeout());
        } catch (SocketException e) {
            log.warn("can't set so timeout");
        }
        tcpSession.setDaemon(true);
        tcpSession.setName("session#" + tcpSession.sid + "(" + socket.getRemoteSocketAddress() + ")");
        addSesListener(tcpSession);
        fireEvent(new SessionCreated(this, tcpSession));
        log.info("starting session {}", tcpSession.getName());
        tcpSession.start();
        return tcpSession;
    }

    public synchronized void shutdown() {
        log.info("shutdown");
        closeSocket();
        closeSessions();
    }

    protected void closeSocket() {
        if (this.socket.isClosed()) {
            return;
        }
        try {
            log.info("close socket");
            this.socket.close();
        } catch (IOException e) {
            log.info("socket close error", e);
        }
    }

    protected long sessionCloseTimeout() {
        return 5000L;
    }

    protected void closeSessions() {
        log.info("close sessions");
        for (TcpSession<ENV> tcpSession : this.sessions) {
            if (tcpSession.isAlive()) {
                tcpSession.close();
                try {
                    tcpSession.join(sessionCloseTimeout());
                } catch (InterruptedException e) {
                    log.warn("session {} close not responsed", Integer.valueOf(tcpSession.sid));
                    tcpSession.stop();
                }
            }
        }
    }

    protected void addSesListener(TcpSession<ENV> tcpSession) {
        if (tcpSession == null) {
            throw new IllegalArgumentException("ses==null");
        }
        tcpSession.addListener(this.sesListener);
    }

    private void checkTerminatedSessions() {
        HashSet hashSet = new HashSet();
        withQueue(() -> {
            TcpSession<ENV> next;
            Iterator<TcpSession<ENV>> it = this.sessions.iterator();
            while (it.hasNext() && (next = it.next()) != null) {
                if (!next.isAlive()) {
                    hashSet.add(next);
                    synchronized (next) {
                        if (!this.fireClosed.containsKey(Integer.valueOf(next.sid))) {
                            fireEvent(new SessionClosed(this, next));
                            this.fireClosed.put(Integer.valueOf(next.sid), Long.valueOf(System.currentTimeMillis()));
                        }
                    }
                }
            }
        });
        if (!hashSet.isEmpty()) {
            log.info("remove closed sessions, count={}", Integer.valueOf(hashSet.size()));
            hashSet.forEach(tcpSession -> {
                log.debug("remove closed session id={} name={}", Integer.valueOf(tcpSession.sid), tcpSession.getName());
            });
        }
        this.sessions.removeAll(hashSet);
        cleanup_fireClosed();
    }

    private void cleanup_fireClosed() {
        int sessionSoTimeout = sessionSoTimeout();
        int i = sessionSoTimeout > 0 ? sessionSoTimeout * 3 : 15000;
        long currentTimeMillis = System.currentTimeMillis();
        Set set = (Set) this.fireClosed.entrySet().stream().filter(entry -> {
            return currentTimeMillis - ((Long) entry.getValue()).longValue() > ((long) i);
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toUnmodifiableSet());
        Map<Integer, Long> map = this.fireClosed;
        Objects.requireNonNull(map);
        set.forEach((v1) -> {
            r1.remove(v1);
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        shutdown();
    }

    public boolean hasListener(TrListener trListener) {
        return this.listeners.hasListener(trListener);
    }

    public Set<TrListener> getListeners() {
        return this.listeners.getListeners();
    }

    public AutoCloseable addListener(TrListener trListener) {
        return this.listeners.addListener(trListener);
    }

    public AutoCloseable addListener(TrListener trListener, boolean z) {
        return this.listeners.addListener(trListener, z);
    }

    public void removeListener(TrListener trListener) {
        this.listeners.removeListener(trListener);
    }

    public void removeAllListeners() {
        this.listeners.removeAllListeners();
    }

    protected void withQueue(Runnable runnable) {
        this.listeners.withQueue(runnable);
    }

    protected <T> T withQueue(Supplier<T> supplier) {
        return (T) this.listeners.withQueue(supplier);
    }

    protected void fireEvent(TrEvent trEvent) {
        this.listeners.fireEvent(trEvent);
    }

    protected void addEvent(TrEvent trEvent) {
        this.listeners.addEvent(trEvent);
    }

    protected void runEventQueue() {
        this.listeners.runEventQueue();
    }

    public <T extends Serializable> Publisher<T> publisher(String str) {
        if (str == null) {
            throw new IllegalArgumentException("name==null");
        }
        return (Publisher) this.publishers.computeIfAbsent(str, str2 -> {
            return createPublisher(str2);
        });
    }

    protected Publisher<?> createPublisher(String str) {
        log.info("create publisher {}", str);
        return new Publisher<>();
    }

    public <T> T publishers(Class<T> cls) {
        if (cls == null) {
            throw new IllegalArgumentException("cls==null");
        }
        return (T) this.proxyPublishers.computeIfAbsent(cls, this::createProxyPublisher);
    }

    protected <T> T createProxyPublisher(Class<T> cls) {
        if (cls == null) {
            throw new IllegalArgumentException("cls==null");
        }
        return (T) new PubProxy() { // from class: xyz.cofe.trambda.tcp.TcpServer.1
            @Override // xyz.cofe.trambda.tcp.PubProxy
            protected Publisher<?> publisher(Method method) {
                return TcpServer.this.publisher(method.getName());
            }
        }.proxy(cls);
    }
}
