package org.apache.tubemq.client.producer;

import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.security.Security;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.InnerSessionFactory;
import org.apache.tubemq.client.producer.qltystats.DefaultBrokerRcvQltyStats;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.AddressUtils;
import org.apache.tubemq.corebase.utils.MessageFlagUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corerpc.RpcConfig;
import org.apache.tubemq.corerpc.RpcServiceFactory;
import org.apache.tubemq.corerpc.client.Callback;
import org.apache.tubemq.corerpc.exception.LocalConnException;
import org.apache.tubemq.corerpc.service.BrokerWriteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/client/producer/SimpleMessageProducer.class */
public class SimpleMessageProducer implements MessageProducer {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageProducer.class);
    /** Client configuration passed to the constructor; its TLS flag is read on every send. */
    private final TubeClientConfig producerConfig;
    /** Owning session factory; this producer is removed from it on shutdown. */
    private final InnerSessionFactory sessionFactory;
    /** RPC service factory obtained from the session factory; used to look up broker write services. */
    private final RpcServiceFactory rpcServiceFactory;
    /** Producer manager obtained from the session factory; handles topic publish/removal and partition lookup. */
    private final ProducerManager producerManager;
    /** Router that picks one partition out of the candidate list for each message. */
    private final PartitionRouter partitionRouter;
    /** Per-broker send/receive statistics, also used to filter the partitions a message may be routed to. */
    private final DefaultBrokerRcvQltyStats brokerRcvQltyStats;
    /** Topics published through this producer, mapped to the wall-clock time (ms) recorded when first published. */
    private final ConcurrentHashMap<String, Long> publishTopicMap = new ConcurrentHashMap<>();
    /** RPC settings populated in the constructor and passed to every broker service lookup. */
    private final RpcConfig rpcConfig = new RpcConfig();
    /** Set to {@code true} exactly once by {@code shutdown()}; the reference itself never changes, hence final. */
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);

    public SimpleMessageProducer(InnerSessionFactory innerSessionFactory, TubeClientConfig tubeClientConfig) throws TubeClientException {
        Security.setProperty("networkaddress.cache.ttl", "3");
        Security.setProperty("networkaddress.cache.negative.ttl", "1");
        if (innerSessionFactory == null || tubeClientConfig == null) {
            throw new TubeClientException("Illegal parameter: messageSessionFactory or tubeClientConfig is null!");
        }
        this.producerConfig = tubeClientConfig;
        this.sessionFactory = innerSessionFactory;
        this.rpcServiceFactory = this.sessionFactory.getRpcServiceFactory();
        this.producerManager = this.sessionFactory.getProducerManager();
        this.brokerRcvQltyStats = innerSessionFactory.getBrokerRcvQltyStats();
        this.partitionRouter = new RoundRobinPartitionRouter();
        this.rpcConfig.put("rpc.connect.timeout", 3000);
        this.rpcConfig.put("rpc.request.timeout", Long.valueOf(tubeClientConfig.getRpcTimeoutMs()));
        this.rpcConfig.put("rpc.netty.write.highmark", Long.valueOf(tubeClientConfig.getNettyWriteBufferHighWaterMark()));
        this.rpcConfig.put("rpc.netty.write.lowmark", Long.valueOf(tubeClientConfig.getNettyWriteBufferLowWaterMark()));
        this.rpcConfig.put("rpc.netty.worker.count", Integer.valueOf(tubeClientConfig.getRpcConnProcessorCnt()));
        this.rpcConfig.put("rpc.netty.worker.thread.name", "tube_producer_netty_worker-");
        this.rpcConfig.put("rpc.netty.worker.mem.size", Integer.valueOf(tubeClientConfig.getRpcNettyWorkMemorySize()));
        this.rpcConfig.put("rpc.netty.callback.count", Integer.valueOf(tubeClientConfig.getRpcRspCallBackThreadCnt()));
    }

    /**
     * Publishes a single topic through the producer manager and records it locally.
     *
     * <p>The topic name is trimmed before use. The local record keeps the time of the first
     * successful publish; publishing the same topic again does not overwrite it.
     *
     * @param str the topic name; must not be blank
     * @throws TubeClientException if the producer is shut down, the topic is blank,
     *         or the producer manager rejects the publish
     */
    @Override
    public void publish(String str) throws TubeClientException {
        if (this.isShutDown.get()) {
            throw new TubeClientException("Status error: producer has been shutdown!");
        }
        if (TStringUtils.isBlank(str)) {
            throw new TubeClientException("Illegal parameter: blank topic!");
        }
        final String topic = str.trim();
        this.producerManager.publish(topic);
        // Only reached when the manager accepted the topic.
        this.publishTopicMap.putIfAbsent(topic, Long.valueOf(System.currentTimeMillis()));
    }

    /**
     * Publishes a set of topics in one call.
     *
     * <p>Every topic name is trimmed. All trimmed names are recorded locally with a single
     * shared timestamp (existing entries are kept) and then handed to the producer manager.
     *
     * @param set the topic names to publish; must be non-null, non-empty and free of blank entries
     * @return the set returned by the producer manager for the trimmed topic names
     * @throws TubeClientException if the producer is shut down, {@code set} is null or empty,
     *         any entry is blank, or the producer manager rejects the publish
     */
    @Override
    public Set<String> publish(Set<String> set) throws TubeClientException {
        if (this.isShutDown.get()) {
            throw new TubeClientException("Status error: producer has been shutdown!");
        }
        if (set == null || set.isEmpty()) {
            throw new TubeClientException("Illegal parameter: topicSet is null or empty!");
        }
        // Validate and normalise the whole input before touching any state.
        final Set<String> trimmedTopics = new HashSet<>();
        for (String topic : set) {
            if (TStringUtils.isBlank(topic)) {
                throw new TubeClientException(new StringBuilder(256).append("Illegal parameter: found blank topic value in topicSet : ").append(set).toString());
            }
            trimmedTopics.add(topic.trim());
        }
        final Long publishTime = Long.valueOf(System.currentTimeMillis());
        for (String topic : trimmedTopics) {
            this.publishTopicMap.putIfAbsent(topic, publishTime);
        }
        return this.producerManager.publish(trimmedTopics);
    }

    /**
     * Returns the topics published through this producer.
     *
     * <p>The result is the key view of the internal topic map, not a copy, so it reflects
     * later publishes and is emptied when the producer shuts down.
     *
     * @return the live key set of the published-topic map
     * @throws TubeClientException if the producer has been shut down
     */
    @Override
    public Set<String> getPublishedTopicSet() throws TubeClientException {
        final boolean stopped = this.isShutDown.get();
        if (stopped) {
            throw new TubeClientException("Status error: producer has been shutdown!");
        }
        final Set<String> publishedTopics = this.publishTopicMap.keySet();
        return publishedTopics;
    }

    /**
     * Tells whether messages for the given topic can currently be sent.
     *
     * <p>The topic must have been published through this producer and the producer manager
     * must hold partition information for it. The name is looked up as given, without trimming.
     *
     * @param str the topic name; must not be blank
     * @return {@code true} if the topic is recorded locally and has partition information
     * @throws TubeClientException if the producer is shut down or the topic is blank
     */
    @Override
    public boolean isTopicCurAcceptPublish(String str) throws TubeClientException {
        if (this.isShutDown.get()) {
            throw new TubeClientException("Status error: producer has been shutdown!");
        }
        if (TStringUtils.isBlank(str)) {
            throw new TubeClientException("Illegal parameter: blank topic!");
        }
        // Short-circuit: the manager is only consulted for topics recorded locally.
        final boolean publishedHere = this.publishTopicMap.get(str) != null;
        return publishedHere && this.producerManager.getTopicPartition(str) != null;
    }

    /**
     * Shuts this producer down. Only the first call has an effect; later calls just log and return.
     *
     * <p>The published topics are removed from the producer manager, the local topic record is
     * cleared, and the producer is removed from its session factory. The local cleanup and the
     * removal from the session factory run even when the producer manager fails, because the
     * shutdown flag is already set at that point and a second call could not repeat them.
     *
     * @throws Throwable whatever the producer manager or the session factory throws
     */
    @Override
    public void shutdown() throws Throwable {
        logger.info("[ShutDown] begin shutdown producer...");
        // compareAndSet alone decides which caller performs the shutdown.
        if (!this.isShutDown.compareAndSet(false, true)) {
            return;
        }
        try {
            this.producerManager.removeTopic(this.publishTopicMap.keySet());
        } finally {
            this.publishTopicMap.clear();
            this.sessionFactory.removeClient(this);
        }
        logger.info("[ShutDown] producer has stopped!");
    }

    /**
     * Sends a message and blocks until the broker responds.
     *
     * <p>After validation a partition is selected, the send is counted in the broker statistics,
     * and the request goes to that partition's broker. On a response, the broker's remote-address
     * error count is reset and the outcome is recorded; an unsuccessful response with error
     * code 503 also marks the broker as unavailable.
     *
     * <p>Any failure inside the send path increments the partition's retry counter, records a
     * failed receive, and is rethrown wrapped in a {@link TubeClientException}. A
     * {@code LocalConnException} additionally bumps the remote-address error count. If the failure
     * is an {@link InterruptedException}, the thread's interrupt status is restored before
     * wrapping so the interruption is not lost to the caller.
     *
     * @param message the message to send
     * @return the send result built from the broker response
     * @throws TubeClientException if validation or partition selection fails, or the send fails
     * @throws InterruptedException declared by the interface; this implementation wraps it instead
     */
    @Override
    public MessageSentResult sendMessage(Message message) throws TubeClientException, InterruptedException {
        checkMessageAndStatus(message);
        final Partition partition = selectPartition(message, BrokerWriteService.class);
        final int brokerId = partition.getBrokerId();
        try {
            this.brokerRcvQltyStats.addSendStatistic(brokerId);
            final ClientBroker.SendMessageResponseB2P response = getBrokerService(partition.getBroker())
                    .sendMessageP2B(createSendMessageRequest(partition, message), AddressUtils.getLocalAddress(), this.producerConfig.isTlsEnable());
            this.rpcServiceFactory.resetRmtAddrErrCount(partition.getBroker().getBrokerAddr());
            final boolean succeeded = response.getSuccess();
            this.brokerRcvQltyStats.addReceiveStatistic(brokerId, succeeded);
            if (!succeeded && response.getErrCode() == 503) {
                this.rpcServiceFactory.addUnavailableBroker(brokerId);
            }
            return buildMsgSentResult(message, partition, response);
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // The exception is wrapped below, so re-assert the flag that was cleared when it was thrown.
                Thread.currentThread().interrupt();
            }
            if (th instanceof LocalConnException) {
                this.rpcServiceFactory.addRmtAddrErrCount(partition.getBroker().getBrokerAddr());
            }
            partition.increRetries(1);
            this.brokerRcvQltyStats.addReceiveStatistic(brokerId, false);
            throw new TubeClientException("Send message failed", th);
        }
    }

    /**
     * Sends a message asynchronously and reports the outcome through the callback.
     *
     * <p>After validation a partition is selected, the send is counted in the broker statistics,
     * and the request is handed to the asynchronous broker service.
     *
     * <ul>
     *   <li>On a broker response, the partition's retry counter is reset, the outcome is recorded
     *       (an unsuccessful response with error code 503 also marks the broker unavailable), and
     *       {@code onMessageSent} is invoked. A result object that is not a
     *       {@code SendMessageResponseB2P} is ignored without invoking the callback.</li>
     *   <li>On an RPC error, the retry counter is incremented, a failed receive is recorded, and
     *       {@code onException} is invoked.</li>
     *   <li>If submitting the request throws, the failure is handled like an RPC error and is
     *       reported through {@code onException} rather than rethrown. A
     *       {@code LocalConnException} also bumps the remote-address error count, and an
     *       {@link InterruptedException} restores the calling thread's interrupt status.</li>
     * </ul>
     *
     * @param message the message to send
     * @param messageSentCallback receives the result or the failure
     * @throws TubeClientException if validation or partition selection fails
     * @throws InterruptedException declared by the interface; not thrown directly by this method
     */
    @Override
    public void sendMessage(final Message message, final MessageSentCallback messageSentCallback) throws TubeClientException, InterruptedException {
        checkMessageAndStatus(message);
        final Partition partition = selectPartition(message, BrokerWriteService.AsyncService.class);
        final int brokerId = partition.getBrokerId();
        try {
            this.brokerRcvQltyStats.addSendStatistic(brokerId);
            getAsyncBrokerService(partition.getBroker()).sendMessageP2B(
                    createSendMessageRequest(partition, message),
                    AddressUtils.getLocalAddress(),
                    this.producerConfig.isTlsEnable(),
                    new Callback() {
                        @Override
                        public void handleResult(Object obj) {
                            if (!(obj instanceof ClientBroker.SendMessageResponseB2P)) {
                                // Unexpected payloads are dropped without notifying the callback.
                                return;
                            }
                            final ClientBroker.SendMessageResponseB2P response = (ClientBroker.SendMessageResponseB2P) obj;
                            final MessageSentResult sentResult = SimpleMessageProducer.this.buildMsgSentResult(message, partition, response);
                            partition.resetRetries();
                            SimpleMessageProducer.this.brokerRcvQltyStats.addReceiveStatistic(brokerId, response.getSuccess());
                            if (!response.getSuccess() && response.getErrCode() == 503) {
                                SimpleMessageProducer.this.rpcServiceFactory.addUnavailableBroker(brokerId);
                            }
                            messageSentCallback.onMessageSent(sentResult);
                        }

                        @Override
                        public void handleError(Throwable th) {
                            partition.increRetries(1);
                            SimpleMessageProducer.this.brokerRcvQltyStats.addReceiveStatistic(brokerId, false);
                            messageSentCallback.onException(th);
                        }
                    });
            this.rpcServiceFactory.resetRmtAddrErrCount(partition.getBroker().getBrokerAddr());
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // The failure is routed to the callback, so re-assert the flag for the calling thread.
                Thread.currentThread().interrupt();
            }
            if (th instanceof LocalConnException) {
                this.rpcServiceFactory.addRmtAddrErrCount(partition.getBroker().getBrokerAddr());
            }
            partition.increRetries(1);
            this.brokerRcvQltyStats.addReceiveStatistic(brokerId, false);
            messageSentCallback.onException(th);
        }
    }

    /**
     * Validates producer state and the message before a send.
     *
     * <p>The shutdown check runs first. Shutting down clears the published-topic record, so
     * checking it later would report a stopped producer as "topic not published" instead of
     * as shut down.
     *
     * <p>Checks, in order: producer not shut down; message non-null; topic non-blank; data
     * non-empty; topic published through this producer; partition information available;
     * message size within the producer manager's limit.
     *
     * @param message the message about to be sent
     * @throws TubeClientException describing the first check that fails
     */
    private void checkMessageAndStatus(Message message) throws TubeClientException {
        if (this.isShutDown.get()) {
            throw new TubeClientException("Status error: producer has been shutdown!");
        }
        if (message == null) {
            throw new TubeClientException("Illegal parameter: null message package!");
        }
        final String topic = message.getTopic();
        if (TStringUtils.isBlank(topic)) {
            throw new TubeClientException("Illegal parameter: blank topic in message package!");
        }
        final byte[] data = message.getData();
        if (data == null || data.length == 0) {
            throw new TubeClientException("Illegal parameter: null data in message package!");
        }
        if (this.publishTopicMap.get(topic) == null) {
            throw new TubeClientException(new StringBuilder(512).append("Topic ").append(topic).append(" not publish, please publish first!").toString());
        }
        if (this.producerManager.getTopicPartition(topic) == null) {
            throw new TubeClientException(new StringBuilder(512).append("Topic ").append(topic).append(" not publish, make sure the topic exist or acceptPublish and try later!").toString());
        }
        // NOTE(review): the attribute is measured in chars here, while encodePayload writes it as
        // UTF-8 bytes behind a 4-byte length prefix, so this can undercount the wire size.
        // Left unchanged pending confirmation of what the broker counts.
        final String attribute = message.getAttribute();
        final int totalSize = TStringUtils.isBlank(attribute) ? data.length : data.length + attribute.length();
        final int maxMsgSize = this.producerManager.getMaxMsgSize();
        if (totalSize > maxMsgSize) {
            throw new TubeClientException(new StringBuilder(512).append("Illegal parameter: over max message length for the total size of").append(" message data and attribute, allowed size is ").append(maxMsgSize).append(", message's real size is ").append(totalSize).toString());
        }
    }

    /**
     * Builds the protobuf send request for one message aimed at one partition.
     *
     * <p>Topic and partition id come from {@code partition}; client id and sender address id
     * come from the producer manager. The checksum field is always set to -1. Message type and
     * message time are set only when non-blank. The producer manager adds authorization token
     * information before the request is built.
     *
     * @param partition the target partition
     * @param message the message whose payload, flag, type and time go into the request
     * @return the built request
     */
    private ClientBroker.SendMessageRequestP2B createSendMessageRequest(Partition partition, Message message) {
        final ClientBroker.SendMessageRequestP2B.Builder request = ClientBroker.SendMessageRequestP2B.newBuilder()
                .setClientId(this.producerManager.getProducerId())
                .setTopicName(partition.getTopic())
                .setPartitionId(partition.getPartitionId())
                .setData(ByteString.copyFrom(encodePayload(message)))
                .setFlag(MessageFlagUtils.getFlag(message))
                .setSentAddr(this.producerManager.getProducerAddrId())
                .setCheckSum(-1);

        // Optional fields are left unset when the message does not carry them.
        final boolean hasMsgType = TStringUtils.isNotBlank(message.getMsgType());
        if (hasMsgType) {
            request.setMsgType(message.getMsgType());
        }
        final boolean hasMsgTime = TStringUtils.isNotBlank(message.getMsgTime());
        if (hasMsgTime) {
            request.setMsgTime(message.getMsgTime());
        }
        return this.producerManager.setAuthorizedTokenInfo(request).build();
    }

    /**
     * Produces the bytes placed in the request's data field.
     *
     * <p>With a blank attribute, the message's own data array is returned as is (no copy).
     * Otherwise the result is a new array laid out as: a 4-byte length written by
     * {@link ByteBuffer#putInt(int)}, the attribute's UTF-8 bytes, then the message data.
     *
     * @param message the message to encode
     * @return the message data, optionally prefixed by the length-tagged attribute
     */
    private byte[] encodePayload(Message message) {
        final String attribute = message.getAttribute();
        final byte[] body = message.getData();
        if (TStringUtils.isBlank(attribute)) {
            return body;
        }
        final byte[] attributeBytes = StringUtils.getBytesUtf8(attribute);
        final int encodedSize = 4 + attributeBytes.length + body.length;
        return ByteBuffer.allocate(encodedSize)
                .putInt(attributeBytes.length)
                .put(attributeBytes)
                .put(body)
                .array();
    }

    /**
     * Converts a broker response into a {@link MessageSentResult}.
     *
     * <p>Error code 200 is reported as success with the text {@code "Ok!"}. When the response
     * carries a message id, the id, append time and append offset are taken from it; otherwise
     * the response's error-message field is parsed as a long and used in the message-id
     * position. Any other error code is reported as a failure carrying the broker's error
     * message and -2 in the message-id position.
     *
     * <p>Kept public because the anonymous callback in the asynchronous send path calls it.
     *
     * @param message the message that was sent
     * @param partition the partition it was sent to
     * @param sendMessageResponseB2P the broker response
     * @return the result describing the send outcome
     */
    public MessageSentResult buildMsgSentResult(Message message, Partition partition, ClientBroker.SendMessageResponseB2P sendMessageResponseB2P) {
        final String brokerText = sendMessageResponseB2P.getErrMsg();
        if (sendMessageResponseB2P.getErrCode() != 200) {
            return new MessageSentResult(false, sendMessageResponseB2P.getErrCode(), brokerText, message, -2L, partition);
        }
        if (!sendMessageResponseB2P.hasMessageId()) {
            // No explicit id on the response: the text field holds the numeric id.
            return new MessageSentResult(true, sendMessageResponseB2P.getErrCode(), "Ok!", message, Long.parseLong(brokerText), partition);
        }
        return new MessageSentResult(true, sendMessageResponseB2P.getErrCode(), "Ok!", message,
                sendMessageResponseB2P.getMessageId(), partition,
                sendMessageResponseB2P.getAppendTime(), sendMessageResponseB2P.getAppendOffset());
    }

    /**
     * Chooses the partition a message will be sent to.
     *
     * <p>The topic's partitions are fetched from the producer manager, filtered through the
     * broker statistics, and one is picked by the partition router. If the RPC service factory
     * already holds services, the router is asked again, at most once per candidate partition,
     * until a service for the chosen partition's broker can be obtained or created. When no
     * attempt succeeds, the last routed partition is returned anyway.
     *
     * @param message the message being routed
     * @param cls the broker service interface (blocking or asynchronous) to obtain or create
     * @return the selected partition, never null
     * @throws TubeClientException if the topic has no partitions, none are allowed by the
     *         broker statistics, or the router returns no partition
     */
    private Partition selectPartition(Message message, Class cls) throws TubeClientException {
        final String topic = message.getTopic();
        final Map<Integer, List<Partition>> brokerPartitions = this.producerManager.getTopicPartition(topic);
        if (brokerPartitions == null || brokerPartitions.isEmpty()) {
            throw new TubeClientException(new StringBuilder(512).append("Null partition for topic: ").append(topic).append(", please try later!").toString());
        }
        final List<Partition> candidates = this.brokerRcvQltyStats.getAllowedBrokerPartitions(brokerPartitions);
        if (candidates == null || candidates.isEmpty()) {
            throw new TubeClientException(new StringBuilder(512).append("No available partition for topic: ").append(topic).toString());
        }
        Partition partition = routeOrFail(message, candidates);
        if (this.rpcServiceFactory.isServiceEmpty()) {
            return partition;
        }
        BrokerInfo broker = partition.getBroker();
        final int maxAttempts = candidates.size();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (this.rpcServiceFactory.getOrCreateService(cls, broker, this.rpcConfig) != null) {
                break;
            }
            // No service for this broker: route again, with the same null guard as the first pick.
            partition = routeOrFail(message, candidates);
            broker = partition.getBroker();
        }
        return partition;
    }

    /** Asks the router for a partition and fails with a TubeClientException when it returns none. */
    private Partition routeOrFail(Message message, List<Partition> candidates) throws TubeClientException {
        final Partition routed = this.partitionRouter.getPartition(message, candidates);
        if (routed == null) {
            throw new TubeClientException(new StringBuilder(512).append("Not found available partition for topic: ").append(message.getTopic()).toString());
        }
        return routed;
    }

    /**
     * Looks up the blocking write service for a broker using this producer's RPC configuration.
     *
     * @param brokerInfo the broker to talk to
     * @return whatever the RPC service factory returns for that broker, cast to the blocking interface
     */
    private BrokerWriteService getBrokerService(BrokerInfo brokerInfo) {
        final Object service = this.rpcServiceFactory.getService(BrokerWriteService.class, brokerInfo, this.rpcConfig);
        return (BrokerWriteService) service;
    }

    /**
     * Looks up the asynchronous write service for a broker using this producer's RPC configuration.
     *
     * @param brokerInfo the broker to talk to
     * @return whatever the RPC service factory returns for that broker, cast to the asynchronous interface
     */
    private BrokerWriteService.AsyncService getAsyncBrokerService(BrokerInfo brokerInfo) {
        final Object service = this.rpcServiceFactory.getService(BrokerWriteService.AsyncService.class, brokerInfo, this.rpcConfig);
        return (BrokerWriteService.AsyncService) service;
    }
}
