package xyz.cofe.trambda.tcp;

import java.io.IOError;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.cofe.ecolls.ListenersHelper;
import xyz.cofe.fn.Tuple2;
import xyz.cofe.fn.Tuple3;
import xyz.cofe.text.Text;
import xyz.cofe.trambda.LambdaDump;
import xyz.cofe.trambda.LambdaNode;
import xyz.cofe.trambda.sec.SecurAccess;
import xyz.cofe.trambda.sec.SecurMessage;
import xyz.cofe.trambda.sec.SecurityFilter;
import xyz.cofe.trambda.tcp.Publisher;

/* loaded from: input_file:xyz/cofe/trambda/tcp/TcpSession.class */
public class TcpSession<ENV> extends Thread implements Comparable<TcpSession<ENV>> {
    private static final Logger log = LoggerFactory.getLogger(TcpSession.class);
    private static final AtomicInteger idSeq = new AtomicInteger();
    public final int sid;
    protected final Socket socket;
    protected final TcpProtocol proto;
    protected final SecurityFilter<String, Tuple2<LambdaDump, LambdaNode>> securityFilter;
    protected final TcpServer<ENV> server;
    protected final ENV service;
    protected final ListenersHelper<TrListener, TrEvent> listeners;
    protected final AtomicInteger compileId;
    protected final Map<Integer, Method> compiled;
    protected final Map<String, Integer> methodDefHash2compileKey;
    protected final Map<String, Tuple3<Publisher.Subscriber<Serializable>, Long, AutoCloseable>> subscribers;

    /* loaded from: input_file:xyz/cofe/trambda/tcp/TcpSession$SessionFinished.class */
    /**
     * Event fired when a session's read loop has ended.
     */
    public static class SessionFinished implements TrEvent {
        /** The session that finished; never {@code null}. */
        private final TcpSession<?> session;

        /**
         * Creates the event.
         *
         * @param tcpSession the finished session
         * @throws IllegalArgumentException if {@code tcpSession} is {@code null}
         */
        public SessionFinished(TcpSession<?> tcpSession) {
            if (null == tcpSession) {
                throw new IllegalArgumentException("session==null");
            }
            session = tcpSession;
        }

        /**
         * Returns the session this event refers to.
         *
         * @return the finished session
         */
        public TcpSession<?> getSession() {
            return session;
        }
    }

    /**
     * Creates a session without an explicit security filter; delegates to the
     * four-argument constructor with a {@code null} filter.
     *
     * @param tcpServer owning server
     * @param socket    client socket
     * @param function  builds the service (environment) object for this session
     */
    public TcpSession(
        TcpServer<ENV> tcpServer,
        Socket socket,
        Function<TcpSession<ENV>, ENV> function
    ) {
        this(tcpServer, socket, function, null);
    }

    /**
     * Creates a session bound to the given socket.
     *
     * @param tcpServer      owning server, must not be {@code null}
     * @param socket         client socket, must not be {@code null}
     * @param function       builds the service (environment) object for this session,
     *                       must not be {@code null}; it is invoked with this session
     *                       as the last step of construction
     * @param securityFilter filter applied to lambdas before compilation; when
     *                       {@code null} a filter that returns an empty list is used
     * @throws IllegalArgumentException if {@code tcpServer}, {@code socket} or
     *                                  {@code function} is {@code null}
     */
    public TcpSession(TcpServer<ENV> tcpServer, Socket socket, Function<TcpSession<ENV>, ENV> function, SecurityFilter<String, Tuple2<LambdaDump, LambdaNode>> securityFilter) {
        // Argument-independent state is initialised before validation, so a
        // session id is consumed even when one of the checks below fails.
        sid = idSeq.incrementAndGet();
        listeners = new ListenersHelper<>(TrListener::trEvent);
        compileId = new AtomicInteger();
        compiled = new ConcurrentHashMap<>();
        methodDefHash2compileKey = new ConcurrentHashMap<>();
        subscribers = new HashMap<>();

        if (null == tcpServer) {
            throw new IllegalArgumentException("server==null");
        }
        if (null == socket) {
            throw new IllegalArgumentException("socket==null");
        }
        if (null == function) {
            throw new IllegalArgumentException("envBuilder==null");
        }

        server = tcpServer;
        if (securityFilter != null) {
            this.securityFilter = securityFilter;
        } else {
            // Default filter: reports nothing for any input.
            this.securityFilter = list -> List.of();
        }
        this.socket = socket;
        proto = new TcpProtocol(socket);
        service = function.apply(this);
    }

