package cn.jmicro.api.pubsub;

import cn.jmicro.api.JMicroContext;
import cn.jmicro.api.annotation.Cfg;
import cn.jmicro.api.annotation.Component;
import cn.jmicro.api.annotation.Inject;
import cn.jmicro.api.annotation.Reference;
import cn.jmicro.api.async.AsyncFailResult;
import cn.jmicro.api.client.IAsyncCallback;
import cn.jmicro.api.config.Config;
import cn.jmicro.api.executor.ExecutorConfig;
import cn.jmicro.api.executor.ExecutorFactory;
import cn.jmicro.api.idgenerator.ComponentIdServer;
import cn.jmicro.api.internal.pubsub.genclient.IInternalSubRpc$JMAsyncClient;
import cn.jmicro.api.monitor.LG;
import cn.jmicro.api.net.Message;
import cn.jmicro.api.objectfactory.ProxyObject;
import cn.jmicro.api.persist.IObjectStorage;
import cn.jmicro.api.profile.ProfileManager;
import cn.jmicro.api.raft.IDataOperator;
import cn.jmicro.api.registry.AsyncConfig;
import cn.jmicro.api.security.ActInfo;
import cn.jmicro.api.service.ServiceInvokeManager;
import cn.jmicro.api.utils.TimeUtils;
import cn.jmicro.common.util.JsonUtils;
import cn.jmicro.common.util.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component("pubSubManager")
/* loaded from: input_file:cn/jmicro/api/pubsub/PubSubManager.class */
public class PubSubManager {
    /** Profile module key used when reading per-account pubsub settings such as "needPersist". */
    public static final String PROFILE_PUBSUB = "pubsub";

    /** Storage table that receives persisted items and items whose publication failed. */
    public static final String TABLE_PUBSUB_ITEMS = "t_pubsub_items";

    /** The item was accepted into the local send queue. */
    public static final int PUB_OK = 0;

    /** Returned by the publish methods when {@link #isPubsubEnable(int)} rejects the request. */
    public static final int PUB_SERVER_NOT_AVAILABALE = -1;

    /** Result code handled as a final (non-retried) failure by the result callback. */
    public static final int PUB_SERVER_DISCARD = -2;

    /** Result code that makes the result callback re-queue the items a limited number of times. */
    public static final int PUB_SERVER_BUSUY = -3;

    /** Result code handled as a final (non-retried) failure by the result callback. */
    public static final int PUB_TOPIC_INVALID = -4;

    private static final Logger logger = LoggerFactory.getLogger(PubSubManager.class);

    @Inject
    private ComponentIdServer idGenerator;

    @Inject
    private ServiceInvokeManager siManager;

    /** Remote pubsub server proxy; optional, so it may be {@code null}. */
    @Reference(namespace = "*", version = "0.0.1", required = false)
    private IInternalSubRpc$JMAsyncClient defaultServer;

    @Inject
    private IDataOperator dataOp;

    @Inject
    private ExecutorFactory ef;

    @Inject
    private ProfileManager pm;

    /** Optional storage backend; persistence is skipped when it is {@code null}. */
    @Inject(required = false)
    private IObjectStorage objStorage;

    /** Executor that runs the dispatch workers; created in {@link #ready()}. */
    private ExecutorService executor = null;

    @Cfg(value = "/PubSubManager/enable", defGlobal = false)
    private boolean enable = true;

    @Cfg(value = "/PubSubManager/openDebug", defGlobal = false)
    private boolean openDebug = true;

    /** Upper bound for queued plus in-flight items; checked by {@link #isPubsubEnable(int)}. */
    @Cfg(value = "/PubSubManager/maxPsItem", defGlobal = false)
    private int maxPsItem = 10000;

    /** Intended per-topic batch size for one dispatch (default 50). */
    @Cfg(value = "/PubSubManager/maxSentItems", defGlobal = false)
    private int maxSentItems = 50;

    /** Items waiting to be sent, grouped by topic. Also used as the monitor guarding both topic maps. */
    private final Map<String, List<PSData>> topicSubmitItems = new HashMap<>();

    /** Per topic, the time the queue was created or last flushed. Guarded by {@code topicSubmitItems}. */
    private final Map<String, Long> topicLastSubmitTime = new HashMap<>();

    /** Monitor the checker thread waits on; publishers notify it after queueing items. */
    private final Object locker = new Object();

    /** Number of items that are queued or handed to the server and not yet answered. */
    private final AtomicLong curItemCount = new AtomicLong(0);

