package divconq.bus;

import divconq.bus.net.Session;
import divconq.bus.net.SocketInfo;
import divconq.bus.net.StreamMessage;
import divconq.bus.net.StreamSession;
import divconq.hub.Hub;
import divconq.hub.HubEvents;
import divconq.lang.op.FuncCallback;
import divconq.lang.op.OperationContext;
import divconq.lang.op.OperationObserver;
import divconq.lang.op.OperationResult;
import divconq.lang.op.UserContext;
import divconq.log.Logger;
import divconq.session.DataStreamChannel;
import divconq.struct.FieldStruct;
import divconq.struct.ListStruct;
import divconq.struct.RecordStruct;
import divconq.struct.Struct;
import divconq.util.StringUtil;
import divconq.work.Task;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:divconq/bus/HubRouter.class */
public class HubRouter {
    // Id of the hub this router represents (the local hub id for the no-arg constructor).
    protected String hubid;
    // Squad id; for remote hubs it is overwritten from received HELLO messages / tunnel records.
    protected String squad;
    // Names of the services known to be offered by this hub.
    protected CopyOnWriteArraySet<String> services;
    // True only for the router created by the no-arg constructor (the local hub).
    protected boolean local;
    // Gateway flag supplied to the (String, boolean) constructor; enables the extra checks in receiveMessage.
    protected boolean gateway;
    // Service name -> service implementation, used for local delivery.
    protected ConcurrentHashMap<String, IService> registered;
    // Reply service; only assigned by the no-arg (local) constructor, otherwise null.
    protected ReplyService localReplies;
    // When false, keepAlive() does nothing.
    protected boolean usekeepalive;
    // Direct message sessions to this hub.
    protected List<Session> sessions;
    // Other routers through which this hub can be reached.
    protected List<HubRouter> tunnels;
    // Lock used around mutations of sessions, tunnels, streamsessions and streampaths.
    protected ReentrantLock sessionlock;
    // Rotation index shared by nextDirectRoute() and nextTunnelRoute().
    protected int next;
    // Direct stream sessions to this hub.
    protected List<StreamSession> streamsessions;
    // Rotation index for nextDirectStreamRoute().
    protected int streamnext;
    // Hubs this hub advertised as reachable through it (rebuilt on every HELLO), keyed by hub id.
    protected HashMap<String, HubRouter> proxied;
    // Remembered stream routes keyed by "<ToSession>_<ToChannel>".
    protected HashMap<String, StreamPath> streampaths;

    /* loaded from: input_file:divconq/bus/HubRouter$StreamPath.class */
    /**
     * A remembered route for one stream (session + channel pair): either a
     * direct stream session or a router to tunnel through, plus the time the
     * route was last used.
     */
    public class StreamPath {
        protected StreamSession direct;
        protected HubRouter tunneled;
        protected long touched;

        /** Creates an empty path stamped with the current time. */
        public StreamPath() {
            this.direct = null;
            this.tunneled = null;
            this.touched = System.currentTimeMillis();
        }
    }

    /** @return the id of the hub this router represents */
    public String getHubId() {
        return hubid;
    }

    /** @return the squad id currently recorded for this hub (may be null) */
    public String getSquadId() {
        return squad;
    }

    /** @return the live set of service names known for this hub (not a copy) */
    public Collection<String> getServices() {
        return services;
    }

    /** @return a live view of the routers for hubs reachable through this hub */
    public Collection<HubRouter> getProxiedHubs() {
        return proxied.values();
    }

    /** @return true when this router represents the local hub */
    public boolean isLocal() {
        return local;
    }

    /**
     * @return true when at least one message session and at least one stream
     *         session are connected directly to this hub
     */
    public boolean isDirect() {
        return !sessions.isEmpty() && !streamsessions.isEmpty();
    }

    /** @return true when this hub can be reached through at least one other router */
    public boolean isTunneled() {
        return !tunnels.isEmpty();
    }

    /**
     * @return true for the local hub, or when any message session or tunnel
     *         to this hub is available
     */
    public boolean isActive() {
        if (local) {
            return true;
        }
        return !sessions.isEmpty() || !tunnels.isEmpty();
    }

    /**
     * Enables or disables the work done by {@link #keepAlive()}.
     *
     * @param z true to send keep-alives, false to suppress them
     */
    public void setUseKeepAlive(boolean z) {
        usekeepalive = z;
    }

