package cn.tom.rpc.broker;

import cn.tom.rpc.MessageQueue;
import cn.tom.rpc.Protocol;
import cn.tom.rpc.RpcMessage;
import cn.tom.rpc.RpcMessageAdaptor;
import cn.tom.transport.Id;
import cn.tom.transport.Messager;
import cn.tom.transport.Session;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/* loaded from: input_file:cn/tom/rpc/broker/BrokerAdaptor.class */
public class BrokerAdaptor extends RpcMessageAdaptor {
    private final Map<String, MessageQueue> mqTable = new ConcurrentSkipListMap(String.CASE_INSENSITIVE_ORDER);
    private Messager.MessageHandler<RpcMessage> produceHandler = new Messager.MessageHandler<RpcMessage>() { // from class: cn.tom.rpc.broker.BrokerAdaptor.1
        public void handleMessage(RpcMessage rpcMessage, Session<RpcMessage> session) throws IOException {
            MessageQueue findMQ = BrokerAdaptor.this.findMQ(rpcMessage, session);
            if (findMQ == null) {
                return;
            }
            rpcMessage.setFrom(session.id());
            rpcMessage.serialMethod();
            findMQ.produce(rpcMessage);
        }

        public /* bridge */ /* synthetic */ void handleMessage(Id id, Session session) throws IOException {
            handleMessage((RpcMessage) id, (Session<RpcMessage>) session);
        }
    };
    private Messager.MessageHandler<RpcMessage> consumeHandler = new Messager.MessageHandler<RpcMessage>() { // from class: cn.tom.rpc.broker.BrokerAdaptor.2
        public void handleMessage(RpcMessage rpcMessage, Session<RpcMessage> session) throws IOException {
            MessageQueue findMQ = BrokerAdaptor.this.findMQ(rpcMessage, session);
            if (findMQ == null) {
                return;
            }
            Session<RpcMessage> session2 = (Session) BrokerAdaptor.this.getSessions().get(Long.valueOf(rpcMessage.getFrom()));
            if (session2 == null || !session2.isActive()) {
                return;
            }
            findMQ.consume(rpcMessage, session2);
        }

        public /* bridge */ /* synthetic */ void handleMessage(Id id, Session session) throws IOException {
            handleMessage((RpcMessage) id, (Session<RpcMessage>) session);
        }
    };
    private Messager.MessageHandler<RpcMessage> declareHandler = new Messager.MessageHandler<RpcMessage>() { // from class: cn.tom.rpc.broker.BrokerAdaptor.3
        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v18 */
        /* JADX WARN: Type inference failed for: r0v7, types: [java.util.Map] */
        /* JADX WARN: Type inference failed for: r0v8, types: [java.lang.Throwable] */
        public void handleMessage(RpcMessage rpcMessage, Session<RpcMessage> session) throws IOException {
            String trim = rpcMessage.getTopic().trim();
            ?? r0 = BrokerAdaptor.this.mqTable;
            synchronized (r0) {
                session.setAttr(Protocol.TOPIC, trim);
                MessageQueue messageQueue = (MessageQueue) BrokerAdaptor.this.mqTable.get(trim);
                if (messageQueue == null) {
                    RpcDirectMessageQueue rpcDirectMessageQueue = new RpcDirectMessageQueue(trim);
                    rpcDirectMessageQueue.setPushSession(session);
                    BrokerAdaptor.this.mqTable.put(trim, rpcDirectMessageQueue);
                    BrokerAdaptor.log.info("MQ Created: {}", rpcDirectMessageQueue);
                } else {
                    messageQueue.setPushSession(session);
                    BrokerAdaptor.log.info("MQ Created: {}", messageQueue);
                }
                r0 = r0;
                rpcMessage.getHeader().setAsk((byte) 1);
                System.out.println("msg===" + rpcMessage);
                session.write(rpcMessage);
            }
        }

        public /* bridge */ /* synthetic */ void handleMessage(Id id, Session session) throws IOException {
            handleMessage((RpcMessage) id, (Session<RpcMessage>) session);
        }
    };

    public BrokerAdaptor() {
        registerHandler(Protocol.PRODUCE, this.produceHandler);
        registerHandler(Protocol.CONSUME, this.consumeHandler);
        registerHandler(Protocol.DECLARE, this.declareHandler);
    }

    @Override // cn.tom.rpc.RpcMessageAdaptor
    public void onMessage(RpcMessage rpcMessage, Session<RpcMessage> session) throws IOException {
        Messager.MessageHandler<RpcMessage> messageHandler;
        rpcMessage.getHeader().getAsk();
        String findHandlerKey = findHandlerKey(rpcMessage);
        if (findHandlerKey == null) {
            rpcMessage.getHeader().setAsk((byte) 1);
            rpcMessage.setFieldSize(0);
            session.write(rpcMessage);
        } else {
            if (!session.isServer() || (messageHandler = this.handlerMap.get(findHandlerKey)) == null) {
                return;
            }
            messageHandler.handleMessage(rpcMessage, session);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageQueue findMQ(RpcMessage rpcMessage, Session<RpcMessage> session) {
        return this.mqTable.get(rpcMessage.getTopic());
    }

    @Override // cn.tom.rpc.RpcMessageAdaptor
    public void heartbeat(Session<RpcMessage> session) {
    }

    public void onSessionDestroyed(Session<RpcMessage> session) throws IOException {
        MessageQueue messageQueue;
        super.onSessionDestroyed(session);
        String str = (String) session.getAttr(Protocol.TOPIC);
        if (str == null || (messageQueue = this.mqTable.get(str)) == null) {
            return;
        }
        messageQueue.cleanSession(session);
    }

    @Override // cn.tom.rpc.RpcMessageAdaptor
    public String findHandlerKey(RpcMessage rpcMessage) {
        rpcMessage.decialMethod();
        return rpcMessage.getCmd();
    }
}