    /** Monitor that serialises the start of the checker thread. */
    private final Object runLocker = new Object();

    /** Whether the checker thread has been started; written only while holding {@code runLocker}. */
    private Boolean isRunning = false;

    /* loaded from: input_file:cn/jmicro/api/pubsub/PubSubManager$AsyncCallback.class */
    private class AsyncCallback implements IAsyncCallback<Integer> {
        private List<PSData> list;

        private AsyncCallback(List<PSData> list) {
            this.list = list;
        }

        public void onResult(Integer num, AsyncFailResult asyncFailResult, Object obj) {
            PubSubManager.this.curItemCount.addAndGet(-this.list.size());
            if (-3 != num.intValue()) {
                if (-1 == num.intValue() || -2 == num.intValue() || -4 == num.intValue()) {
                    for (PSData pSData : this.list) {
                        if (PubSubManager.this.objStorage != null) {
                            PubSubManager.this.objStorage.updateOrSaveById(PubSubManager.TABLE_PUBSUB_ITEMS, pSData, PSData.class, IObjectStorage._ID, true);
                        }
                        if (pSData.getLocalCallback() != null) {
                            pSData.getLocalCallback().call(num.intValue(), pSData);
                        } else if (StringUtils.isNotEmpty(pSData.getCallback())) {
                            PubSubManager.this.doCallback(pSData, num.intValue());
                        } else {
                            PubSubManager.logger.error("Publish message failure with code:" + num + " ,topic:" + pSData.getTopic() + " , message: " + JsonUtils.getIns().toJson(pSData));
                        }
                    }
                    return;
                }
                return;
            }
            PubSubManager.logger.warn("Got bussy result and sleep one seconds");
            for (PSData pSData2 : this.list) {
                if (pSData2.getFailCnt() < 3) {
                    pSData2.setFailCnt(pSData2.getFailCnt() + 1);
                    Integer valueOf = Integer.valueOf(PubSubManager.this.publish(pSData2));
                    num = valueOf;
                    if (valueOf.intValue() != 0) {
                        PubSubManager.this.doCallback(pSData2, num.intValue());
                        if (PubSubManager.this.objStorage != null) {
                            PubSubManager.this.objStorage.updateOrSaveById(PubSubManager.TABLE_PUBSUB_ITEMS, pSData2, PSData.class, IObjectStorage._ID, true);
                        }
                    }
                } else {
                    if (PubSubManager.this.objStorage != null) {
                        PubSubManager.this.objStorage.updateOrSaveById(PubSubManager.TABLE_PUBSUB_ITEMS, pSData2, PSData.class, IObjectStorage.ID, true);
                    }
                    if (pSData2.getLocalCallback() != null) {
                        pSData2.getLocalCallback().call(num.intValue(), pSData2);
                    } else if (StringUtils.isNotEmpty(pSData2.getCallback())) {
                        PubSubManager.this.doCallback(pSData2, num.intValue());
                    } else {
                        PubSubManager.logger.error("Pubsub Server is busuy and retry failure :" + JsonUtils.getIns().toJson(pSData2));
                    }
                }
            }
        }
    }

    /* loaded from: input_file:cn/jmicro/api/pubsub/PubSubManager$Worker.class */
    private class Worker implements Runnable {
        private Map<String, List<PSData>> ms;

        public Worker(Map<String, List<PSData>> map) {
            this.ms = null;
            this.ms = map;
        }

        @Override // java.lang.Runnable
        public void run() {
            JMicroContext.get().setBoolean("fromPubsub", true);
            for (Map.Entry<String, List<PSData>> entry : this.ms.entrySet()) {
                try {
                    List<PSData> value = entry.getValue();
                    if (value != null && !value.isEmpty()) {
                        int size = value.size();
                        if (size == 1) {
                            PSData pSData = value.get(0);
                            if (pSData.getId() <= 0) {
                                pSData.setId(PubSubManager.this.idGenerator.getIntId(PSData.class).intValue());
                            }
                            PubSubManager.this.defaultServer.publishItemJMAsync(pSData, null).then(new AsyncCallback(value));
                        } else if (size > 1) {
                            Long[] longIds = PubSubManager.this.idGenerator.getLongIds(PSData.class.getName(), value.size());
                            PSData[] pSDataArr = new PSData[value.size()];
                            value.toArray(pSDataArr);
                            for (int i = 0; i < pSDataArr.length; i++) {
                                if (pSDataArr[i] != null && pSDataArr[i].getId() <= 0) {
                                    pSDataArr[i].setId(longIds[i].longValue());
                                }
                            }
                            PubSubManager.this.defaultServer.publishItemsJMAsync(entry.getKey(), pSDataArr).then(new AsyncCallback(value));
                        }
                    }
                } catch (Throwable th) {
                    PubSubManager.logger.error(AsyncConfig.ASYNC_DISABLE, th);
                }
            }
        }
    }