    /**
     * Returns the server that owns this session.
     *
     * @return the server passed to the constructor
     */
    public TcpServer<ENV> getServer() {
        return server;
    }

    /**
     * Returns the service (environment) object built for this session.
     *
     * @return the value produced by the builder function in the constructor
     */
    public ENV getService() {
        return service;
    }

    /**
     * Asks the listener helper whether the given listener is registered.
     *
     * @param trListener listener to look up
     * @return the helper's answer
     */
    public boolean hasListener(TrListener trListener) {
        return listeners.hasListener(trListener);
    }

    /**
     * Returns the listeners as reported by the listener helper.
     *
     * @return set of listeners
     */
    public Set<TrListener> getListeners() {
        return listeners.getListeners();
    }

    /**
     * Registers a listener through the listener helper.
     *
     * @param trListener listener to add
     * @return the handle returned by the helper
     */
    public AutoCloseable addListener(TrListener trListener) {
        return listeners.addListener(trListener);
    }

    /**
     * Registers a listener through the listener helper, forwarding the flag unchanged.
     *
     * @param trListener listener to add
     * @param z          flag passed to {@code ListenersHelper.addListener}
     *                   (NOTE(review): presumably "weak reference" - confirm against ListenersHelper)
     * @return the handle returned by the helper
     */
    public AutoCloseable addListener(TrListener trListener, boolean z) {
        return listeners.addListener(trListener, z);
    }

    /**
     * Removes a listener through the listener helper.
     *
     * @param trListener listener to remove
     */
    public void removeListener(TrListener trListener) {
        listeners.removeListener(trListener);
    }

    /**
     * Removes every listener through the listener helper.
     */
    public void removeAllListeners() {
        listeners.removeAllListeners();
    }

    /**
     * Hands the action to {@code ListenersHelper.withQueue}.
     *
     * @param runnable action to run
     */
    protected void withQueue(Runnable runnable) {
        listeners.withQueue(runnable);
    }

    /**
     * Hands the supplier to {@code ListenersHelper.withQueue} and returns its result.
     *
     * @param supplier computation to run
     * @param <T>      result type
     * @return the value returned by the helper
     */
    protected <T> T withQueue(Supplier<T> supplier) {
        return (T) listeners.withQueue(supplier);
    }

    /**
     * Passes the event to {@code ListenersHelper.fireEvent}.
     *
     * @param trEvent event to fire
     */
    protected void fireEvent(TrEvent trEvent) {
        listeners.fireEvent(trEvent);
    }

    /**
     * Passes the event to {@code ListenersHelper.addEvent}.
     *
     * @param trEvent event to add
     */
    protected void addEvent(TrEvent trEvent) {
        listeners.addEvent(trEvent);
    }

    /**
     * Invokes {@code ListenersHelper.runEventQueue}.
     */
    protected void runEventQueue() {
        listeners.runEventQueue();
    }

    /**
     * Returns the local address of the session socket.
     *
     * @return result of {@link Socket#getLocalSocketAddress()}
     */
    public SocketAddress getLocalAddress() {
        return socket.getLocalSocketAddress();
    }

    /**
     * Returns the remote address of the session socket.
     *
     * @return result of {@link Socket#getRemoteSocketAddress()}
     */
    public SocketAddress getRemoteAddress() {
        return socket.getRemoteSocketAddress();
    }

    /**
     * Reports whether the session socket is bound.
     *
     * @return result of {@link Socket#isBound()}
     */
    public boolean isBound() {
        return socket.isBound();
    }

    /**
     * Reports whether the session socket is closed.
     *
     * @return result of {@link Socket#isClosed()}
     */
    public boolean isClosed() {
        return socket.isClosed();
    }

    /**
     * Reports whether the read half of the session socket is shut down.
     *
     * @return result of {@link Socket#isInputShutdown()}
     */
    public boolean isInputShutdown() {
        return socket.isInputShutdown();
    }