    /**
     * Creates a router for a remote hub.
     *
     * @param str id of the remote hub
     * @param z   whether the hub is reached through a gateway
     */
    public HubRouter(String str, boolean z) {
        // Fields left unassigned here keep their JVM defaults
        // (null / false / 0), exactly as the explicit assignments did.
        this.hubid = str;
        this.gateway = z;
        this.usekeepalive = true;

        this.services = new CopyOnWriteArraySet<>();
        this.registered = new ConcurrentHashMap<>();

        this.sessionlock = new ReentrantLock();
        this.sessions = new CopyOnWriteArrayList<>();
        this.streamsessions = new CopyOnWriteArrayList<>();
        this.tunnels = new CopyOnWriteArrayList<>();

        this.proxied = new HashMap<>();
        this.streampaths = new HashMap<>();
    }

    /**
     * Creates the router for the local hub: takes the hub id from the current
     * operation context, the squad id from the hub resources, and registers
     * a fresh {@link ReplyService}.
     */
    public HubRouter() {
        // gateway, next and streamnext keep their JVM defaults (false / 0).
        this.usekeepalive = true;

        this.services = new CopyOnWriteArraySet<>();
        this.registered = new ConcurrentHashMap<>();

        this.sessionlock = new ReentrantLock();
        this.sessions = new CopyOnWriteArrayList<>();
        this.streamsessions = new CopyOnWriteArrayList<>();
        this.tunnels = new CopyOnWriteArrayList<>();

        this.proxied = new HashMap<>();
        this.streampaths = new HashMap<>();

        this.local = true;
        this.hubid = OperationContext.getHubId();
        this.squad = Hub.instance.getResources().getSquadId();

        this.localReplies = new ReplyService();
        registerService(this.localReplies);
    }

    /**
     * Registers a service under its own name and asks the bus to re-index
     * the services of this hub.
     *
     * @param iService service to make available on this hub
     */
    public void registerService(IService iService) {
        registered.put(iService.serviceName(), iService);
        services.add(iService.serviceName());
        Hub.instance.getBus().indexServices(this);
    }

    /**
     * Unregisters the named service and asks the bus to re-index the
     * services of this hub.
     *
     * @param str name of the service to remove
     */
    public void removeService(String str) {
        registered.remove(str);
        services.remove(str);
        Hub.instance.getBus().indexServices(this);
    }

    /**
     * Builds the HELLO message announcing this hub to a peer.
     *
     * <p>The message always carries "Kind", "Id" and "Squad". Unless the hub
     * is idled it also carries "Services", and - when the bus is in proxy
     * mode - "ProxiedServices": one record (Id, Squad, Services) for every
     * other active hub reachable from here, excluding the peer itself.
     *
     * @param str hub id of the peer the message is intended for; that hub is
     *            left out of the proxied list
     * @return the HELLO message
     */
    public Message buildHello(String str) {
        Message message = new Message();
        message.setField("Kind", "HELLO");
        message.setField("Id", OperationContext.getHubId());
        message.setField("Squad", this.squad);

        // An idled hub announces itself but offers nothing.
        if (Hub.instance.isIdled()) {
            return message;
        }

        message.setField("Services", new ListStruct(this.services));

        if (!Hub.instance.getBus().isProxyMode()) {
            return message;
        }

        // Typed map (the raw HashMap could not be iterated as HubRouter);
        // keyed by hub id so a hub reachable several ways is listed once.
        HashMap<String, HubRouter> reachable = new HashMap<>();

        for (HubRouter candidate : Hub.instance.getBus().getHubs()) {
            if (candidate.isLocal() || !candidate.isActive() || candidate.getHubId().equals(str)) {
                continue;
            }

            reachable.put(candidate.getHubId(), candidate);

            for (HubRouter behind : candidate.getProxiedHubs()) {
                if (behind.isActive() && !behind.getHubId().equals(str)) {
                    reachable.put(behind.getHubId(), behind);
                }
            }
        }

        ListStruct proxiedList = new ListStruct(new Object[0]);

        for (HubRouter hub : reachable.values()) {
            proxiedList.addItem(new RecordStruct(
                    new FieldStruct("Id", hub.getHubId()),
                    new FieldStruct("Squad", hub.getSquadId()),
                    new FieldStruct("Services", new ListStruct(hub.getServices()))));
        }

        message.setField("ProxiedServices", proxiedList);
        return message;
    }

    /**
     * Builds the HELLO message sent on a stream session, carrying this hub's
     * id and squad.
     *
     * @param str hub id of the peer (not used by this implementation)
     * @return the stream HELLO message
     */
    public StreamMessage buildStreamHello(String str) {
        StreamMessage hello = new StreamMessage("HELLO");
        hello.setField("Id", OperationContext.getHubId());
        hello.setField("Squad", squad);
        return hello;
    }