    /**
     * Creates the executor that runs the dispatch workers: thread name prefix
     * "PubSubManager", maximum size 60 and a task queue of 500 entries.
     */
    public void ready() {
        final ExecutorConfig poolCfg = new ExecutorConfig();
        poolCfg.setThreadNamePrefix("PubSubManager");
        poolCfg.setTaskQueueSize(500);
        poolCfg.setMsMaxSize(60);

        this.executor = this.ef.createExecutor(poolCfg);
    }

    /**
     * Asks the pubsub server whether it knows the given topic.
     *
     * @param str topic name
     * @return the server's answer, or {@code false} when no pubsub server reference is available
     */
    public boolean hasTopic(String str) {
        // The server reference is optional (required = false) and may not have been injected.
        IInternalSubRpc$JMAsyncClient server = this.defaultServer;
        return server != null && server.hasTopic(str);
    }

    /**
     * Tells whether {@code i} more items may be published right now.
     *
     * @param i number of items about to be published; {@code 0} only checks that a server
     *          reference exists
     * @return {@code true} when a server reference exists and, for {@code i != 0}, the pending
     *         item count plus {@code i} stays within {@code maxPsItem} and the server proxy is usable
     */
    public boolean isPubsubEnable(int i) {
        if (this.defaultServer == null) {
            return false;
        }
        if (i == 0) {
            return true;
        }

        long projected = this.curItemCount.get() + (long) i;
        if (projected > (long) this.maxPsItem) {
            return false;
        }
        return ProxyObject.isUsableRemoteProxy(this.defaultServer);
    }

    /**
     * Writes a level-2 log entry explaining why publishing is currently refused.
     *
     * @param i   number of items the caller wanted to publish
     * @param str text put in front of the reason
     */
    private void doLog(int i, String str) {
        if (!LG.isLoggable(2, new int[0])) {
            return;
        }

        StringBuilder reason = new StringBuilder();
        reason.append(str).append(" Pubsub disable by: ");

        if (this.defaultServer == null) {
            reason.append("Pubsub server is disable!");
        } else if (this.curItemCount.get() + i > this.maxPsItem) {
            reason.append("cur item [" + this.curItemCount.get() + "] send count [" + i + "] is too max with " + this.maxPsItem);
        } else {
            reason.append("pubsub server not ready now!");
        }

        LG.log((byte) 2, getClass(), reason.toString());
    }

    /**
     * Wraps an argument array into a new {@link PSData} and queues it for publication.
     *
     * @param str    topic
     * @param objArr payload
     * @param b      item flag
     * @param map    item context
     * @return {@link #PUB_SERVER_NOT_AVAILABALE} when publishing is currently refused,
     *         otherwise the result of {@link #publish(PSData)}
     */
    public int publish(String str, Object[] objArr, byte b, Map<String, Object> map) {
        if (isPubsubEnable(1)) {
            PSData item = new PSData();
            item.setTopic(str);
            item.setData(objArr);
            item.setContext(map);
            item.setFlag(b);
            return publish(item);
        }

        doLog(1, "return code: -1 for topic: " + str);
        return PUB_SERVER_NOT_AVAILABALE;
    }

    /**
     * Wraps a string payload into a new {@link PSData} and queues it for publication.
     *
     * @param str  topic
     * @param str2 payload
     * @param b    item flag
     * @param map  item context
     * @return {@link #PUB_SERVER_NOT_AVAILABALE} when publishing is currently refused,
     *         otherwise the result of {@link #publish(PSData)}
     */
    public int publish(String str, String str2, byte b, Map<String, Object> map) {
        if (isPubsubEnable(1)) {
            PSData item = new PSData();
            item.setTopic(str);
            item.setData(str2);
            item.setContext(map);
            item.setFlag(b);
            return publish(item);
        }

        doLog(1, "return code: -1 for topic: " + str);
        return PUB_SERVER_NOT_AVAILABALE;
    }

