package net.wicp.tams.common.kafka.plugin;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.wicp.tams.common.Result;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.binlog.alone.DuckulaAssit;
import net.wicp.tams.common.binlog.alone.ListenerConf;
import net.wicp.tams.common.binlog.alone.binlog.bean.Rule;
import net.wicp.tams.common.binlog.alone.binlog.bean.RuleItem;
import net.wicp.tams.common.binlog.alone.binlog.listener.AbsBinlogListener;
import net.wicp.tams.common.constant.FieldFormart;
import net.wicp.tams.common.constant.ods.AddColName;
import net.wicp.tams.common.kafka.KafkaAssitInst;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wicp/tams/common/kafka/plugin/ListenerKafka.class */
public class ListenerKafka extends AbsBinlogListener {
    private static final Logger log = LoggerFactory.getLogger(ListenerKafka.class);
    private final Map<String, Integer> topicPartitionsMap = new HashMap();
    private final KafkaProducer<String, byte[]> producer = KafkaAssitInst.getInst().getKafkaProducer(byte[].class);

    public void doInit(Rule rule, int i) {
        Validate.isTrue(StringUtil.isNotNull(rule.getItems().get(RuleItem.topic)), "每组规则都需要配置topic。", new Object[0]);
        String str = (String) rule.getItems().get(RuleItem.topic);
        if (this.topicPartitionsMap.containsKey(str)) {
            return;
        }
        List partitionsFor = this.producer.partitionsFor(str);
        this.topicPartitionsMap.put(str, Integer.valueOf(partitionsFor.size()));
        log.info("the topic:{} partitions:{}", str, Integer.valueOf(partitionsFor.size()));
    }

    public Result doAlterTableCallBack(Rule rule, ListenerConf.ColHis colHis, String str, String[] strArr, String str2, String[] strArr2, FieldFormart fieldFormart, String str3, String str4) {
        return Result.getSuc();
    }

    private void doBusiTrue(Rule rule, final ListenerConf.DuckulaEvent duckulaEvent, Map<AddColName, Serializable> map, boolean z, boolean z2) {
        ProducerRecord producerRecord;
        String str = (String) rule.getItems().get(RuleItem.topic);
        if (z || duckulaEvent.getIsError()) {
            if (z) {
                int intValue = this.topicPartitionsMap.get(str).intValue();
                String keyJoin = DuckulaAssit.getKeyJoin(duckulaEvent.toBuilder(), 0, "~");
                producerRecord = new ProducerRecord(str, Integer.valueOf(intValue < 2 ? 0 : StringUtil.partition(keyJoin, intValue)), String.format("%s|%s|%s", keyJoin, duckulaEvent.getDb(), duckulaEvent.getTb()), duckulaEvent.toByteArray());
            } else {
                producerRecord = new ProducerRecord(str, String.format("%s|%s|%s", duckulaEvent.getItemsCount() == 1 ? DuckulaAssit.getKeyJoin(duckulaEvent.toBuilder(), 0, "~") : "moreone", duckulaEvent.getDb(), duckulaEvent.getTb()), duckulaEvent.toByteArray());
            }
            this.producer.send(producerRecord, new Callback() { // from class: net.wicp.tams.common.kafka.plugin.ListenerKafka.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null) {
                        for (int i = 0; i < duckulaEvent.getItemsCount(); i++) {
                            ListenerKafka.log.error("TimeoutException: Batch Expired,send again:{}", DuckulaAssit.getKey(duckulaEvent.toBuilder(), i));
                        }
                    }
                }
            });
            return;
        }
        int intValue2 = this.topicPartitionsMap.get(str).intValue();
        final CountDownLatch countDownLatch = new CountDownLatch(duckulaEvent.getItemsCount());
        for (int i = 0; i < duckulaEvent.getItemsCount(); i++) {
            final String keyJoin2 = DuckulaAssit.getKeyJoin(duckulaEvent.toBuilder(), i, "~");
            try {
                this.producer.send(new ProducerRecord(str, Integer.valueOf(intValue2 < 2 ? 0 : StringUtil.partition(keyJoin2, intValue2)), String.format("%s|%s|%s", keyJoin2, duckulaEvent.getDb(), duckulaEvent.getTb()), duckulaEvent.toByteArray()), new Callback() { // from class: net.wicp.tams.common.kafka.plugin.ListenerKafka.2
                    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                        if (exc != null) {
                            ListenerKafka.log.error("TimeoutException: Batch Expired,send again:{}", keyJoin2);
                        } else {
                            countDownLatch.countDown();
                        }
                    }
                });
            } catch (Exception e) {
                log.error(String.format("send error,first colvalue:[%s]", keyJoin2), e);
                throw new IllegalAccessError("发送消息时异常");
            }
        }
        try {
            countDownLatch.await(60L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            log.error("发送中断", e2);
        }
    }

    public void doBusiAsyncTrue(boolean z, boolean z2, Map<Rule, List<Pair<ListenerConf.DuckulaEvent, Map<AddColName, Serializable>>>> map) {
        for (Rule rule : map.keySet()) {
            for (Pair<ListenerConf.DuckulaEvent, Map<AddColName, Serializable>> pair : map.get(rule)) {
                doBusiTrue(rule, (ListenerConf.DuckulaEvent) pair.getLeft(), (Map) pair.getRight(), z, z2);
            }
        }
    }
}
