package com.tokera.ate.io.kafka;

import com.tokera.ate.KafkaServer;
import com.tokera.ate.dao.TopicAndPartition;
import com.tokera.ate.dao.kafka.MessageSerializer;
import com.tokera.ate.dao.msg.MessageBase;
import com.tokera.ate.dao.msg.MessageData;
import com.tokera.ate.delegates.AteDelegate;
import com.tokera.ate.dto.msg.MessageBaseDto;
import com.tokera.ate.dto.msg.MessageDataDto;
import com.tokera.ate.dto.msg.MessageSyncDto;
import com.tokera.ate.io.api.IPartitionKey;
import com.tokera.ate.io.kafka.KafkaConfigTools;
import com.tokera.ate.io.repo.DataPartitionChain;
import com.tokera.ate.io.repo.IDataPartitionBridge;
import com.tokera.ate.providers.PartitionKeySerializer;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.UUID;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/tokera/ate/io/kafka/KafkaPartitionBridge.class */
public class KafkaPartitionBridge implements IDataPartitionBridge {
    public final AteDelegate d;
    public final IPartitionKey where;
    public final DataPartitionChain chain;
    private volatile MessageSyncDto loadSync = null;

    public KafkaPartitionBridge(AteDelegate ateDelegate, IPartitionKey iPartitionKey, DataPartitionChain dataPartitionChain) {
        this.d = ateDelegate;
        this.where = iPartitionKey;
        this.chain = dataPartitionChain;
    }

    @Override // com.tokera.ate.io.repo.IDataPartitionBridge
    public void send(MessageBaseDto messageBaseDto) {
        ProducerRecord<String, MessageBase> producerRecord = new ProducerRecord<>(this.where.partitionTopic(), Integer.valueOf(this.where.partitionIndex()), MessageSerializer.getKey(messageBaseDto), messageBaseDto.createBaseFlatBuffer());
        KafkaProducer<String, MessageBase> kafkaProducer = this.d.kafkaOutbox.get();
        if (kafkaProducer != null) {
            kafkaProducer.send(producerRecord);
        }
        this.d.debugLogging.logKafkaSend(producerRecord, messageBaseDto);
    }

    @Override // com.tokera.ate.io.repo.IDataPartitionBridge
    public void deleteMany(Collection<String> collection) {
        KafkaProducer<String, MessageBase> kafkaProducer = this.d.kafkaOutbox.get();
        if (kafkaProducer == null) {
            return;
        }
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            ProducerRecord<String, MessageBase> producerRecord = new ProducerRecord<>(this.where.partitionTopic(), Integer.valueOf(this.where.partitionIndex()), it.next(), (Object) null);
            kafkaProducer.send(producerRecord);
            this.d.debugLogging.logKafkaDelete(producerRecord);
        }
    }

    @Override // com.tokera.ate.io.repo.IDataPartitionBridge
    public MessageDataDto getVersion(UUID uuid, long j) {
        TopicPartition topicPartition = new TopicPartition(this.where.partitionTopic(), this.where.partitionIndex());
        LinkedList linkedList = new LinkedList();
        linkedList.add(topicPartition);
        KafkaConsumer<String, MessageBase> newConsumer = this.d.kafkaConfig.newConsumer(KafkaConfigTools.TopicRole.Consumer, KafkaConfigTools.TopicType.Dao, KafkaServer.getKafkaBootstrap());
        newConsumer.assign(linkedList);
        newConsumer.seek(topicPartition, j);
        ConsumerRecords poll = newConsumer.poll(Duration.ofMillis(5000L));
        if (poll.isEmpty()) {
            return null;
        }
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            if (consumerRecord.partition() == partitionKey().partitionIndex() && consumerRecord.offset() == j && ((MessageBase) consumerRecord.value()).msgType() == 1) {
                MessageData messageData = (MessageData) ((MessageBase) consumerRecord.value()).msg(new MessageData());
                if (messageData == null) {
                    return null;
                }
                return new MessageDataDto(messageData);
            }
        }
        return null;
    }

    public void sendLoadSync() {
        MessageSyncDto startSync = this.d.partitionSyncManager.startSync();
        send(startSync);
        this.loadSync = startSync;
        this.d.debugLogging.logBeginLoad(this.where);
    }

    @Override // com.tokera.ate.io.repo.IDataPartitionBridge
    public void waitTillLoaded() {
        boolean z = false;
        boolean z2 = false;
        boolean z3 = false;
        if (this.loadSync != null) {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            while (!this.d.partitionSyncManager.hasFinishSync(this.loadSync)) {
                if (stopWatch.getTime() > 5000 && !z) {
                    sendLoadSync();
                    z = true;
                }
                if (stopWatch.getTime() > 8000 && !z3) {
                    this.d.kafkaInbox.addPartition(new TopicAndPartition(this.where));
                    this.d.dataMaintenance.addPartition(new TopicAndPartition(this.where));
                    z3 = true;
                }
                if (stopWatch.getTime() > 15000 && !z2) {
                    createTopic();
                    this.d.kafkaInbox.addPartition(new TopicAndPartition(this.where));
                    this.d.dataMaintenance.addPartition(new TopicAndPartition(this.where));
                    z2 = true;
                }
                if (stopWatch.getTime() > 25000) {
                    throw new RuntimeException("Busy loading data partition [" + PartitionKeySerializer.toString(this.where) + "]");
                }
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    return;
                }
            }
            this.d.debugLogging.logFinishLoad(this.where);
            this.loadSync = null;
        }
    }

    public void createTopic() {
        switch (AteDelegate.get().kafkaTopicFactory.create(this.where.partitionTopic(), this.where.partitionType())) {
            case AlreadyExists:
            default:
                return;
            case WasCreated:
                AteDelegate.get().genericLogger.info("partition [" + this.where + "]: loaded-created");
                return;
            case Failed:
                throw new WebApplicationException("Failed to create the new partitions.", Response.Status.INTERNAL_SERVER_ERROR);
        }
    }

    @Override // com.tokera.ate.io.repo.IDataPartitionBridge
    public IPartitionKey partitionKey() {
        return this.where;
    }

    @Override // com.tokera.ate.io.repo.IDataPartitionBridge
    public DataPartitionChain chain() {
        return this.chain;
    }

    @Override // com.tokera.ate.io.repo.IDataPartitionBridge
    public boolean hasLoaded() {
        return this.loadSync == null;
    }
}
