package cz.pumpitup.driver8.kafka.adapters;

import cz.pumpitup.driver8.base.webdriver.managers.Session;
import cz.pumpitup.driver8.kafka.responses.KafkaListResponse;
import cz.pumpitup.driver8.kafka.responses.KafkaMessage;
import cz.pumpitup.driver8.kafka.responses.KafkaReadResponse;
import cz.pumpitup.driver8.kafka.responses.KafkaWriteResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.tinylog.Logger;

/* loaded from: input_file:cz/pumpitup/driver8/kafka/adapters/KafkaAdapter.class */
/**
 * Session-scoped facade over the Kafka admin, consumer and producer clients.
 *
 * <p>One adapter is stored per {@link Session} under the resource name {@code "kafkaAdapter"};
 * obtain it through {@link #get(Session)}. The three Kafka clients are created lazily on first
 * use from a shared set of connection properties and are released by {@link #close()}.
 *
 * <p>All public instance methods are {@code synchronized} on the adapter.
 */
public class KafkaAdapter implements AutoCloseable {
    private static final String RESOURCE_NAME = "kafkaAdapter";
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(2);
    private static final String GROUP_ID_KEY = "group.id";
    private static final String JAAS_CONFIG_KEY = "sasl.jaas.config";
    /** Value of {@link PartitionOffsetRange#endOffset} meaning "no upper bound". */
    private static final long NO_END_OFFSET = -1L;

    private final Properties connectionProperties = new Properties();
    private AdminClient myAdminClient;
    private KafkaConsumer<String, String> myConsumer;
    private KafkaProducer<String, String> myProducer;

    /**
     * Offsets selected for one partition by {@code seekToNecessaryOffsets}.
     *
     * <p>Only {@code endOffset} is consulted while polling: records with an offset greater than
     * {@code endOffset} are rejected unless {@code endOffset} is {@code -1}.
     */
    public static class PartitionOffsetRange {
        int partition;
        long startOffset;
        long endOffset;