    /**
     * Delegates to the reply service of this router.
     *
     * @param message       message a reply is expected for
     * @param serviceResult callback to receive the reply
     * @return whatever the reply service returns for the registration
     */
    public String registerForReply(Message message, ServiceResult serviceResult) {
        ReplyService replies = localReplies;
        return replies.registerForReply(message, serviceResult);
    }

    /** @return the reply service; null unless this router was built with the no-arg constructor */
    public ReplyService getReplyService() {
        return localReplies;
    }

    /**
     * Delivers a bus message to the hub this router represents.
     *
     * <p>For a remote hub the message is written to the next direct session,
     * or, if there is none, handed to the next tunnel router. For the local
     * hub the message's "Service" is looked up among the registered services
     * and the call is scheduled on the hub work pool.
     *
     * @param message message to deliver; null yields an error result
     * @return a result carrying routing errors, if any; for local delivery the
     *         service outcome is not in this result but sent later as a reply
     */
    public OperationResult deliverMessage(final Message message) {
        OperationResult operationResult = new OperationResult();
        if (message == null) {
            operationResult.error(1L, "Message missing.", new String[0]);
            return operationResult;
        }
        if (!this.local) {
            // NOTE(review): empty branch - no gateway-specific handling is visible here.
            if (this.gateway) {
            }
            // Prefer a direct session to the remote hub.
            Session nextDirectRoute = nextDirectRoute();
            if (nextDirectRoute != null) {
                if (!nextDirectRoute.write(message)) {
                    operationResult.error(1L, "Unable to route message to remote hub: " + message, new String[0]);
                }
                return operationResult;
            }
            // Otherwise forward through another router that can reach the hub.
            HubRouter nextTunnelRoute = nextTunnelRoute();
            if (nextTunnelRoute != null) {
                return nextTunnelRoute.deliverMessage(message);
            }
            // No route at all: an unroutable HELLO is not reported as an error.
            if (!"HELLO".equals(message.getFieldAsString("Kind"))) {
                operationResult.error(1L, "Unable to route message to proxied hub: " + message, new String[0]);
            }
            return operationResult;
        }
        // Local delivery from here on.
        message.removeField("Kind");
        String fieldAsString = message.getFieldAsString("Service");
        if (fieldAsString == null) {
            operationResult.error(1L, "Message missing service.", new String[0]);
            return operationResult;
        }
        IService iService = this.registered.get(fieldAsString);
        if (iService == null) {
            operationResult.error(1L, "Service not on this hub.", new String[0]);
            return operationResult;
        }
        OperationContext allocate = OperationContext.allocate(message);
        if (allocate == null || allocate.getUserContext() == null) {
            operationResult.errorTr(442L, new Object[0]);
            return operationResult;
        }
        // Run the service call as a task in the "Bus" bucket under the context allocated from the message.
        Hub.instance.getWorkPool().submit(new Task().withTitle("Hub Router: " + fieldAsString).withContext(allocate).withParams(message).withBucket("Bus").withWork(taskRun -> {
            // Runs once a user context is available; validates the request and invokes the service.
            FuncCallback<UserContext> funcCallback = new FuncCallback<UserContext>() { // from class: divconq.bus.HubRouter.1
                @Override // divconq.lang.op.OperationCallback
                public void callback() {
                    UserContext result = getResult();
                    if (result != null) {
                        OperationContext.use(result, allocate.toBuilder());
                    }
                    if (taskRun.hasErrors()) {
                        taskRun.complete();
                    } else {
                        // A request that fails schema validation completes the task without calling the service.
                        if (Hub.instance.getSchema().validateRequest(message).hasErrors()) {
                            taskRun.complete();
                            return;
                        }
                        // The service is always invoked with an elevated context.
                        if (!OperationContext.get().isElevated()) {
                            OperationContext.use(OperationContext.get().toBuilder().withElevated(true));
                        }
                        iService.handle(taskRun);
                    }
                }
            };
            // Verify the context first, except when isVerifyRequest() is true,
            // in which case the allocated user context is used as-is.
            if (!message.isVerifyRequest()) {
                allocate.verify(funcCallback);
            } else {
                funcCallback.setResult(allocate.getUserContext());
                funcCallback.complete();
            }
        }), new OperationObserver() { // from class: divconq.bus.HubRouter.2
            // When the task completes, send its log message back as the reply to the original message.
            @Override // divconq.lang.op.OperationObserver
            public void completed(OperationContext operationContext) {
                OperationContext.set(operationContext);
                Hub.instance.getBus().sendReply(operationContext.getTaskRun().toLogMessage(), message);
            }
        });
        return operationResult;
    }