    /**
     * Reports whether the write half of the session socket is shut down.
     *
     * @return result of {@link Socket#isOutputShutdown()}
     */
    public boolean isOutputShutdown() {
        return socket.isOutputShutdown();
    }

    /**
     * Reads the socket's keep-alive setting.
     *
     * @return the setting, or an empty optional when the socket reports a
     *         {@link SocketException} (which is logged as a warning)
     */
    public Optional<Boolean> getKeepAlive() {
        Optional<Boolean> result;
        try {
            result = Optional.of(socket.getKeepAlive());
        } catch (SocketException e) {
            log.warn("socket info", e);
            result = Optional.empty();
        }
        return result;
    }

    /**
     * Reads the socket's TCP_NODELAY setting.
     *
     * @return the setting, or an empty optional when the socket reports a
     *         {@link SocketException} (which is logged as a warning)
     */
    public Optional<Boolean> getTcpNoDelay() {
        Optional<Boolean> result;
        try {
            result = Optional.of(socket.getTcpNoDelay());
        } catch (SocketException e) {
            log.warn("socket info", e);
            result = Optional.empty();
        }
        return result;
    }

    /**
     * Reads the socket's SO_REUSEADDR setting.
     *
     * @return the setting, or an empty optional when the socket reports a
     *         {@link SocketException} (which is logged as a warning)
     */
    public Optional<Boolean> getReuseAddress() {
        Optional<Boolean> result;
        try {
            result = Optional.of(socket.getReuseAddress());
        } catch (SocketException e) {
            log.warn("socket info", e);
            result = Optional.empty();
        }
        return result;
    }

    /**
     * Reads the socket's SO_OOBINLINE setting.
     *
     * @return the setting, or an empty optional when the socket reports a
     *         {@link SocketException} (which is logged as a warning)
     */
    public Optional<Boolean> getOOBInline() {
        Optional<Boolean> result;
        try {
            result = Optional.of(socket.getOOBInline());
        } catch (SocketException e) {
            log.warn("socket info", e);
            result = Optional.empty();
        }
        return result;
    }

    /**
     * Two sessions are equal when they are of the same class and share the same {@code sid}.
     *
     * @param obj object to compare with
     * @return {@code true} if {@code obj} is a session of the same class with the same id
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        TcpSession<?> other = (TcpSession<?>) obj;
        return other.sid == sid;
    }

    /**
     * Hash code derived solely from {@code sid}, consistent with {@link #equals(Object)}.
     *
     * @return hash of the session id
     */
    @Override
    public int hashCode() {
        return Objects.hash(sid);
    }

    /**
     * Orders sessions by ascending {@code sid}.
     *
     * @param tcpSession session to compare with; {@code null} is tolerated
     * @return {@code -1} when {@code tcpSession} is {@code null}, otherwise the
     *         result of comparing the two session ids
     */
    @Override // java.lang.Comparable
    public int compareTo(TcpSession tcpSession) {
        return tcpSession != null ? Integer.compare(sid, tcpSession.sid) : -1;
    }

    /**
     * Cancels all subscriptions and closes the socket if it is still open.
     * A failure to close the socket is logged, not propagated.
     */
    public void close() {
        unsubscribeAll();
        if (!socket.isClosed()) {
            try {
                socket.close();
            } catch (IOException e) {
                log.error("socket info", e);
            }
        }
    }