    /**
     * Wraps a binary payload into a new {@link PSData} and queues it for publication.
     *
     * @param str  topic
     * @param bArr payload
     * @param b    item flag
     * @param map  item context
     * @return {@link #PUB_SERVER_NOT_AVAILABALE} when publishing is currently refused,
     *         otherwise the result of {@link #publish(PSData)}
     */
    public int publish(String str, byte[] bArr, byte b, Map<String, Object> map) {
        if (isPubsubEnable(1)) {
            PSData item = new PSData();
            item.setTopic(str);
            item.setData(bArr);
            item.setContext(map);
            item.setFlag(b);
            return publish(item);
        }

        doLog(1, "return code: -1 for topic: " + str);
        return PUB_SERVER_NOT_AVAILABALE;
    }

    /**
     * Queues several items for publication. Either all items are queued or none is.
     *
     * @param pSDataArr items to publish
     * @return {@link #PUB_OK} when the items were queued; {@code -5} when the array is
     *         {@code null}, empty or contains a {@code null} element; {@code -6} when an item has
     *         no topic; {@link #PUB_SERVER_NOT_AVAILABALE} when publishing is currently refused
     */
    public int publish(PSData[] pSDataArr) {
        if (pSDataArr == null || pSDataArr.length == 0) {
            if (LG.isLoggable(2, new int[0])) {
                LG.log((byte) 2, getClass(), "send null items");
            }
            return -5;
        }

        // Validate every element up front (same rules as publish(PSData)) so that a bad element
        // cannot fail half-way through, after the pending counter was already raised.
        for (PSData item : pSDataArr) {
            if (item == null) {
                if (LG.isLoggable(2, new int[0])) {
                    LG.log((byte) 2, getClass(), "return PUB_ITEM_IS_NULL=-5");
                }
                return -5;
            }
            if (StringUtils.isEmpty(item.getTopic())) {
                if (LG.isLoggable(2, new int[0])) {
                    LG.log((byte) 2, getClass(), "return PUB_TOPIC_IS_NULL=-6");
                }
                return -6;
            }
        }

        // The capacity check has to cover the whole batch, not a single item.
        if (!isPubsubEnable(pSDataArr.length)) {
            doLog(pSDataArr.length, "return code: -1");
            return PUB_SERVER_NOT_AVAILABALE;
        }
        if (!this.isRunning.booleanValue()) {
            startChecker();
        }

        ActInfo account = JMicroContext.get().getAccount();
        if (account != null && ((Boolean) this.pm.getVal(Integer.valueOf(account.getId()), PROFILE_PUBSUB, "needPersist", false, Boolean.class)).booleanValue()) {
            persist2Db(account.getId(), pSDataArr);
        }

        // Count the items only after persistence succeeded; an exception above must not leave
        // the pending counter raised for items that were never queued.
        this.curItemCount.addAndGet(pSDataArr.length);

        synchronized (this.topicSubmitItems) {
            for (PSData item : pSDataArr) {
                if (account != null) {
                    item.setSrcClientId(account.getId());
                }
                String topic = item.getTopic();
                List<PSData> queue = this.topicSubmitItems.get(topic);
                if (queue == null) {
                    queue = new ArrayList<>();
                    this.topicSubmitItems.put(topic, queue);
                    this.topicLastSubmitTime.put(topic, Long.valueOf(TimeUtils.getCurTime()));
                }
                queue.add(item);
            }
        }

        // Wake the checker thread so it can look at the new items.
        synchronized (this.locker) {
            this.locker.notifyAll();
        }
        return PUB_OK;
    }

    /**
     * Queues a single item for publication.
     *
     * @param pSData item to publish
     * @return {@link #PUB_OK} when the item was queued; {@code -5} when the item is {@code null};
     *         {@code -6} when it has no topic; {@link #PUB_SERVER_NOT_AVAILABALE} when publishing
     *         is currently refused
     */
    public int publish(PSData pSData) {
        if (pSData == null) {
            if (LG.isLoggable(2, new int[0])) {
                LG.log((byte) 2, getClass(), "return PUB_ITEM_IS_NULL=-5");
            }
            return -5;
        }
        if (StringUtils.isEmpty(pSData.getTopic())) {
            if (LG.isLoggable(2, new int[0])) {
                LG.log((byte) 2, getClass(), "return PUB_TOPIC_IS_NULL=-6");
            }
            return -6;
        }
        if (!isPubsubEnable(1)) {
            doLog(1, "return code: -1");
            return PUB_SERVER_NOT_AVAILABALE;
        }
        if (!this.isRunning.booleanValue()) {
            startChecker();
        }

        ActInfo account = JMicroContext.get().getAccount();
        // NOTE(review): the profile lookup uses the item's srcClientId while the item is persisted
        // under the account id; the array variant uses the account id for both - confirm intent.
        if (account != null && pSData.isPersist() && ((Boolean) this.pm.getVal(Integer.valueOf(pSData.getSrcClientId()), PROFILE_PUBSUB, "needPersist", false, Boolean.class)).booleanValue()) {
            persit2Db(account.getId(), pSData);
        }

        this.curItemCount.incrementAndGet();

        String topic = pSData.getTopic();
        synchronized (this.topicSubmitItems) {
            // The lookup must happen under the same lock as the insert: otherwise two publishers
            // can both miss the queue, and the second put would replace the first queue together
            // with the item it already holds.
            List<PSData> queue = this.topicSubmitItems.get(topic);
            if (queue == null) {
                queue = new ArrayList<>();
                this.topicSubmitItems.put(topic, queue);
                this.topicLastSubmitTime.put(topic, Long.valueOf(TimeUtils.getCurTime()));
            }
            queue.add(pSData);
        }

        // Wake the checker thread so it can look at the new item.
        synchronized (this.locker) {
            this.locker.notifyAll();
        }
        return PUB_OK;
    }