    /**
     * Handles a message received from the remote hub over a message session.
     *
     * <p>A HELLO message refreshes what is known about the remote hub (squad,
     * services, hubs proxied through it). Any other message is, for gateway
     * routers, checked against the caller's context and then passed to the
     * service router for the message's "Service".
     *
     * @param session session the message arrived on
     * @param message the received message
     */
    public void receiveMessage(Session session, Message message) {
        if ("HELLO".equals(message.getFieldAsString("Kind"))) {
            this.squad = message.getFieldAsString("Squad");
            // Remember the previously proxied hubs; whatever is still in this list
            // after processing the new HELLO is no longer reachable through this hub.
            Collection<HubRouter> arrayList = new ArrayList<>(this.proxied.values());
            this.proxied.clear();
            this.services.clear();
            ListStruct fieldAsList = message.getFieldAsList("ProxiedServices");
            if (fieldAsList != null) {
                Iterator<Struct> it = fieldAsList.getItems().iterator();
                while (it.hasNext()) {
                    RecordStruct recordStruct = (RecordStruct) it.next();
                    HubRouter allocateOrGetHub = Hub.instance.getBus().allocateOrGetHub(recordStruct.getFieldAsString("Id"), session.getSocketInfo().isGateway());
                    allocateOrGetHub.addTunnel(recordStruct, this);
                    arrayList.remove(allocateOrGetHub);
                    this.proxied.put(allocateOrGetHub.getHubId(), allocateOrGetHub);
                }
            }
            // Drop this router as a tunnel for hubs that were not re-announced.
            clearMyTunnels(arrayList);
            ListStruct fieldAsList2 = message.getFieldAsList("Services");
            if (fieldAsList2 != null) {
                this.services.addAll(fieldAsList2.toStringList());
                Hub.instance.getCountManager().allocateSetNumberCounter("dcBus_" + getHubId() + "_Sessions", this.sessions.size());
            }
            Hub.instance.getBus().indexServices(this);
            return;
        }
        String fieldAsString = message.getFieldAsString("Service");
        String fieldAsString2 = message.getFieldAsString("Feature");
        // z: the message is a reply (service "Replies", or "Session"/"Reply"); replies skip the gateway checks.
        boolean z = "Replies".equals(fieldAsString) || ("Session".equals(fieldAsString) && "Reply".equals(fieldAsString2));
        if (this.gateway && !message.isVerifyRequest() && !z) {
            // z2: the caller looks like an unauthenticated guest (decided below).
            boolean z2 = true;
            RecordStruct fieldAsRecord = message.getFieldAsRecord("Context");
            if (fieldAsRecord == null) {
                System.out.println("dcBus " + getHubId() + " tried to call without context, got: " + message);
                return;
            }
            String fieldAsString3 = fieldAsRecord.getFieldAsString("UserId");
            if (StringUtil.isEmpty(fieldAsString3)) {
                System.out.println("dcBus " + getHubId() + " tried to call without userid, got: " + message);
                return;
            }
            // Guest means: this specific user id (presumably the built-in guest account - TODO confirm),
            // no AuthToken, no Credentials, and exactly one auth tag equal to "Guest".
            if (!"00000_000000000000002".equals(fieldAsString3)) {
                z2 = false;
            } else if (fieldAsRecord.isFieldEmpty("AuthToken") && fieldAsRecord.isFieldEmpty("Credentials")) {
                ListStruct fieldAsList3 = fieldAsRecord.getFieldAsList("AuthTags");
                if (fieldAsList3 == null || fieldAsList3.getSize() != 1) {
                    z2 = false;
                } else if (!"Guest".equals(fieldAsList3.getItemAsString(0))) {
                    z2 = false;
                }
            } else {
                z2 = false;
            }
            // Non-guest callers must present a valid session unless the operation is tagged "Gateway".
            // NOTE(review): getOp(...) is dereferenced without a null check - confirm it cannot return null for unknown ops.
            if (!z2 && !Hub.instance.getSchema().getService().getOp(fieldAsString, fieldAsString2, message.getFieldAsString("Op")).isTagged("Gateway")) {
                String fieldAsString4 = fieldAsRecord.getFieldAsString("SessionId");
                if (StringUtil.isEmpty(fieldAsString4)) {
                    System.out.println("dcBus " + getHubId() + " tried to call as user without session, got: " + message);
                    return;
                }
                String fieldAsString5 = fieldAsRecord.getFieldAsString("AuthToken");
                if (StringUtil.isEmpty(fieldAsString5)) {
                    System.out.println("dcBus " + getHubId() + " tried to call as user without authtoken, got: " + message);
                    return;
                }
                // When the socket has a hub id, the session id must start with it.
                String hubId = session.getSocketInfo().getHubId();
                if (StringUtil.isNotEmpty(hubId) && !fieldAsString4.startsWith(hubId)) {
                    System.out.println("dcBus " + getHubId() + " tried to call with session " + fieldAsString4 + ", got: " + message);
                    return;
                }
                // The session must exist locally and its user must match both the token and the user id.
                divconq.session.Session lookup = Hub.instance.getSessions().lookup(fieldAsString4);
                if (lookup == null) {
                    System.out.println("dcBus " + getHubId() + " tried to call with missing session " + fieldAsString4 + ", got: " + message);
                    return;
                } else if (!fieldAsString5.equals(lookup.getUser().getAuthToken())) {
                    System.out.println("dcBus " + getHubId() + " tried to call with bad token " + fieldAsString5 + ", got: " + message);
                    return;
                } else {
                    if (!fieldAsString3.equals(lookup.getUser().getUserId())) {
                        System.out.println("dcBus " + getHubId() + " tried to call with user id for session " + fieldAsString3 + ", got: " + message);
                        return;
                    }
                    // NOTE(review): presumably writes the session user's data into the message context - confirm against UserContext.freeze.
                    lookup.getUser().freeze(fieldAsRecord);
                }
            }
        } else if (!z) {
            // Diagnostic output for non-reply requests that bypassed the gateway checks.
            System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
            System.out.println("Server request passed checks z: " + message);
            System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
        }
        // Hand the message to the router for its service; send errors are currently ignored.
        ServiceRouter serviceRouter = Hub.instance.getBus().getServiceRouter(message.getFieldAsString("Service"));
        if (serviceRouter != null && serviceRouter.sendMessage(message).hasErrors()) {
        }
    }