    /**
     * Session read loop: receives packets until the protocol reports no more data
     * or the socket fails, then closes the socket, fires {@link SessionFinished}
     * and cancels all subscriptions.
     *
     * <ul>
     *   <li>A read timeout is logged at trace level and the read is retried.</li>
     *   <li>A {@link SocketException} ends the loop: the socket cannot recover, so
     *       retrying would only repeat the failure.</li>
     *   <li>Any other {@link IOException} is logged and the read is retried.</li>
     * </ul>
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            try {
                // The packet is scoped to the try block so that a failed read can
                // never fall through to processing an unassigned or stale value.
                Optional<Tuple2<RawPack, Optional<BuffInputStream>>> receiveRaw = this.proto.receiveRaw(Optional.empty());
                if (receiveRaw.isEmpty()) {
                    break;
                }
                received(((RawPack) receiveRaw.get().a()).toReadonly());
            } catch (SocketTimeoutException e) {
                log.trace("io err, session={} {}", Integer.valueOf(this.sid), e);
                // Use the logger rather than stdout for the retry notice.
                log.trace("try repeat read");
            } catch (SocketException e) {
                if (e.getMessage() == null || !e.getMessage().matches("(?i).*socket\\s+closed.*")) {
                    log.error("socket err", e);
                } else {
                    log.info("socket closed");
                }
                break;
            } catch (IOException e) {
                log.warn("io err, session={} {}", Integer.valueOf(this.sid), e);
            }
        }
        if (!this.socket.isClosed()) {
            try {
                log.info("close socket {}", this.socket.getRemoteSocketAddress());
                this.socket.close();
            } catch (IOException e) {
                // Keep the cause so a failed close can be diagnosed.
                log.warn("socket close error", e);
            }
        }
        fireEvent(new SessionFinished(this));
        unsubscribeAll();
    }

    /**
     * Handles one received packet: verifies the payload checksum, deserializes the
     * payload and dispatches the resulting message.
     *
     * <p>Packets with a bad checksum are dropped with a warning; a {@code null}
     * payload message is ignored; an {@link IOError} raised while handling the
     * payload is logged.
     *
     * @param rawPackReadonly packet to handle
     */
    protected void received(RawPackReadonly rawPackReadonly) {
        boolean checksumOk = rawPackReadonly.isPayloadChecksumMatched();
        if (checksumOk) {
            try {
                Message msg = rawPackReadonly.payloadMessage();
                if (msg != null) {
                    process(msg, rawPackReadonly.getHeader());
                }
            } catch (IOError e) {
                log.error("payload de serialization fail", e);
            }
        } else {
            log.warn("received bad payload");
        }
    }

    /**
     * Dispatches a message to the handler for its concrete type
     * ({@link Ping}, {@link Compile}, {@link Execute}, {@link Subscribe},
     * {@link UnSubscribe}). Messages of any other type are ignored.
     *
     * @param message   message to dispatch
     * @param tcpHeader header of the packet that carried the message
     */
    protected void process(Message message, TcpHeader tcpHeader) {
        if (message instanceof Ping) {
            process((Ping) message, tcpHeader);
        } else if (message instanceof Compile) {
            process((Compile) message, tcpHeader);
        } else if (message instanceof Execute) {
            process((Execute) message, tcpHeader);
        } else if (message instanceof Subscribe) {
            process((Subscribe) message, tcpHeader);
        } else if (message instanceof UnSubscribe) {
            process((UnSubscribe) message, tcpHeader);
        }
    }

    /**
     * Answers a {@link Ping} with a {@link Pong}. When the request header carries
     * a sid, the reply references it; otherwise the reply is sent without headers.
     * A send failure is logged, not propagated.
     *
     * @param ping      the ping request
     * @param tcpHeader header of the request packet
     */
    protected void process(Ping ping, TcpHeader tcpHeader) {
        try {
            Optional<Integer> sid = tcpHeader.getSid();
            if (sid.isPresent()) {
                this.proto.send(new Pong(), TcpHeader.referrer.create(sid.get()));
            } else {
                this.proto.send(new Pong(), new HeaderValue[0]);
            }
        } catch (IOException e) {
            // Log the cause as well, as the other handlers in this class do.
            log.error("fail send response", e);
        }
    }