    /**
     * Stores a single item in {@link #TABLE_PUBSUB_ITEMS} when the item is flagged persistent.
     * An id is assigned first when the item has none; the source client id is always set.
     * Nothing is written when no storage backend is configured.
     *
     * @param i      client id recorded as the item's source
     * @param pSData item to store; {@code null} is ignored
     */
    public void persit2Db(int i, PSData pSData) {
        if (pSData == null || !pSData.isPersist()) {
            return;
        }
        if (pSData.getId() <= 0) {
            pSData.setId(this.idGenerator.getIntId(PSData.class).intValue());
        }
        pSData.setSrcClientId(i);
        pSData.setPersist(true);
        if (this.objStorage != null) {
            // Pass the item and its class directly; casting the item to String (as the
            // decompiled code did) is not a valid conversion.
            this.objStorage.save(TABLE_PUBSUB_ITEMS, pSData, PSData.class, true, true);
        }
    }

    /**
     * Stores all items of the array that are flagged persistent in {@link #TABLE_PUBSUB_ITEMS}
     * with one save call. Each such item gets an id when it has none, and its source client id
     * is set. Nothing is written when no storage backend is configured or no item qualifies.
     *
     * @param i         client id recorded as the source of every stored item
     * @param pSDataArr items to inspect; {@code null} and {@code null} elements are ignored
     */
    public void persist2Db(int i, PSData[] pSDataArr) {
        if (pSDataArr == null) {
            return;
        }

        // Always allocated, so the emptiness check below cannot hit a null set when storage
        // is available but none of the items is flagged persistent.
        HashSet<PSData> toSave = new HashSet<>();
        for (PSData item : pSDataArr) {
            if (item == null || !item.isPersist()) {
                continue;
            }
            if (item.getId() <= 0) {
                item.setId(this.idGenerator.getIntId(PSData.class).intValue());
            }
            item.setSrcClientId(i);
            item.setPersist(true);
            toSave.add(item);
        }

        if (this.objStorage == null || toSave.isEmpty()) {
            return;
        }
        PSData[] batch = toSave.toArray(new PSData[0]);
        this.objStorage.save(TABLE_PUBSUB_ITEMS, batch, PSData.class, true, true);
    }

    /**
     * Starts the checker thread that runs {@link #doWork()} unless it was started before.
     * The running flag is checked and set while holding {@code runLocker}, so concurrent
     * callers start at most one thread.
     */
    private void startChecker() {
        synchronized (this.runLocker) {
            if (!this.isRunning.booleanValue()) {
                if (LG.isLoggable(3, new int[0])) {
                    LG.log((byte) 3, getClass(), "Rerun checker thread: " + Config.getInstanceName() + "_PubSubManager_Checker");
                }
                this.isRunning = true;

                Runnable loop = this::doWork;
                Thread checker = new Thread(loop);
                checker.setName(Config.getInstanceName() + "_PubSubManager_Checker");
                checker.start();
            }
        }
    }