    /**
     * Handles a stream message received from the remote hub.
     *
     * <p>A HELLO only updates the remote hub's squad. Any other message is
     * forwarded to the router of the hub named in "ToHub"; if that delivery
     * reports errors they are sent back as a reply.
     *
     * @param streamSession stream session the message arrived on
     * @param streamMessage the received stream message
     */
    public void receiveMessage(StreamSession streamSession, StreamMessage streamMessage) {
        if ("HELLO".equals(streamMessage.getFieldAsString("Op"))) {
            // Read what we need before releasing the message, so no field
            // is accessed after release().
            String announcedSquad = streamMessage.getFieldAsString("Squad");
            streamMessage.release();
            this.squad = announcedSquad;
            return;
        }

        HubRouter target = Hub.instance.getBus().allocateOrGetHub(
                streamMessage.getFieldAsString("ToHub"),
                streamSession.getSocketInfo().isGateway());

        OperationResult delivery = target.deliverMessage(streamMessage);

        if (delivery.hasErrors()) {
            Hub.instance.getBus().sendReply(MessageUtil.streamMessages(delivery), streamMessage);
        }
    }

    /**
     * Adds a direct message session to this hub, updates the session counter
     * and, when this is the first session and the hub is now directly
     * connected, schedules event 100 on the work pool.
     *
     * @param session the newly connected session
     */
    public void addSession(Session session) {
        sessionlock.lock();
        try {
            sessions.add(session);
            final int available = sessions.size();

            String counterName = "dcBus_" + getHubId() + "_Sessions";
            Hub.instance.getCountManager().allocateSetNumberCounter(counterName, available);

            boolean firstSession = (available == 1);
            if (firstSession && isDirect()) {
                Hub.instance.getWorkPool().submit(run -> {
                    Hub.instance.fireEvent(100, null);
                    run.complete();
                });
            }

            Logger.info("Connect on dcBus, " + getHubId() + " sessions available: " + available, new String[0]);
        } finally {
            sessionlock.unlock();
        }
    }

    /**
     * Removes the first message session and the first stream session that
     * were created for the given socket info (compared by identity).
     *
     * @param socketInfo connection descriptor whose sessions should go
     */
    public void remove(SocketInfo socketInfo) {
        for (Session candidate : sessions) {
            if (candidate.getSocketInfo() == socketInfo) {
                removeSession(candidate);
                break;
            }
        }

        Iterator<StreamSession> streams = streamsessions.iterator();
        while (streams.hasNext()) {
            StreamSession candidate = streams.next();
            if (candidate.getSocketInfo() == socketInfo) {
                removeSession(candidate);
                return;
            }
        }
    }

