package io.kestra.runner.memory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hashing;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Either;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kestra/runner/memory/MemoryQueue.class */
public class MemoryQueue<T> implements QueueInterface<T> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MemoryQueue.class);
    private static final ObjectMapper mapper = JacksonMapper.ofJson();
    private final ExecutorService executorService;
    private final QueueService queueService;
    private final Class<T> cls;
    private final Map<String, List<Consumer<Either<T, DeserializationException>>>> queues = new ConcurrentHashMap();

    public MemoryQueue(Class<T> cls, QueueService queueService, ExecutorService executorService) {
        this.executorService = executorService;
        this.queueService = queueService;
        this.cls = cls;
    }

    private static int selectConsumer(String str, int i) {
        return str == null ? new Random().nextInt(i) : Hashing.consistentHash(Hashing.crc32().hashString(str, StandardCharsets.UTF_8), i);
    }

    private void produce(String str, T t) {
        if (log.isTraceEnabled()) {
            log.trace("New message: topic '{}', value {}", this.cls.getName(), t);
        }
        this.queues.forEach((str2, list) -> {
            this.executorService.execute(() -> {
                synchronized (this) {
                    if (list.isEmpty()) {
                        log.debug("No consumer connected on queue '" + this.cls.getName() + "'");
                        return;
                    }
                    Consumer consumer = (Consumer) list.get(selectConsumer(str, list.size()));
                    String str2 = null;
                    try {
                        str2 = mapper.writeValueAsString(t);
                        consumer.accept(Either.left(t == null ? null : mapper.readValue(str2, this.cls)));
                    } catch (JsonProcessingException e) {
                        consumer.accept(Either.right(new DeserializationException(e, str2)));
                    }
                }
            });
        });
    }

    public void emit(String str, T t) {
        produce(this.queueService.key(t), t);
    }

    public void emitAsync(String str, T t) throws QueueException {
        emit(t);
    }

    public void delete(String str, T t) throws QueueException {
        produce(this.queueService.key(t), null);
    }

    public Runnable receive(String str, Consumer<Either<T, DeserializationException>> consumer) {
        return receive(str, null, consumer);
    }

    public synchronized Runnable receive(String str, Class<?> cls, Consumer<Either<T, DeserializationException>> consumer) {
        String uuid = cls == null ? UUID.randomUUID().toString() : cls.getSimpleName();
        if (!this.queues.containsKey(uuid)) {
            this.queues.put(uuid, Collections.synchronizedList(new ArrayList()));
        }
        this.queues.get(uuid).add(consumer);
        int size = this.queues.get(uuid).size() - 1;
        String str2 = uuid;
        return () -> {
            synchronized (this) {
                this.queues.get(str2).remove(size);
                if (this.queues.get(str2).isEmpty()) {
                    this.queues.remove(str2);
                }
            }
        };
    }

    public void pause() {
    }

    public int getSubscribersCount() {
        return ((Integer) this.queues.values().stream().map((v0) -> {
            return v0.size();
        }).reduce(0, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })).intValue();
    }

    public void close() throws IOException {
        if (this.executorService.isShutdown()) {
            return;
        }
        this.executorService.shutdown();
    }
}