        PartitionOffsetRange(int partition, long startOffset, long endOffset) {
            this.partition = partition;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    /**
     * Returns the adapter registered in the session, creating and registering one if absent.
     *
     * @param session session whose resources and capabilities are used
     * @return the adapter bound to {@code session}
     * @throws IllegalArgumentException if a new adapter is needed and the capabilities are
     *     missing the Kafka URL or use one of the deprecated {@code kafka:*} keys
     */
    public static KafkaAdapter get(Session session) {
        AutoCloseable existing = session.resources.get(RESOURCE_NAME);
        if (existing != null) {
            return (KafkaAdapter) existing;
        }
        // The constructor registers the new instance in session.resources.
        return new KafkaAdapter(session);
    }

    private KafkaAdapter(Session session) {
        this.connectionProperties.put("bootstrap.servers", extractKafkaUrl(session));
        String username = extractKafkaUsername(session);
        if (username != null) {
            // SASL/PLAIN is configured only when a username capability is present.
            String securityProtocol = extractSecurityProtocol(session);
            String password = extractKafkaPassword(session);
            this.connectionProperties.put("security.protocol", securityProtocol);
            this.connectionProperties.put("sasl.mechanism", "PLAIN");
            this.connectionProperties.put(JAAS_CONFIG_KEY, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
        }
        String groupId = extractGroupId(session);
        if (groupId != null) {
            this.connectionProperties.put(GROUP_ID_KEY, groupId);
        }
        this.connectionProperties.put("enable.auto.commit", "false");
        this.connectionProperties.put("auto.offset.reset", "earliest");
        this.connectionProperties.put("key.deserializer", StringDeserializer.class.getCanonicalName());
        this.connectionProperties.put("value.deserializer", StringDeserializer.class.getCanonicalName());
        this.connectionProperties.put("key.serializer", StringSerializer.class.getCanonicalName());
        this.connectionProperties.put("value.serializer", StringSerializer.class.getCanonicalName());
        this.connectionProperties.put("enable.idempotence", false);
        session.resources.put(RESOURCE_NAME, this);
    }

    /** Reads the security protocol capability, defaulting to {@code SASL_SSL}. */
    private static String extractSecurityProtocol(Session session) {
        return session.capabilities.getOrDefault(KafkaCapabilities.KAFKA_SECURITY_PROTOCOL, "SASL_SSL").toString();
    }

    /** Returns the username capability, or {@code null} when none is configured. */
    private static String extractKafkaUsername(Session session) {
        if (session.capabilities.containsKey(KafkaCapabilities.KAFKA_USERNAME)) {
            return String.valueOf(session.capabilities.get(KafkaCapabilities.KAFKA_USERNAME));
        }
        if (session.capabilities.containsKey("kafka:username")) {
            throw createDeprecatedCapabilityException("kafka:username", KafkaCapabilities.KAFKA_USERNAME);
        }
        return null;
    }

    /** Returns the password capability, or an empty string when none is configured. */
    private static String extractKafkaPassword(Session session) {
        if (session.capabilities.containsKey(KafkaCapabilities.KAFKA_PASSWORD)) {
            return String.valueOf(session.capabilities.get(KafkaCapabilities.KAFKA_PASSWORD));
        }
        if (session.capabilities.containsKey("kafka:password")) {
            throw createDeprecatedCapabilityException("kafka:password", KafkaCapabilities.KAFKA_PASSWORD);
        }
        return "";
    }

    /** Returns the mandatory Kafka URL capability. */
    private static String extractKafkaUrl(Session session) {
        if (session.capabilities.containsKey(KafkaCapabilities.KAFKA_URL)) {
            return String.valueOf(session.capabilities.get(KafkaCapabilities.KAFKA_URL));
        }
        if (session.capabilities.containsKey("kafka:url")) {
            throw createDeprecatedCapabilityException("kafka:url", KafkaCapabilities.KAFKA_URL);
        }
        throw new IllegalArgumentException(String.format("Kafka url is mandatory under key %s in the session capabilities", KafkaCapabilities.KAFKA_URL));
    }

    /** Returns the consumer group id capability, or {@code null} when none is configured. */
    private static String extractGroupId(Session session) {
        if (session.capabilities.containsKey(KafkaCapabilities.KAFKA_GROUP_ID)) {
            return String.valueOf(session.capabilities.get(KafkaCapabilities.KAFKA_GROUP_ID));
        }
        if (session.capabilities.containsKey("kafka:groupId")) {
            throw createDeprecatedCapabilityException("kafka:groupId", KafkaCapabilities.KAFKA_GROUP_ID);
        }
        return null;
    }

    private static IllegalArgumentException createDeprecatedCapabilityException(String deprecatedKey, String replacementKey) {
        return new IllegalArgumentException(String.format("\"%s\" capability is deprecated. Please use \"%s\".", deprecatedKey, replacementKey));
    }

    /**
     * Logs a failure, restores the interrupt flag when the cause is an
     * {@link InterruptedException}, and builds the exception to be thrown to the caller.
     */
    private static Exception failure(String message, Exception cause) {
        if (cause instanceof InterruptedException) {
            // Wrapping must not hide the interruption from code further up the stack.
            Thread.currentThread().interrupt();
        }
        Logger.error(message);
        Logger.error(cause);
        return new Exception(message, cause);
    }

    /** Returns a copy of the connection properties that is safe to log (JAAS credentials masked). */
    private Properties redactedConnectionProperties() {
        Properties copy = new Properties();
        copy.putAll(this.connectionProperties);
        if (copy.containsKey(JAAS_CONFIG_KEY)) {
            copy.put(JAAS_CONFIG_KEY, "<redacted>");
        }
        return copy;
    }

    /**
     * Closes every client that has been created so far. Each client is closed even when closing
     * an earlier one throws.
     */
    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        try {
            if (this.myAdminClient != null) {
                this.myAdminClient.close();
            }
        } finally {
            try {
                if (this.myConsumer != null) {
                    this.myConsumer.close();
                }
            } finally {
                if (this.myProducer != null) {
                    this.myProducer.close();
                }
            }
        }
    }

    private KafkaConsumer<String, String> getConsumer() {
        if (this.myConsumer == null) {
            Logger.debug("Creating consumer client with following properties:\n{}", redactedConnectionProperties());
            this.myConsumer = new KafkaConsumer<>(this.connectionProperties);
        }
        return this.myConsumer;
    }

    private KafkaProducer<String, String> getProducer() {
        if (this.myProducer == null) {
            Logger.debug("Creating producer client with following properties:\n{}", redactedConnectionProperties());
            this.myProducer = new KafkaProducer<>(this.connectionProperties);
        }
        return this.myProducer;
    }

    private AdminClient getAdminClient() {
        if (this.myAdminClient == null) {
            Logger.debug("Creating admin client with following properties:\n{}", redactedConnectionProperties());
            this.myAdminClient = AdminClient.create(this.connectionProperties);
        }
        return this.myAdminClient;
    }

    /**
     * Reads messages from the topic named in the filter.
     *
     * <p>Partitions are obtained through {@code subscribe} when a group id is configured and
     * through explicit {@code assign} otherwise; the consumer is then positioned according to
     * the filter, polled, and the received messages are filtered and ordered by timestamp.
     *
     * @param searchFilter topic, partition, offset, tail, date-range and max-count selection
     * @return the messages that passed the filter
     * @throws Exception wrapping any failure raised while assigning, seeking, polling or filtering
     */
    public synchronized KafkaReadResponse read(SearchFilter searchFilter) throws Exception {
        Logger.debug("Getting consumer client");
        KafkaConsumer<String, String> consumer = getConsumer();
        try {
            Collection<TopicPartition> partitions = this.connectionProperties.containsKey(GROUP_ID_KEY)
                    ? automaticallyAssignPartitions(consumer, searchFilter)
                    : manuallyAssignPartitions(consumer, searchFilter);
            HashMap<Integer, PartitionOffsetRange> ranges = seekToNecessaryOffsets(consumer, partitions, searchFilter);
            List<KafkaMessage> received = getKafkaMessages(searchFilter, consumer, ranges);
            return new KafkaReadResponse(filterMessages(searchFilter, received));
        } catch (Exception e) {
            throw failure("Unable to list messages from Kafka", e);
        }
    }

    /**
     * Assigns the consumer to the topic's partitions, or to the single partition requested by
     * the filter ({@code partition == -1} selects all).
     */
    private static Collection<TopicPartition> manuallyAssignPartitions(KafkaConsumer<String, String> kafkaConsumer, SearchFilter searchFilter) {
        List<TopicPartition> partitions = kafkaConsumer.partitionsFor(searchFilter.topic).stream()
                .peek(info -> Logger.trace("Got from Kafka: " + info.toString()))
                .filter(info -> searchFilter.partition == -1 || info.partition() == searchFilter.partition)
                .map(info -> new TopicPartition(searchFilter.topic, info.partition()))
                .collect(Collectors.toList());
        kafkaConsumer.assign(partitions);
        Logger.debug("Manual assignment successful with {} partitions", partitions.size());
        Logger.trace("Partitions assigned: " + partitions);
        return partitions;
    }

    /**
     * Subscribes the consumer to the filter's topic and returns its current assignment.
     *
     * <p>NOTE(review): the assignment is read immediately after {@code subscribe}, before any
     * {@code poll}; per the KafkaConsumer documentation it may still be empty at that point, in
     * which case no offset ranges are registered and polled records are skipped — confirm
     * whether that is intended for the first read of a session.
     */
    private static Collection<TopicPartition> automaticallyAssignPartitions(KafkaConsumer<String, String> kafkaConsumer, SearchFilter searchFilter) {
        kafkaConsumer.subscribe(Collections.singletonList(searchFilter.topic));
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        Logger.debug("Automatic assignment successful with {} partitions", assignment.size());
        Logger.trace("Partitions assigned: " + assignment);
        return assignment;
    }

    /**
     * Positions the consumer on every given partition and records, per partition number, the
     * offset range that polling should accept.
     *
     * <p>The first matching rule wins: explicit offset, tail of N records, start date
     * (optionally with end date), otherwise beginning of the partition.
     */
    private static HashMap<Integer, PartitionOffsetRange> seekToNecessaryOffsets(KafkaConsumer<String, String> kafkaConsumer, Collection<TopicPartition> collection, SearchFilter searchFilter) {
        HashMap<Integer, PartitionOffsetRange> ranges = new HashMap<>();
        if (searchFilter.offset != -1) {
            // Explicit offset: start and end coincide, so only that offset is accepted.
            for (TopicPartition topicPartition : collection) {
                Logger.debug("Seeking to offset {} on partition {}", searchFilter.offset, topicPartition.partition());
                kafkaConsumer.seek(topicPartition, searchFilter.offset);
                ranges.put(topicPartition.partition(), new PartitionOffsetRange(topicPartition.partition(), searchFilter.offset, searchFilter.offset));
            }
        } else if (searchFilter.tailOffsets > 0) {
            // Tail: step back tailOffsets records from the current end, clamped at offset 0.
            kafkaConsumer.seekToEnd(collection);
            for (TopicPartition topicPartition : collection) {
                long end = kafkaConsumer.position(topicPartition);
                long start = Math.max(end - searchFilter.tailOffsets, 0L);
                Logger.debug("Seeking to offset {} on partition {}", start, topicPartition.partition());
                kafkaConsumer.seek(topicPartition, start);
                ranges.put(topicPartition.partition(), new PartitionOffsetRange(topicPartition.partition(), start, end));
            }
        } else if (searchFilter.dateStart != -1) {
            Map<TopicPartition, Long> timestamps = new HashMap<>();
            for (TopicPartition topicPartition : collection) {
                timestamps.put(topicPartition, Long.valueOf(searchFilter.dateStart));
            }
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = kafkaConsumer.offsetsForTimes(timestamps);
            Map<TopicPartition, OffsetAndTimestamp> endOffsets = null;
            if (searchFilter.dateEnd != -1) {
                for (TopicPartition topicPartition : collection) {
                    timestamps.put(topicPartition, Long.valueOf(searchFilter.dateEnd));
                }
                endOffsets = kafkaConsumer.offsetsForTimes(timestamps);
            }
            for (TopicPartition topicPartition : collection) {
                OffsetAndTimestamp start = startOffsets.get(topicPartition);
                OffsetAndTimestamp end = endOffsets != null ? endOffsets.get(topicPartition) : null;
                long endOffset = end != null ? end.offset() : NO_END_OFFSET;
                if (start == null) {
                    // NOTE(review): per the KafkaConsumer docs a null entry means no record has a
                    // timestamp >= dateStart, which does not match this log text — confirm.
                    Logger.debug("First offset of partition {} is already after specified start date, not seeking earlier", topicPartition.partition());
                    ranges.put(topicPartition.partition(), new PartitionOffsetRange(topicPartition.partition(), kafkaConsumer.position(topicPartition), endOffset));
                } else {
                    long startOffset = start.offset();
                    Logger.debug("Seeking to offset {} on partition {}", startOffset, topicPartition.partition());
                    kafkaConsumer.seek(topicPartition, startOffset);
                    ranges.put(topicPartition.partition(), new PartitionOffsetRange(topicPartition.partition(), startOffset, endOffset));
                }
            }
        } else {
            Logger.debug("Seeking to beginning on all partitions");
            kafkaConsumer.seekToBeginning(collection);
            for (TopicPartition topicPartition : collection) {
                long position = kafkaConsumer.position(topicPartition);
                // NOTE(review): endOffset is the *beginning* position here, so polling accepts at
                // most one record per partition; this looks copied from the tail branch. Behavior
                // is kept as-is — confirm whether an unbounded end (-1) was intended.
                ranges.put(topicPartition.partition(), new PartitionOffsetRange(topicPartition.partition(), Math.max(position - searchFilter.tailOffsets, 0L), position));
            }
        }
        return ranges;
    }

    /**
     * Polls until {@code maxCount} messages are collected, a poll returns nothing, or every
     * partition has delivered a record past its end offset.
     *
     * <p>Entries are removed from {@code hashMap} as partitions finish; records from partitions
     * without an entry are skipped.
     */
    private static List<KafkaMessage> getKafkaMessages(SearchFilter searchFilter, KafkaConsumer<String, String> kafkaConsumer, HashMap<Integer, PartitionOffsetRange> hashMap) {
        List<KafkaMessage> messages = new ArrayList<>();
        int round = 0;
        // The consumer is cached across reads; partitions paused by a previous read would
        // otherwise stay paused and silently return no records.
        kafkaConsumer.resume(kafkaConsumer.paused());
        Logger.debug("Starting polling loop");
        polling:
        while (messages.size() < searchFilter.maxCount) {
            ConsumerRecords<String, String> batch = kafkaConsumer.poll(DEFAULT_TIMEOUT);
            round++;
            if (batch.isEmpty()) {
                Logger.trace("Received records are empty stopping any further polling");
                break;
            }
            Logger.debug("Poll round {} Received from Kafka: {} messages", round, batch.count());
            for (ConsumerRecord<String, String> received : batch) {
                if (messages.size() >= searchFilter.maxCount) {
                    break polling;
                }
                Integer partition = Integer.valueOf(received.partition());
                PartitionOffsetRange range = hashMap.get(partition);
                if (range == null) {
                    continue;
                }
                if (range.endOffset == NO_END_OFFSET || received.offset() <= range.endOffset) {
                    messages.add(new KafkaMessage(received));
                    Logger.trace("Received message on partition: {}, offset: {}, timestamp: {}", received.partition(), received.offset(), received.timestamp());
                    continue;
                }
                Logger.trace("Received too young message on partition: {}, offset: {}, timestamp: {}, ignoring any other messages from that partition and pausing receiving from it for next polls", received.partition(), received.offset(), received.timestamp());
                kafkaConsumer.pause(Collections.singletonList(new TopicPartition(received.topic(), received.partition())));
                hashMap.remove(partition);
                if (hashMap.isEmpty()) {
                    Logger.trace("All partitions delivered all expected results, stopping any polling");
                    break polling;
                }
            }
            Logger.debug("Total messages in my buffer after this round: {} messages", messages.size());
        }
        Logger.debug("Total messages in my buffer after all rounds {} messages", messages.size());
        return messages;
    }

    /**
     * Keeps messages whose timestamp lies within the filter's date range and returns them in
     * ascending timestamp order, capped at {@code maxCount}. When a tail is requested, the
     * newest {@code min(tailOffsets, maxCount)} matching messages are kept instead of the oldest.
     */
    private static List<KafkaMessage> filterMessages(SearchFilter searchFilter, List<KafkaMessage> list) {
        Logger.debug("All messages received, applying additional filters");
        long maxCount = searchFilter.maxCount;
        long from = searchFilter.dateStart == -1 ? 0L : searchFilter.dateStart;
        long until = searchFilter.dateEnd == -1 ? Instant.now().plusMillis(SearchFilter.MAX_ALLOWED_FUTURE.longValue()).toEpochMilli() : searchFilter.dateEnd;
        List<KafkaMessage> result;
        if (searchFilter.tailOffsets > 0) {
            Logger.debug("Filtering {} last messages ordered by timestamp within {} - {} and staying within max count {}", searchFilter.tailOffsets, Instant.ofEpochMilli(from).toString(), Instant.ofEpochMilli(until).toString(), maxCount);
            result = list.stream()
                    // Newest first so that limit() keeps the tail, then back to ascending order.
                    .sorted(Comparator.comparingLong(message -> -message.timestamp))
                    .filter(message -> message.timestamp >= from && message.timestamp <= until)
                    .limit(Math.min(searchFilter.tailOffsets, maxCount))
                    .sorted(Comparator.comparingLong(message -> message.timestamp))
                    .collect(Collectors.toList());
        } else {
            Logger.debug("Filtering max {} messages within {} - {} ordered by timestamp", maxCount, Instant.ofEpochMilli(from).toString(), Instant.ofEpochMilli(until).toString());
            result = list.stream()
                    .sorted(Comparator.comparingLong(message -> message.timestamp))
                    .filter(message -> message.timestamp >= from && message.timestamp <= until)
                    .limit(maxCount)
                    .collect(Collectors.toList());
        }
        Logger.debug("Final result set has {} messages, sending back to client", result.size());
        return result;
    }

    /**
     * Sends one record and waits for the broker acknowledgement.
     *
     * @param record key, message, partition id and topic to send
     * @return response built from the acknowledged record metadata
     * @throws Exception wrapping an interrupted wait, a failed send or a Kafka client error
     */
    public synchronized KafkaWriteResponse write(Record record) throws Exception {
        Logger.debug("Getting producer client");
        KafkaProducer<String, String> producer = getProducer();
        try {
            int messageLength = record.message == null ? 0 : record.message.length();
            // Argument order must follow the placeholders: key, partitionId, length, topic.
            Logger.debug("Sending new message with key {}, partitionId {} and length {} to topic {}", record.key, record.partitionId, messageLength, record.topic);
            RecordMetadata recordMetadata = (RecordMetadata) producer.send(record.toProducerRecord()).get();
            Logger.debug("Message seems accepted on partition {}", recordMetadata.partition());
            return new KafkaWriteResponse(recordMetadata);
        } catch (InterruptedException | ExecutionException | KafkaException e) {
            throw failure("Unable to write message to Kafka", e);
        }
    }

    /**
     * Lists the topics reported by the admin client.
     *
     * @return response built from the topic listings
     * @throws Exception wrapping an interrupted wait or a failed admin request
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // listing element type is not imported in this file
    public synchronized KafkaListResponse listTopics() throws Exception {
        Logger.debug("Getting admin client");
        AdminClient adminClient = getAdminClient();
        try {
            Logger.debug("Getting list of available topics");
            Collection topics = (Collection) adminClient.listTopics().listings().get();
            Logger.debug("Got {} topics, sending back to client", topics.size());
            return new KafkaListResponse(topics);
        } catch (InterruptedException | ExecutionException e) {
            throw failure("Unable to list topics in Kafka", e);
        }
    }

    /**
     * Creates a topic with one partition and replication factor one, waiting for completion.
     *
     * @param str name of the topic to create
     * @throws Exception wrapping an interrupted wait or a failed admin request
     */
    public synchronized void createTopic(String str) throws Exception {
        Logger.debug("Getting admin client");
        AdminClient adminClient = getAdminClient();
        List<NewTopic> topics = Collections.singletonList(new NewTopic(str, 1, (short) 1));
        try {
            Logger.debug("Creating topic {}", str);
            adminClient.createTopics(topics).all().get();
            Logger.debug("Topic creation seems successful");
        } catch (InterruptedException | ExecutionException e) {
            throw failure("Unable to create topic in Kafka", e);
        }
    }

    /**
     * Deletes a topic, waiting for completion.
     *
     * @param str name of the topic to delete
     * @throws Exception wrapping an interrupted wait or a failed admin request
     */
    public synchronized void deleteTopic(String str) throws Exception {
        Logger.debug("Getting admin client");
        AdminClient adminClient = getAdminClient();
        List<String> topics = Collections.singletonList(str);
        try {
            Logger.debug("Deleting topic {}", str);
            adminClient.deleteTopics(topics).all().get();
            Logger.debug("Topic deletion seems successful");
        } catch (InterruptedException | ExecutionException e) {
            throw failure("Unable to delete topic in Kafka", e);
        }
    }
}