    /**
     * Removes and closes a direct message session.
     *
     * <p>Under the session lock the session is removed, the session counter
     * updated and the session closed. After the lock is released the
     * disconnect is logged and, when no sessions remain, the disconnected
     * event is scheduled (if the hub had been directly connected), this
     * router is withdrawn as a tunnel for its proxied hubs, and services are
     * re-indexed.
     *
     * @param session session to remove
     */
    public void removeSession(Session session) {
        boolean wasDirect;
        int before;
        int after;

        this.sessionlock.lock();
        try {
            wasDirect = isDirect();
            before = this.sessions.size();

            this.sessions.remove(session);
            after = this.sessions.size();

            Hub.instance.getCountManager().allocateSetNumberCounter("dcBus_" + getHubId() + "_Sessions", after);
            session.close();
        } finally {
            // Single release point: unlocking again after a failure in the
            // follow-up work would throw IllegalMonitorStateException and
            // hide the real error.
            this.sessionlock.unlock();
        }

        if (after != before) {
            Logger.info("Disconnect on dcBus, " + getHubId() + " sessions available: " + after, new String[0]);
        }

        if (after == 0) {
            if (wasDirect) {
                Hub.instance.getWorkPool().submit(taskRun -> {
                    Hub.instance.fireEvent(Integer.valueOf(HubEvents.BusDisconnected), null);
                    taskRun.complete();
                });
            }

            clearMyTunnels(this.proxied.values());
            Hub.instance.getBus().indexServices(this);
        }
    }

    /**
     * Adds a direct stream session to this hub, updates the stream session
     * counter and, when this is the first stream session and the hub is now
     * directly connected, schedules event 100 on the work pool.
     *
     * @param streamSession the newly connected stream session
     */
    public void addSession(StreamSession streamSession) {
        this.sessionlock.lock();
        try {
            this.streamsessions.add(streamSession);
            int available = this.streamsessions.size();

            Logger.info("Connect on dcBus, " + getHubId() + " stream added: " + available, new String[0]);

            // Same counter name as removeSession(StreamSession) uses; the
            // previous spelling "_SteramSessions" fed a different counter.
            Hub.instance.getCountManager().allocateSetNumberCounter("dcBus_" + getHubId() + "_StreamSessions", available);

            if (available == 1 && isDirect()) {
                Hub.instance.getWorkPool().submit(taskRun -> {
                    Hub.instance.fireEvent(100, null);
                    taskRun.complete();
                });
            }

            Logger.info("Connect on dcBus, " + getHubId() + " stream sessions available: " + available, new String[0]);
        } finally {
            this.sessionlock.unlock();
        }
    }

    /**
     * Removes and closes a direct stream session.
     *
     * <p>Under the session lock the stream session is removed, the counter
     * updated and the session closed. After the lock is released the
     * disconnect is logged and, when no stream sessions remain and the hub
     * had been directly connected, the disconnected event is scheduled.
     *
     * @param streamSession stream session to remove
     */
    public void removeSession(StreamSession streamSession) {
        boolean wasDirect;
        int before;
        int after;

        this.sessionlock.lock();
        try {
            wasDirect = isDirect();
            before = this.streamsessions.size();

            this.streamsessions.remove(streamSession);
            after = this.streamsessions.size();

            Hub.instance.getCountManager().allocateSetNumberCounter("dcBus_" + getHubId() + "_StreamSessions", after);
            streamSession.close();
        } finally {
            // Single release point: a second unlock after a later failure
            // would throw IllegalMonitorStateException and mask the cause.
            this.sessionlock.unlock();
        }

        if (after != before) {
            Logger.info("Disconnect on dcBus, " + getHubId() + " stream sessions available: " + after, new String[0]);
        }

        if (after == 0 && wasDirect) {
            Hub.instance.getWorkPool().submit(taskRun -> {
                Hub.instance.fireEvent(Integer.valueOf(HubEvents.BusDisconnected), null);
                taskRun.complete();
            });
        }
    }

    /**
     * Records that this hub can be reached through another router and
     * refreshes this hub's squad and service list from the announcement.
     *
     * @param recordStruct announcement record with "Squad" and "Services"
     * @param hubRouter    router through which this hub is reachable
     */
    public void addTunnel(RecordStruct recordStruct, HubRouter hubRouter) {
        this.squad = recordStruct.getFieldAsString("Squad");

        ListStruct advertised = recordStruct.getFieldAsList("Services");
        this.services.clear();
        // A record without "Services" simply advertises no services.
        if (advertised != null) {
            this.services.addAll(advertised.toStringList());
        }

        int before;
        int after;

        this.sessionlock.lock();
        try {
            before = this.tunnels.size();

            if (!this.tunnels.contains(hubRouter)) {
                this.tunnels.add(hubRouter);
            }

            after = this.tunnels.size();
            Hub.instance.getCountManager().allocateSetNumberCounter("dcBus_" + getHubId() + "_Proxies", after);
        } finally {
            this.sessionlock.unlock();
        }

        // Log only when a tunnel was actually added (the old check compared
        // the count with itself and never fired).
        if (after != before) {
            Logger.info("Connect on dcBus, " + getHubId() + " proxies available: " + after, new String[0]);
        }

        if (after == 1) {
            Hub.instance.getBus().indexServices(this);
        }
    }