    /**
     * Validates a lambda dump against the session's security filter and restores
     * it into an invocable {@link Method}.
     *
     * @param lambdaDump dump of the lambda to compile
     * @param str        hash of the dump; used here only for logging
     * @return the method obtained from {@code lambdaDump.restore()}
     * @throws SecurError if the security filter returns at least one message that is not allowed
     */
    protected Method compile(LambdaDump lambdaDump, String str) {
        LambdaNode lambdaNode = lambdaDump.getLambdaNode();
        // Log "name:descriptor" of every non-null method found while walking the
        // lambda node, space-separated; "?" when the dump has no lambda node.
        log.info("compile '{}' hash {}", lambdaNode != null ? (String) lambdaNode.walk().map((v0) -> {
            return v0.getMethod();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(cMethod -> {
            return cMethod.getName() + ":" + cMethod.getDescriptor();
        }).reduce("", (str2, str3) -> {
            return str2 + " " + str3;
        }) : "?", str);
        // The constructor always installs a filter (a default one when none is
        // given), so this guard is expected to hold.
        if (this.securityFilter != null) {
            log.debug("inspect byte code for SecurAccess");
            List inspect = SecurAccess.inspect(lambdaDump);
            if (log.isTraceEnabled()) {
                inspect.forEach(securAccess -> {
                    log.trace("SecurAccess: {}", securAccess);
                });
            }
            List validate = this.securityFilter.validate(inspect);
            // A single non-allowed message rejects the whole lambda.
            if (validate.stream().anyMatch(securMessage -> {
                return !securMessage.isAllow();
            })) {
                log.info("Lambda contains denied byte code");
                validate.forEach(securMessage2 -> {
                    // Allowed entries are reported only when debug logging is on;
                    // denied entries are always reported.
                    if (log.isDebugEnabled() && securMessage2.isAllow()) {
                        log.info("allow: security message=\"{}\" access=\"{}\"", securMessage2.getMessage(), securMessage2.getAccess());
                    }
                    if (securMessage2.isAllow()) {
                        return;
                    }
                    log.info("deny: security message=\"{}\" access=\"{}\"", securMessage2.getMessage(), securMessage2.getAccess());
                });
                throw new SecurError((List<SecurMessage<String, Tuple2<LambdaDump, LambdaNode>>>) validate);
            }
        }
        // Restore through a fresh class loader that defines only the class whose
        // name matches cBegin.javaName().getName(), using cBegin's byte code.
        return lambdaDump.restore().classLoader(cBegin -> {
            return new ClassLoader() { // from class: xyz.cofe.trambda.tcp.TcpSession.1
                // Logs when the loader is finalized.
                protected void finalize() throws Throwable {
                    try {
                        LoggerFactory.getLogger(TcpSession.class).info("ClassLoader finalize()");
                    } finally {
                        super.finalize();
                    }
                }

                @Override // java.lang.ClassLoader
                protected Class<?> findClass(String str4) throws ClassNotFoundException {
                    // Any name other than the restored class is left to the superclass.
                    if (str4 == null || !str4.equals(cBegin.javaName().getName())) {
                        return super.findClass(str4);
                    }
                    byte[] byteCode = cBegin.toByteCode();
                    LoggerFactory.getLogger(TcpSession.class).info("ClassLoader defineClass {}", str4);
                    return defineClass(str4, byteCode, 0, byteCode.length);
                }
            };
        }).method();
    }

    /**
     * Handles a {@link Compile} request: hashes the serialized lambda dump,
     * compiles it unless a method with the same hash was already compiled in this
     * session, and replies with a {@link CompileResult} carrying the key and hash.
     *
     * <p>Any failure is logged and reported to the client as an {@link ErrMessage}.
     * Replies reference the request sid when the header carries one.
     *
     * @param compile   the compile request
     * @param tcpHeader header of the request packet
     */
    protected void process(Compile compile, TcpHeader tcpHeader) {
        Optional<Integer> sid = tcpHeader.getSid();
        log.info("compile request, sid={}", sid.map(id -> Objects.toString(id)).orElse("?"));
        try {
            LambdaDump dump = compile.getDump();
            if (dump == null) {
                throw new IllegalArgumentException("compile.getMethodDef() == null");
            }
            byte[] bytes = Serializer.toBytes(dump);
            String encodeHex = Text.encodeHex(Hash.md5(bytes, 0, bytes.length));
            log.debug("mdef hash {}", encodeHex);

            int key;
            Integer cachedKey = this.methodDefHash2compileKey.get(encodeHex);
            if (cachedKey != null) {
                key = cachedKey.intValue();
                if (this.compiled.get(cachedKey) == null) {
                    log.warn("compiled method not found for key={} hash={}", cachedKey, encodeHex);
                }
            } else {
                Method compile2 = compile(dump, encodeHex);
                key = this.compileId.incrementAndGet();
                log.info("compiled {} hash {} id {}", new Object[]{compile2, encodeHex, Integer.valueOf(this.sid)});
                this.compiled.put(Integer.valueOf(key), compile2);
                // Record hash -> key so an identical dump is not compiled again
                // and so Execute requests can resolve a method by hash.
                this.methodDefHash2compileKey.put(encodeHex, Integer.valueOf(key));
            }

            CompileResult compileResult = new CompileResult();
            compileResult.setKey(Integer.valueOf(key));
            compileResult.setHash(encodeHex);
            try {
                if (sid.isPresent()) {
                    this.proto.send(compileResult, TcpHeader.referrer.create(sid.get()));
                } else {
                    this.proto.send(compileResult, new HeaderValue[0]);
                }
            } catch (IOException e) {
                log.error("fail send response", e);
            }
        } catch (Throwable th) {
            log.error("compile fail {}", th.getMessage(), th);
            try {
                ErrMessage error = new ErrMessage().error(th);
                if (sid.isPresent()) {
                    this.proto.send(error, TcpHeader.referrer.create(sid.get()));
                } else {
                    this.proto.send(error, new HeaderValue[0]);
                }
            } catch (IOException e2) {
                log.error("fail send response", e2);
            }
        }
    }

    /**
     * Handles an {@link Execute} request: resolves the compiled method by key (or
     * by hash when no key is given), invokes it statically with the captured
     * arguments followed by the session service, and replies with an
     * {@link ExecuteResult} carrying the return value and start/finish timestamps.
     *
     * <p>Any failure (unknown method, invocation error, ...) is logged and reported
     * to the client as an {@link ErrMessage}. Replies reference the request sid
     * when the header carries one.
     *
     * @param execute   the execute request
     * @param tcpHeader header of the request packet
     */
    protected void process(Execute execute, TcpHeader tcpHeader) {
        Optional<Integer> sid = tcpHeader.getSid();
        log.info("execute request, sid={}", sid.map(id -> Objects.toString(id)).orElse("?"));
        try {
            if (execute.getKey() == null && execute.getHash() == null) {
                throw new IllegalArgumentException("execute method key and hash is null");
            }
            Integer key = execute.getKey() != null
                ? execute.getKey()
                : this.methodDefHash2compileKey.get(execute.getHash());
            // ConcurrentHashMap rejects null keys, so an unknown hash must be
            // handled here to reach the "not found" error below instead of an NPE.
            Method method = key != null ? this.compiled.get(key) : null;
            if (method == null) {
                throw new IllegalArgumentException("execute method not found " + execute);
            }

            List<Object> arguments = new ArrayList<>();
            List<Object> capturedArgs = execute.getCapturedArgs();
            if (capturedArgs != null) {
                log.debug("execute args {}", capturedArgs);
                arguments.addAll(capturedArgs);
            }
            // The service object is always the last argument of the compiled method.
            arguments.add(this.service);
            Object[] array = arguments.toArray();

            long startedMillis = System.currentTimeMillis();
            long startedNanos = System.nanoTime();
            log.info("m.invoke args {}", Arrays.toString(array));
            Object invoke = method.invoke(null, array);
            log.info("m.invoke return {}", invoke);
            long finishedNanos = System.nanoTime();
            long finishedMillis = System.currentTimeMillis();

            ExecuteResult executeResult = new ExecuteResult();
            executeResult.setValue(invoke);
            executeResult.setStarted(startedMillis);
            executeResult.setStartedNano(startedNanos);
            // Wall-clock value goes to "finished" and the nanoTime value to
            // "finishedNano", mirroring the started/startedNano pair.
            executeResult.setFinished(finishedMillis);
            executeResult.setFinishedNano(finishedNanos);
            try {
                if (sid.isPresent()) {
                    log.debug("send execute result, referrer {}", sid.get());
                    this.proto.send(executeResult, TcpHeader.referrer.create(sid.get()));
                } else {
                    log.debug("send execute result");
                    this.proto.send(executeResult, new HeaderValue[0]);
                }
            } catch (IOException e) {
                log.error("fail send response", e);
            }
        } catch (Throwable th) {
            log.error("execute fail {}", th.getMessage(), th);
            try {
                ErrMessage error = new ErrMessage().error(th);
                if (sid.isPresent()) {
                    this.proto.send(error, TcpHeader.referrer.create(sid.get()));
                } else {
                    this.proto.send(error, new HeaderValue[0]);
                }
            } catch (IOException e2) {
                log.error("fail send response", e2);
            }
        }
    }

    /**
     * Closes every active subscription of this session and forgets it.
     *
     * <p>Each subscription handle is closed independently; a failure is logged and
     * does not prevent the remaining handles from being closed. The registry is
     * cleared afterwards so that repeated calls (for example {@link #close()}
     * followed by the end of {@link #run()}) do not close the same handle twice.
     */
    protected void unsubscribeAll() {
        synchronized (this.subscribers) {
            for (Tuple3<Publisher.Subscriber<Serializable>, Long, AutoCloseable> subscription : this.subscribers.values()) {
                try {
                    ((AutoCloseable) subscription.c()).close();
                } catch (Exception e) {
                    log.error("unsubscribe error", e);
                }
            }
            this.subscribers.clear();
        }
    }

    /**
     * Handles a {@link Subscribe} request: registers this session as a listener of
     * the named publisher (unless already subscribed) and replies with a
     * {@link SubscribeResult} carrying the subscription time.
     *
     * <p>A request without a publisher name is answered with a single
     * {@link ErrMessage}. Replies reference the request sid when the header
     * carries one. Send failures are logged, not propagated.
     *
     * @param subscribe the subscribe request
     * @param tcpHeader header of the request packet
     */
    protected void process(Subscribe subscribe, TcpHeader tcpHeader) {
        Optional<Integer> sid = tcpHeader.getSid();
        String publisher = subscribe.getPublisher();
        log.info("subscribe, pubName={}, sid={}", publisher, sid.map(id -> Objects.toString(id)).orElse("?"));
        if (publisher == null) {
            log.error("pubName is null");
            try {
                // Exactly one error reply per request.
                ErrMessage message = new ErrMessage().message("pubName is null");
                if (sid.isPresent()) {
                    this.proto.send(message, TcpHeader.referrer.create(sid.get()));
                } else {
                    this.proto.send(message, new HeaderValue[0]);
                }
            } catch (IOException e) {
                log.error("fail send response", e);
            }
            return;
        }
        Publisher<Serializable> publisher2 = getServer().publisher(publisher);
        synchronized (this.subscribers) {
            SubscribeResult subscribeResult = new SubscribeResult();
            Tuple3<Publisher.Subscriber<Serializable>, Long, AutoCloseable> existing = this.subscribers.get(publisher);
            if (existing != null) {
                // Already subscribed: report the original subscription time.
                subscribeResult.setSubscribeTime(((Long) existing.b()).longValue());
            } else {
                Publisher.Subscriber<Serializable> subscriber = subscriber(publisher);
                long currentTimeMillis = System.currentTimeMillis();
                subscribeResult.setSubscribeTime(currentTimeMillis);
                this.subscribers.put(publisher, Tuple3.of(subscriber, Long.valueOf(currentTimeMillis), publisher2.listen(subscriber)));
            }
            try {
                if (sid.isPresent()) {
                    this.proto.send(subscribeResult, TcpHeader.referrer.create(sid.get()));
                } else {
                    this.proto.send(subscribeResult, new HeaderValue[0]);
                }
                log.debug("subscribed to {}", publisher);
            } catch (IOException e2) {
                log.error("fail send response", e2);
            }
        }
    }

    /**
     * Handles an {@link UnSubscribe} request: cancels this session's subscription
     * to the named publisher, if any, and replies with an {@link UnSubscribeResult}.
     * The result carries the original subscription time, or {@code -1} when the
     * session was not subscribed.
     *
     * <p>A request without a publisher name is answered with a single
     * {@link ErrMessage}. Replies reference the request sid when the header
     * carries one. Send failures are logged, not propagated.
     *
     * @param unSubscribe the unsubscribe request
     * @param tcpHeader   header of the request packet
     */
    protected void process(UnSubscribe unSubscribe, TcpHeader tcpHeader) {
        Optional<Integer> sid = tcpHeader.getSid();
        String publisher = unSubscribe.getPublisher();
        log.info("unsubscribe, pubName={}, sid={}", publisher, sid.map(id -> Objects.toString(id)).orElse("?"));
        if (publisher == null) {
            log.error("pubName is null");
            try {
                // Exactly one error reply per request.
                ErrMessage message = new ErrMessage().message("pubName is null");
                if (sid.isPresent()) {
                    this.proto.send(message, TcpHeader.referrer.create(sid.get()));
                } else {
                    this.proto.send(message, new HeaderValue[0]);
                }
            } catch (IOException e) {
                log.error("fail send response", e);
            }
            return;
        }
        Publisher<Serializable> publisher2 = getServer().publisher(publisher);
        synchronized (this.subscribers) {
            UnSubscribeResult unSubscribeResult = new UnSubscribeResult();
            Tuple3<Publisher.Subscriber<Serializable>, Long, AutoCloseable> tuple3 = this.subscribers.get(publisher);
            if (tuple3 != null) {
                unSubscribeResult.setSubscribeTime(((Long) tuple3.b()).longValue());
                try {
                    ((AutoCloseable) tuple3.c()).close();
                } catch (Exception e2) {
                    log.error("unsubscribe close error", e2);
                    // Fallback: detach the subscriber directly when the handle fails to close.
                    publisher2.removeListener((Publisher.Subscriber) tuple3.a());
                }
                this.subscribers.remove(publisher);
            } else {
                unSubscribeResult.setSubscribeTime(-1L);
            }
            try {
                if (sid.isPresent()) {
                    this.proto.send(unSubscribeResult, TcpHeader.referrer.create(sid.get()));
                } else {
                    this.proto.send(unSubscribeResult, new HeaderValue[0]);
                }
                log.debug("unsubscribed from {}", publisher);
            } catch (IOException e3) {
                log.error("fail send response", e3);
            }
        }
    }

    /**
     * Creates a subscriber that forwards each notification to the client as a
     * {@link ServerEvent} tagged with the given publisher name. Send failures are
     * logged, not propagated.
     *
     * @param str publisher name placed into every forwarded event
     * @return the forwarding subscriber
     */
    protected Publisher.Subscriber<Serializable> subscriber(final String str) {
        return new Publisher.Subscriber<Serializable>() { // from class: xyz.cofe.trambda.tcp.TcpSession.2
            @Override // xyz.cofe.trambda.tcp.Publisher.Subscriber
            public void notification(Serializable serializable) {
                log.info("send notification");
                ServerEvent outgoing = new ServerEvent();
                outgoing.setEvent(serializable);
                outgoing.setPublisher(str);
                try {
                    proto.send(outgoing, new HeaderValue[0]);
                } catch (IOException e) {
                    log.error("fail send response", e);
                }
            }
        };
    }

    /*
     * Synthetic lambda-deserialization hook, shown here as decompiler output.
     * It recognises a single serialized lambda, "lambda$compile$ce772141$1"
     * (an xyz.cofe.fn.Fn1 taking a CBegin and returning a ClassLoader), and
     * recreates the class-loader factory that also appears in compile(...).
     * Any other serialized lambda is rejected with IllegalArgumentException.
     *
     * NOTE(review): this rendering is not valid Java source (e.g. `boolean z = -1`
     * and `case false`). In hand-maintained source this method would presumably
     * be generated by the compiler rather than written out - confirm before editing.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name to an index.
        switch (implMethodName.hashCode()) {
            case -2024553767:
                if (implMethodName.equals("lambda$compile$ce772141$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda signature before recreating it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("xyz/cofe/fn/Fn1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("xyz/cofe/trambda/tcp/TcpSession") && serializedLambda.getImplMethodSignature().equals("(Lxyz/cofe/trambda/bc/cls/CBegin;)Ljava/lang/ClassLoader;")) {
                    TcpSession tcpSession = (TcpSession) serializedLambda.getCapturedArg(0);
                    return cBegin -> {
                        return new ClassLoader() { // from class: xyz.cofe.trambda.tcp.TcpSession.1
                            protected void finalize() throws Throwable {
                                try {
                                    LoggerFactory.getLogger(TcpSession.class).info("ClassLoader finalize()");
                                } finally {
                                    super.finalize();
                                }
                            }

                            @Override // java.lang.ClassLoader
                            protected Class<?> findClass(String str4) throws ClassNotFoundException {
                                if (str4 == null || !str4.equals(cBegin.javaName().getName())) {
                                    return super.findClass(str4);
                                }
                                byte[] byteCode = cBegin.toByteCode();
                                LoggerFactory.getLogger(TcpSession.class).info("ClassLoader defineClass {}", str4);
                                return defineClass(str4, byteCode, 0, byteCode.length);
                            }
                        };
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
