package com.tokera.ate.io.kafka;

import com.tokera.ate.KafkaServer;
import com.tokera.ate.dao.MessageBundle;
import com.tokera.ate.dao.TopicAndPartition;
import com.tokera.ate.dao.msg.MessageBase;
import com.tokera.ate.delegates.AteDelegate;
import com.tokera.ate.io.core.DataPartitionDaemon;
import com.tokera.ate.io.kafka.KafkaConfigTools;
import com.tokera.ate.io.repo.DataSubscriber;
import com.tokera.ate.scopes.Startup;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Partition daemon that reads messages from Kafka through a single, lazily
 * created {@link KafkaConsumer} and hands them to the storage backend.
 *
 * <p>Each call to {@link #work()} first re-synchronises the consumer's partition
 * assignment with {@link #listPartitions()} (only when partitions were added since
 * the last synchronisation) and then performs one poll.
 */
@ApplicationScoped
@Startup
public class KafkaInbox extends DataPartitionDaemon {
    /** Milliseconds used for the Kafka poll timeout and for the sleep taken while nothing is assigned. */
    private final int pollTimeout = 10;
    /** Incremented on every {@link #addPartition}; compared with {@link #initLevel} to detect pending reloads. */
    protected AtomicInteger targetInit = new AtomicInteger(0);
    /** Value of {@link #targetInit} at the last reload; starts at -1 so the first {@link #work()} always loads. */
    private final AtomicInteger initLevel = new AtomicInteger(-1);
    /** Holder for the lazily created consumer; written only while holding this object's monitor. */
    private final AtomicReference<KafkaConsumer<String, MessageBase>> consumer = new AtomicReference<>();

    /**
     * Registers the partition with the base daemon and flags that the consumer
     * assignment must be refreshed on the next {@link #work()} call.
     *
     * @param topicAndPartition partition to start consuming
     */
    @Override // com.tokera.ate.io.core.DataPartitionDaemon
    public void addPartition(TopicAndPartition topicAndPartition) {
        this.targetInit.incrementAndGet();
        super.addPartition(topicAndPartition);
    }

    /**
     * Returns the shared consumer, creating it on first use.
     *
     * @return the single consumer instance used by this inbox
     */
    private KafkaConsumer<String, MessageBase> get() {
        KafkaConsumer<String, MessageBase> existing = this.consumer.get();
        if (existing != null) {
            return existing;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have created it meanwhile.
            existing = this.consumer.get();
            if (existing != null) {
                return existing;
            }
            KafkaConsumer<String, MessageBase> created = this.d.kafkaConfig.newConsumer(
                    KafkaConfigTools.TopicRole.Consumer,
                    KafkaConfigTools.TopicType.Dao,
                    KafkaServer.getKafkaBootstrap());
            // All writes happen under this monitor, so a plain set cannot race.
            this.consumer.set(created);
            return created;
        }
    }

    /**
     * Runs {@link #load()} when partitions were added since the last reload.
     * The compare-and-set ensures only the caller that advances the level reloads.
     */
    private void touchLoad() {
        // Primitive comparison: comparing boxed Integers with == only works
        // inside the Integer cache range and would force a reload on every tick
        // once the counter exceeds it.
        int current = this.initLevel.get();
        int target = this.targetInit.get();
        if (current != target && this.initLevel.compareAndSet(current, target)) {
            load();
        }
    }

    /**
     * Makes the consumer's assignment match {@link #listPartitions()}, seeks
     * every newly assigned partition to its beginning, and then wakes any
     * threads waiting on this object's monitor. Does nothing when the
     * assignment already matches.
     */
    private void load() {
        Set<TopicAndPartition> wanted = listPartitions();
        Set<TopicPartition> wantedKafka = wanted.stream()
                .map(p -> new TopicPartition(p.partitionTopic(), p.partitionIndex()))
                .collect(Collectors.toSet());

        KafkaConsumer<String, MessageBase> kafkaConsumer = get();
        Set<TopicAndPartition> assigned = kafkaConsumer.assignment().stream()
                .map(tp -> new TopicAndPartition(tp.topic(), tp.partition()))
                .collect(Collectors.toSet());

        // Same size and every assigned partition still wanted => nothing changed.
        if (assigned.size() == wanted.size() && wanted.containsAll(assigned)) {
            return;
        }

        kafkaConsumer.assign(wantedKafka);

        // Only partitions that were not assigned before are rewound.
        List<TopicPartition> added = new ArrayList<>();
        for (TopicAndPartition partition : wanted) {
            if (!assigned.contains(partition)) {
                added.add(new TopicPartition(partition.partitionTopic(), partition.partitionIndex()));
            }
        }
        if (!added.isEmpty()) {
            kafkaConsumer.seekToBeginning(added);
        }

        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * One daemon iteration: refresh the assignment if needed, then poll once.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override // com.tokera.ate.io.core.DataPartitionDaemon
    protected void work() throws InterruptedException {
        touchLoad();
        poll();
    }

    /** Notifies the storage backend that every known partition is idle. */
    private void idle() {
        DataSubscriber backend = AteDelegate.get().storageFactory.get().backend();
        listPartitions().forEach(backend::idle);
    }

    /**
     * Polls Kafka once. With no partitions assigned it just sleeps for
     * {@code pollTimeout} milliseconds; with an empty poll result it reports the
     * partitions as idle; otherwise it groups the records by partition and feeds
     * each group to the storage backend (groups are fed in parallel).
     *
     * @throws InterruptedException if interrupted during the no-assignment sleep;
     *         propagated instead of swallowed so the interruption is not lost
     */
    private void poll() throws InterruptedException {
        KafkaConsumer<String, MessageBase> kafkaConsumer = get();
        if (kafkaConsumer.assignment().isEmpty()) {
            Thread.sleep(this.pollTimeout);
            return;
        }

        ConsumerRecords<String, MessageBase> records = kafkaConsumer.poll(Duration.ofMillis(this.pollTimeout));
        if (records.isEmpty()) {
            idle();
            return;
        }

        Map<TopicAndPartition, List<MessageBundle>> byPartition = new HashMap<>();
        for (ConsumerRecord<String, MessageBase> rec : records) {
            TopicAndPartition key = new TopicAndPartition(rec.topic(), rec.partition());
            byPartition.computeIfAbsent(key, k -> new ArrayList<>())
                    .add(new MessageBundle(rec.key(), rec.partition(), rec.offset(), rec.value()));
        }

        DataSubscriber backend = AteDelegate.get().storageFactory.get().backend();
        byPartition.entrySet().parallelStream()
                .forEach(entry -> backend.feed(entry.getKey(), entry.getValue(), false));
    }
}