    /**
     * Forgets that this hub can be reached through the given router, updates
     * the proxies counter and re-indexes services when no tunnel is left.
     *
     * @param hubRouter router that no longer leads to this hub
     */
    public void removeTunnel(HubRouter hubRouter) {
        sessionlock.lock();
        try {
            tunnels.remove(hubRouter);
            final int remaining = tunnels.size();

            String counterName = "dcBus_" + getHubId() + "_Proxies";
            Hub.instance.getCountManager().allocateSetNumberCounter(counterName, remaining);

            Logger.info("Disconnect on dcBus, " + getHubId() + " proxies available: " + remaining, new String[0]);

            if (remaining == 0) {
                Hub.instance.getBus().indexServices(this);
            }
        } finally {
            sessionlock.unlock();
        }
    }

    /** Closes every direct message session of this hub (stream sessions are not touched here). */
    public void close() {
        for (Session open : sessions) {
            open.close();
        }
    }

    /**
     * Withdraws this router as a tunnel from each of the given routers.
     *
     * @param collection routers that should stop tunnelling through this one
     */
    public void clearMyTunnels(Collection<HubRouter> collection) {
        for (HubRouter reachedThroughMe : collection) {
            reachedThroughMe.removeTunnel(this);
        }
    }

    /**
     * Picks the next direct message session in rotation.
     *
     * @return the selected session, or null when no direct session exists
     */
    public Session nextDirectRoute() {
        this.sessionlock.lock();
        try {
            int size = this.sessions.size();
            if (size == 0) {
                return null;
            }

            // Wrap around when the index has run past the current list.
            if (this.next >= size) {
                this.next = 0;
            }

            Session session = this.sessions.get(this.next);
            this.next++;
            return session;
        } finally {
            // The only unlock: releasing in the body as well made the
            // success path throw IllegalMonitorStateException here.
            this.sessionlock.unlock();
        }
    }

    /**
     * Picks the next tunnel router in rotation.
     *
     * @return the selected router, or null when this hub has no tunnels
     */
    public HubRouter nextTunnelRoute() {
        this.sessionlock.lock();
        try {
            int size = this.tunnels.size();
            if (size == 0) {
                return null;
            }

            // Wrap around when the index has run past the current list.
            if (this.next >= size) {
                this.next = 0;
            }

            HubRouter hubRouter = this.tunnels.get(this.next);
            this.next++;
            return hubRouter;
        } finally {
            // The only unlock: releasing in the body as well made the
            // success path throw IllegalMonitorStateException here.
            this.sessionlock.unlock();
        }
    }

    /** @return the live list of direct message sessions (not a copy) */
    public Collection<Session> getSessions() {
        return sessions;
    }

    /**
     * Counts the message sessions that were created for the given socket
     * info (compared by identity).
     *
     * @param socketInfo connection descriptor to look for
     * @return number of matching sessions
     */
    public int getCountSessions(SocketInfo socketInfo) {
        int matches = 0;
        for (Session candidate : sessions) {
            if (candidate.getSocketInfo() == socketInfo) {
                matches++;
            }
        }
        return matches;
    }

    /**
     * Counts the stream sessions that were created for the given socket
     * info (compared by identity).
     *
     * @param socketInfo connection descriptor to look for
     * @return number of matching stream sessions
     */
    public int getCountStreamSessions(SocketInfo socketInfo) {
        int matches = 0;
        for (StreamSession candidate : streamsessions) {
            if (candidate.getSocketInfo() == socketInfo) {
                matches++;
            }
        }
        return matches;
    }