    /**
     * Loop of the checker thread. Each pass collects, per topic, the queued items that are due
     * and hands them to the executor as one {@link Worker}.
     *
     * <p>A topic is due when its queue holds at least {@code maxSentItems} items or when 500 ms
     * have passed since the queue was created or last flushed. At most {@code maxSentItems}
     * items are taken from a topic per pass.
     *
     * <p>When a pass finds nothing to send (items are not due yet or are still in flight), the
     * thread waits on {@code locker} until the next topic becomes due or a publisher notifies it,
     * instead of re-scanning the queues in a tight loop.
     */
    private void doWork() {
        logger.info("START submit worker");
        final long flushIntervalMs = 500;
        while (this.isRunning.booleanValue()) {
            try {
                // Nothing queued and nothing in flight: sleep until a publisher wakes us up.
                while (this.curItemCount.get() == 0) {
                    synchronized (this.locker) {
                        this.locker.wait(flushIntervalMs);
                    }
                }

                // A non-positive configuration value would never take any item.
                final int batchLimit = Math.max(1, this.maxSentItems);
                final long now = TimeUtils.getCurTime();
                final Map<String, List<PSData>> batch = new HashMap<>();
                int taken = 0;
                long idleMs = flushIntervalMs;

                synchronized (this.topicSubmitItems) {
                    for (Map.Entry<String, List<PSData>> entry : this.topicSubmitItems.entrySet()) {
                        List<PSData> pending = entry.getValue();
                        if (pending.isEmpty()) {
                            continue;
                        }

                        // A topic without a recorded time is treated as due.
                        Long lastSubmit = this.topicLastSubmitTime.get(entry.getKey());
                        long age = lastSubmit == null ? flushIntervalMs : now - lastSubmit.longValue();
                        if (pending.size() < batchLimit && age < flushIntervalMs) {
                            // Not due yet: remember how long until it will be.
                            idleMs = Math.min(idleMs, flushIntervalMs - age);
                            continue;
                        }

                        this.topicLastSubmitTime.put(entry.getKey(), Long.valueOf(TimeUtils.getCurTime()));
                        List<PSData> outgoing = new ArrayList<>();
                        Iterator<PSData> it = pending.iterator();
                        for (int quota = Math.min(pending.size(), batchLimit); quota > 0 && it.hasNext(); quota--) {
                            PSData item = it.next();
                            if (item != null) {
                                taken++;
                                outgoing.add(item);
                                it.remove();
                            }
                        }
                        batch.put(entry.getKey(), outgoing);
                    }
                }

                if (taken > 0) {
                    try {
                        this.executor.submit(new Worker(batch));
                    } catch (RuntimeException e) {
                        // The items already left the queues and no worker will answer for them,
                        // so release them from the pending counter here.
                        this.curItemCount.addAndGet(-taken);
                        logger.error("Fail to submit " + taken + " pubsub items to the executor, the items are dropped", e);
                    }
                } else {
                    synchronized (this.locker) {
                        this.locker.wait(idleMs);
                    }
                }
            } catch (Throwable th) {
                // Keep the checker alive: the running flag is never reset, so a dead thread
                // would not be restarted and queued items would never be sent.
                logger.error("PubSubManager checker pass failed", th);
            }
        }
    }

    /**
     * Reports a publish result to the callback named by the item.
     *
     * <p>Depending on the item's flag the callback is either invoked as a service call
     * ({@code Message.is(flag, 0)}) or treated as a topic to which the result is published
     * ({@code Message.is(flag, 16)}). For the topic form nothing is published when the result is
     * one of the server failure codes -3, -1 or -2; that case is only logged.
     *
     * @param pSData item whose publisher is notified
     * @param i      result code to report
     */
    public void doCallback(PSData pSData, int i) {
        if (!StringUtils.isNotEmpty(pSData.getCallback())) {
            logger.error("Pubsub Server is disable now:" + JsonUtils.getIns().toJson(pSData));
            return;
        }

        if (Message.is(pSData.getFlag(), 0)) {
            Object[] callArgs = new Object[]{Integer.valueOf(i), Long.valueOf(pSData.getId()), pSData.getContext()};
            this.siManager.call(pSData.getCallback(), callArgs).then((result, fail, ctx) -> {
                if (fail != null) {
                    logger.error(fail.toString());
                }
            });
        } else if (Message.is(pSData.getFlag(), 16)) {
            boolean serverFailure = i == PUB_SERVER_BUSUY || i == PUB_SERVER_NOT_AVAILABALE || i == PUB_SERVER_DISCARD;
            if (serverFailure) {
                logger.error("Pubsub Server is disable now:" + JsonUtils.getIns().toJson(pSData));
            } else {
                Object[] resultArgs = new Object[]{Integer.valueOf(i), Long.valueOf(pSData.getId())};
                publish(pSData.getCallback(), resultArgs, (byte) 0, pSData.getContext());
            }
        }
    }
}