    /**
     * Delivers a stream message to the hub this router represents.
     *
     * <p>For the local hub the message goes to the channel named by
     * "ToSession"/"ToChannel". For a remote hub a previously remembered path
     * for that session/channel pair is reused when present; otherwise the
     * next direct stream session, or failing that the next tunnel, is chosen
     * and remembered for later messages of the same stream.
     *
     * @param streamMessage message to deliver; null yields an error result
     * @return a result carrying routing errors, if any
     */
    public OperationResult deliverMessage(StreamMessage streamMessage) {
        OperationResult operationResult = new OperationResult();
        if (streamMessage == null) {
            operationResult.error(1L, "Message missing.", new String[0]);
            return operationResult;
        }

        String toSession = streamMessage.getFieldAsString("ToSession");
        String toChannel = streamMessage.getFieldAsString("ToChannel");

        if (this.local) {
            divconq.session.Session lookup = Hub.instance.getSessions().lookup(toSession);
            if (lookup == null) {
                operationResult.error(1L, "Unable to find session: " + toSession, new String[0]);
                streamMessage.release();
                return operationResult;
            }

            DataStreamChannel channel = lookup.getChannel(toChannel);
            if (channel != null) {
                channel.deliverMessage(streamMessage);
                return operationResult;
            }

            operationResult.error(1L, "Unable to find channel: " + toChannel, new String[0]);
            streamMessage.release();
            return operationResult;
        }

        // Messages of one stream stick to one path; the key needs both parts.
        String pathKey = (StringUtil.isEmpty(toSession) || StringUtil.isEmpty(toChannel))
                ? null
                : toSession + "_" + toChannel;
        boolean sticky = StringUtil.isNotEmpty(pathKey);

        if (sticky) {
            this.sessionlock.lock();
            try {
                StreamPath known = this.streampaths.get(pathKey);
                if (known != null) {
                    known.touched = System.currentTimeMillis();

                    if (known.direct != null) {
                        if (!known.direct.write(streamMessage)) {
                            operationResult.error(1L, "Unable to route message to remote hub: " + streamMessage, new String[0]);
                        }
                        return operationResult;
                    }

                    if (known.tunneled != null) {
                        known.tunneled.deliverMessage(streamMessage);
                        return operationResult;
                    }
                }
            } finally {
                // Previously missing: the lock stayed held on every path
                // through this lookup.
                this.sessionlock.unlock();
            }
        }

        StreamSession direct = nextDirectStreamRoute();
        if (direct != null) {
            if (!direct.write(streamMessage)) {
                operationResult.error(1L, "Unable to route message to remote hub: " + streamMessage, new String[0]);
            } else if (sticky) {
                StreamPath path = new StreamPath();
                path.direct = direct;
                rememberStreamPath(pathKey, path);
            }
            return operationResult;
        }

        HubRouter tunnel = nextTunnelRoute();
        if (tunnel == null) {
            if (!"HELLO".equals(streamMessage.getFieldAsString("Kind"))) {
                operationResult.error(1L, "Unable to route message to proxied hub: " + streamMessage, new String[0]);
            }
            return operationResult;
        }

        if (sticky) {
            StreamPath path = new StreamPath();
            path.tunneled = tunnel;
            rememberStreamPath(pathKey, path);
        }

        return tunnel.deliverMessage(streamMessage);
    }

    /** Stores a stream path under the session lock, releasing the lock exactly once. */
    private void rememberStreamPath(String key, StreamPath path) {
        this.sessionlock.lock();
        try {
            this.streampaths.put(key, path);
        } finally {
            this.sessionlock.unlock();
        }
    }

    /**
     * Picks the next direct stream session in rotation.
     *
     * @return the selected stream session, or null when none is connected
     */
    public StreamSession nextDirectStreamRoute() {
        this.sessionlock.lock();
        try {
            int size = this.streamsessions.size();
            if (size == 0) {
                System.out.println("Missing stream routes to " + this.hubid);
                return null;
            }

            // Wrap around when the index has run past the current list.
            if (this.streamnext >= size) {
                this.streamnext = 0;
            }

            StreamSession streamSession = this.streamsessions.get(this.streamnext);
            // Advance the stream index (not the message-session index), so
            // that stream sessions are actually rotated.
            this.streamnext++;
            return streamSession;
        } finally {
            this.sessionlock.unlock();
        }
    }

    /**
     * Sends a keep-alive on every message session and then on every stream
     * session, unless keep-alives have been switched off.
     */
    public void keepAlive() {
        if (!usekeepalive) {
            return;
        }

        for (Session messageSession : sessions) {
            messageSession.keepAlive();
        }

        for (StreamSession streamSession : streamsessions) {
            streamSession.keepAlive();
        }
    }

    /**
     * Evicts remembered stream paths that have not been used for more than
     * 60 seconds.
     */
    public void cleanup() {
        final long cutoff = System.currentTimeMillis() - 60000;

        this.sessionlock.lock();
        try {
            // Remove through the values view so the map entries themselves
            // go away; calling remove(path) on the map treated the path as
            // a key and never removed anything.
            this.streampaths.values().removeIf(path -> path.touched < cutoff);
        } finally {
            this.sessionlock.unlock();
        }
    }
}
