package net.dempsy.threading;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import net.dempsy.container.Container;
import net.dempsy.container.ContainerJob;
import net.dempsy.container.ContainerJobMetadata;
import net.dempsy.container.MessageDeliveryJob;
import net.dempsy.util.Functional;
import net.dempsy.util.SafeString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/dempsy/threading/OrderedPerContainerThreadingModel.class */
public class OrderedPerContainerThreadingModel implements ThreadingModel {
    /** Number of consecutive idle iterations after which a polling loop starts calling {@link Thread#yield()}. */
    private static final int INTERIM_SPIN_COUNT1 = 100;
    /** Number of consecutive idle iterations after which a polling loop sleeps 1 ms per iteration instead of yielding. */
    private static final int INTERIM_SPIN_COUNT2 = 500;

    /** Unqualified configuration key for the maximum number of pending limited tasks. */
    public static final String CONFIG_KEY_MAX_PENDING = "max_pending";
    /** Default value (as a string) for {@link #CONFIG_KEY_MAX_PENDING}. */
    public static final String DEFAULT_MAX_PENDING = "100000";
    /** Unqualified configuration key for hard shutdown. NOTE(review): nothing in this file reads it. */
    public static final String CONFIG_KEY_HARD_SHUTDOWN = "hard_shutdown";
    /** Default value (as a string) for {@link #CONFIG_KEY_HARD_SHUTDOWN}. */
    public static final String DEFAULT_HARD_SHUTDOWN = "true";
    /** Unqualified configuration key for the size of the container-calculation thread pool. */
    public static final String CONFIG_KEY_DESERIALIZATION_THREADS = "deserialization_threads";
    /** Default value (as a string) for {@link #CONFIG_KEY_DESERIALIZATION_THREADS}. */
    public static final String DEFAULT_DESERIALIZATION_THREADS = "2";

    /** Pool that runs {@code calculateContainers()} for jobs that arrive uncalculated; created by {@link #start()}. */
    private ExecutorService calcContainersWork;
    /** Jobs handed to {@code submit}/{@code submitLimited}, waiting to be picked up by the shuttle thread. */
    private final BlockingQueue<MessageDeliveryJobHolder> inqueue;
    /** Set by {@link #close()}; the shuttle and container-worker loops exit once it is true. */
    private final AtomicBoolean isStopped;
    /** Thread running the {@link Shuttler}; created by {@link #start()}. */
    private Thread shuttleThread;
    /** Counter shared with every {@link MessageDeliveryJobHolder}; reported by {@link #getNumberLimitedPending()}. */
    private final AtomicLong numLimitedX;
    /** Configured maximum number of pending limited tasks. */
    private long maxNumWaitingLimitedTasks;
    /** Number of threads given to {@link #calcContainersWork} when it is created. */
    private int deserializationThreadCount;
    /** Supplies the base name used for every thread this model creates. */
    private final Supplier<String> nameSupplier;
    /** True once {@link #start()} has completed. */
    private boolean started;

    // final: the logger is never reassigned, and a mutable static invites accidental replacement.
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderedPerContainerThreadingModel.class);
    /** Sequence used to number container-worker and deserialization threads. */
    private static final AtomicLong seq = new AtomicLong(0);
    /** Sequence used to make default base thread names unique per instance. */
    private static final AtomicLong threadNum = new AtomicLong();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/dempsy/threading/OrderedPerContainerThreadingModel$ContainerJobHolder.class */
    /**
     * Pairs one per-container job with the holder of the message delivery job it was
     * split from, so the parent's bookkeeping is updated around every execution or
     * rejection of the piece.
     */
    public static class ContainerJobHolder {
        private final ContainerJob job;
        private final MessageDeliveryJobHolder wholeJob;

        /**
         * Wraps the piece and immediately registers it with the parent as enqueued.
         *
         * @param containerJob the per-container piece of work
         * @param messageDeliveryJobHolder holder of the job this piece belongs to
         */
        public ContainerJobHolder(ContainerJob containerJob, MessageDeliveryJobHolder messageDeliveryJobHolder) {
            job = containerJob;
            wholeJob = messageDeliveryJobHolder;
            messageDeliveryJobHolder.preEnqueuedTrackContainerJob();
        }

        /**
         * Executes the piece against the given container metadata.
         *
         * @param containerJobMetadata metadata handed to {@code ContainerJob.execute}
         */
        public void process(ContainerJobMetadata containerJobMetadata) {
            runTracked(() -> job.execute(containerJobMetadata));
        }

        /**
         * Rejects the piece against the given container metadata.
         *
         * @param containerJobMetadata metadata handed to {@code ContainerJob.reject}
         */
        public void reject(ContainerJobMetadata containerJobMetadata) {
            runTracked(() -> job.reject(containerJobMetadata));
        }

        /**
         * Runs the action between the parent's pre-work and post-work tracking calls.
         * The post-work call happens even when the action throws.
         */
        private void runTracked(Runnable action) {
            wholeJob.preWorkTrackContainerJob();
            try {
                action.run();
            } finally {
                wholeJob.postWorkTrackContainerJob();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/dempsy/threading/OrderedPerContainerThreadingModel$ContainerWorker.class */
    public class ContainerWorker implements Runnable {
        public final BlockingQueue<ContainerJobHolder> queue = new LinkedBlockingQueue();
        public final Thread containerThread;
        public final ContainerJobMetadata container;

        public ContainerWorker(ContainerJobMetadata containerJobMetadata) {
            this.container = containerJobMetadata;
            this.containerThread = (Thread) Functional.chain(new Thread(this, OrderedPerContainerThreadingModel.this.nameSupplier.get() + "-ContainerWorker-" + OrderedPerContainerThreadingModel.seq.incrementAndGet()), new Consumer[]{thread -> {
                thread.start();
            }});
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 0;
            while (!OrderedPerContainerThreadingModel.this.isStopped.get()) {
                try {
                    ContainerJobHolder poll = this.queue.poll();
                    if (poll != null) {
                        i = 0;
                        poll.process(this.container);
                    } else {
                        i++;
                        if (i > OrderedPerContainerThreadingModel.INTERIM_SPIN_COUNT2) {
                            Functional.ignore(() -> {
                                Thread.sleep(1L);
                            });
                        } else if (i > OrderedPerContainerThreadingModel.INTERIM_SPIN_COUNT1) {
                            Thread.yield();
                        }
                    }
                } catch (Throwable th) {
                    OrderedPerContainerThreadingModel.LOGGER.error("Completely unexpected exception:", th);
                }
            }
            ArrayList arrayList = new ArrayList(this.queue.size());
            this.queue.drainTo(arrayList);
            arrayList.forEach(containerJobHolder -> {
                containerJobHolder.reject(this.container);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/dempsy/threading/OrderedPerContainerThreadingModel$MessageDeliveryJobHolder.class */
    /**
     * Wraps a {@link MessageDeliveryJob} together with the counters that track its
     * per-container pieces: how many are still queued and how many are unfinished.
     * For a limited job it also maintains the shared pending-limited counter.
     */
    public static class MessageDeliveryJobHolder {
        private final MessageDeliveryJob job;
        private final boolean limited;
        private final AtomicLong numLimited;
        private final AtomicLong queuedContainerJobsX = new AtomicLong();
        private final AtomicLong unfinishedContainerJobsX = new AtomicLong();

        /**
         * @param messageDeliveryJob the job being wrapped
         * @param z whether the job is limited; if so the shared counter is incremented here
         * @param atomicLong the shared pending-limited counter
         * @param j not used by this constructor
         */
        public MessageDeliveryJobHolder(MessageDeliveryJob messageDeliveryJob, boolean z, AtomicLong atomicLong, long j) {
            job = messageDeliveryJob;
            limited = z;
            numLimited = atomicLong;
            if (limited) {
                numLimited.incrementAndGet();
            }
        }

        /** Releases this job's share of the limited counter (if any) and marks the job rejected. */
        public final void reject() {
            releaseLimitedSlot();
            job.rejected();
        }

        /** @return whatever the wrapped job reports from {@code containersCalculated()} */
        public final boolean areContainersCalculated() {
            return job.containersCalculated();
        }

        /** Delegates to the wrapped job's {@code calculateContainers()}. */
        public final void calculateContainers() {
            job.calculateContainers();
        }

        /** Records one more per-container piece as both queued and unfinished. */
        public final void preEnqueuedTrackContainerJob() {
            queuedContainerJobsX.incrementAndGet();
            unfinishedContainerJobsX.incrementAndGet();
        }

        /**
         * Records that one piece has left its queue. When the queued count reaches zero
         * a limited job releases its share of the limited counter.
         */
        public final void preWorkTrackContainerJob() {
            final long stillQueued = queuedContainerJobsX.decrementAndGet();
            if (stillQueued == 0) {
                releaseLimitedSlot();
            }
        }

        /**
         * Records that one piece has finished. When the unfinished count reaches zero
         * the wrapped job is told that all individuated jobs are complete.
         */
        public final void postWorkTrackContainerJob() {
            final long stillUnfinished = unfinishedContainerJobsX.decrementAndGet();
            if (stillUnfinished != 0) {
                return;
            }
            job.individuatedJobsComplete();
        }

        /** Decrements the shared limited counter, but only for a limited job. */
        private void releaseLimitedSlot() {
            if (limited) {
                numLimited.decrementAndGet();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/dempsy/threading/OrderedPerContainerThreadingModel$Shuttler.class */
    /**
     * Loop that moves jobs from the model's {@code inqueue} to per-container workers.
     * Jobs whose containers are already calculated are dispatched straight away; the
     * rest are parked on {@link #deserQueue} while the calculation runs on the
     * {@code calcContainersWork} pool, and are dispatched from the head of that queue
     * once the head reports its containers as calculated.
     */
    public class Shuttler implements Runnable {
        private final Map<Container, ContainerWorker> containerWorkers = new HashMap<>();
        private final BlockingQueue<MessageDeliveryJobHolder> deserQueue = new LinkedBlockingQueue<>();

        private Shuttler() {
        }

        /**
         * Splits a calculated job into per-container pieces and queues each piece on the
         * worker for its container, creating the worker on first use. The n-th piece is
         * paired with the n-th entry of the job's container data. A job with no container
         * data is rejected as a whole.
         */
        private void handleCalculatedContainerMessage(MessageDeliveryJobHolder messageDeliveryJobHolder) {
            final ContainerJobMetadata[] targets = messageDeliveryJobHolder.job.containerData();
            final boolean hasTargets = targets != null && targets.length > 0;
            if (!hasTargets) {
                OrderedPerContainerThreadingModel.LOGGER.info("Message didn't deserialize correctly.");
                messageDeliveryJobHolder.reject();
                return;
            }

            // Build every holder before queuing any of them: each constructor bumps the parent's
            // counters, so the totals are complete before a worker can start decrementing them.
            final ContainerJobHolder[] pieces = messageDeliveryJobHolder.job.individuate().stream()
                .map(piece -> new ContainerJobHolder(piece, messageDeliveryJobHolder))
                .toArray(ContainerJobHolder[]::new);

            for (int idx = 0; idx < pieces.length; idx++) {
                final ContainerJobHolder piece = pieces[idx];
                final ContainerJobMetadata metadata = targets[idx];
                final Container target = metadata.container;
                final ContainerWorker worker = this.containerWorkers.computeIfAbsent(target, unused -> new ContainerWorker(metadata));
                if (worker.queue.offer(piece)) {
                    continue;
                }
                piece.reject(metadata);
                OrderedPerContainerThreadingModel.LOGGER.warn("Message {} failed to be queued to container {}.", SafeString.objectDescription(messageDeliveryJobHolder), target.getClusterId());
            }
        }

        /**
         * Runs both pump stages on every pass until the model is stopped, backing off
         * (spin, then yield, then 1 ms sleeps) while neither stage finds work.
         */
        @Override // java.lang.Runnable
        public void run() {
            int idlePasses = 0;
            while (!OrderedPerContainerThreadingModel.this.isStopped.get()) {
                // Both stages run on every pass; evaluate them separately so neither is short-circuited.
                final boolean tookIncoming = pumpIncoming();
                final boolean tookCalculated = pumpCalculated();
                if (tookIncoming || tookCalculated) {
                    idlePasses = 0;
                } else {
                    idlePasses++;
                    backOff(idlePasses);
                }
            }
        }

        /**
         * Takes at most one job off the inbound queue and dispatches it, or parks it for
         * container calculation.
         *
         * @return true if a job was taken, even if handling it then failed
         */
        private boolean pumpIncoming() {
            boolean took = false;
            try {
                final MessageDeliveryJobHolder incoming = OrderedPerContainerThreadingModel.this.inqueue.poll();
                if (incoming != null) {
                    took = true;
                    if (incoming.areContainersCalculated()) {
                        handleCalculatedContainerMessage(incoming);
                    } else if (this.deserQueue.offer(incoming)) {
                        OrderedPerContainerThreadingModel.this.calcContainersWork.submit(() -> {
                            incoming.calculateContainers();
                        });
                    } else {
                        incoming.reject();
                    }
                }
            } catch (RuntimeException e) {
                OrderedPerContainerThreadingModel.LOGGER.error("Error while dequeing.", e);
            }
            return took;
        }

        /**
         * Dispatches the head of the calculation queue if its containers are calculated.
         *
         * @return true if the head was ready, even if handling it then failed
         */
        private boolean pumpCalculated() {
            boolean took = false;
            try {
                final MessageDeliveryJobHolder head = this.deserQueue.peek();
                if (head != null && head.areContainersCalculated()) {
                    took = true;
                    handleCalculatedContainerMessage(this.deserQueue.poll());
                }
            } catch (RuntimeException e2) {
                OrderedPerContainerThreadingModel.LOGGER.error("Error while dequeing.", e2);
            }
            return took;
        }

        /** Sleeps 1 ms past the second threshold, yields past the first, otherwise returns immediately. */
        private void backOff(int idlePasses) {
            if (idlePasses > OrderedPerContainerThreadingModel.INTERIM_SPIN_COUNT2) {
                Functional.ignore(() -> {
                    Thread.sleep(1L);
                });
            } else if (idlePasses > OrderedPerContainerThreadingModel.INTERIM_SPIN_COUNT1) {
                Thread.yield();
            }
        }
    }

    /**
     * Builds an unstarted model.
     *
     * @param supplier supplies the base name for the threads this model creates
     * @param i initial maximum number of pending limited tasks
     */
    private OrderedPerContainerThreadingModel(Supplier<String> supplier, int i) {
        // Caller-provided settings.
        this.nameSupplier = supplier;
        this.maxNumWaitingLimitedTasks = i;
        this.deserializationThreadCount = Integer.parseInt(DEFAULT_DESERIALIZATION_THREADS);

        // Fixed infrastructure.
        this.inqueue = new LinkedBlockingQueue<>();
        this.isStopped = new AtomicBoolean();
        this.numLimitedX = new AtomicLong();

        // Populated by start().
        this.calcContainersWork = null;
        this.shuttleThread = null;
        this.started = false;
    }

    /**
     * Builds an unstarted model with the default maximum number of pending limited tasks.
     *
     * @param supplier supplies the base name for the threads this model creates
     */
    private OrderedPerContainerThreadingModel(Supplier<String> supplier) {
        this(supplier, Integer.parseInt(DEFAULT_MAX_PENDING));
    }

    /**
     * Returns a supplier of {@code str + "-" + n}, where {@code n} is drawn once from
     * the class-wide {@code threadNum} sequence at the time of this call, so the
     * supplier yields the same name on every invocation.
     */
    private static Supplier<String> bakedDefaultName(String str) {
        final long instanceNumber = threadNum.getAndIncrement();
        final String suffix = "-" + instanceNumber;
        return () -> str + suffix;
    }

    /**
     * Creates an unstarted model with the default maximum number of pending limited tasks.
     *
     * @param str prefix for the names of the threads this model creates; a per-instance
     *            sequence number is appended to it
     */
    public OrderedPerContainerThreadingModel(final String str) {
        this(bakedDefaultName(str), Integer.parseInt(DEFAULT_MAX_PENDING));
    }

    /**
     * Creates an unstarted model.
     *
     * @param str prefix for the names of the threads this model creates; a per-instance
     *            sequence number is appended to it
     * @param i initial maximum number of pending limited tasks
     */
    public OrderedPerContainerThreadingModel(final String str, final int i) {
        this(bakedDefaultName(str), i);
    }

    /**
     * Sets the number of threads used for the container-calculation pool. The value is
     * read when {@link #start()} creates the pool.
     *
     * @param i the thread count
     * @return this model, for chaining
     */
    public OrderedPerContainerThreadingModel setDeserializationThreadCount(final int i) {
        deserializationThreadCount = i;
        return this;
    }

    /**
     * Qualifies a short configuration key with this class's package name.
     *
     * @param str the unqualified key, e.g. {@link #CONFIG_KEY_MAX_PENDING}
     * @return {@code <package name>.<str>}
     */
    public static String configKey(String str) {
        final String packageName = OrderedPerContainerThreadingModel.class.getPackageName();
        return String.join(".", packageName, str);
    }

    /**
     * Reports whether {@link #start()} has completed on this model.
     *
     * @return the current value of the {@code started} flag
     */
    @Override // net.dempsy.threading.ThreadingModel
    public synchronized boolean isStarted() {
        final boolean current = started;
        return current;
    }

    /**
     * Creates the container-calculation pool and starts the shuttle thread.
     *
     * <p>The pool is created before the shuttle thread is started. The shuttle hands
     * uncalculated jobs to the pool, so starting it first would let an early message
     * hit a {@code null} pool after it had already been parked on the calculation
     * queue, where it would then block everything behind it.
     *
     * <p>The method is synchronized so the write to {@code started} is visible to
     * {@link #isStarted()}, which reads it under the same lock.
     *
     * @return this model, for chaining
     * @throws IllegalArgumentException if the configured deserialization thread count is not positive
     *         (raised by {@link Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)});
     *         in that case no thread has been started
     */
    @Override // net.dempsy.threading.ThreadingModel
    public synchronized OrderedPerContainerThreadingModel start() {
        this.calcContainersWork = Executors.newFixedThreadPool(this.deserializationThreadCount,
            runnable -> new Thread(runnable, this.nameSupplier.get() + "-Deser-" + seq.getAndIncrement()));

        final Thread shuttle = newThread(new Shuttler(), this.nameSupplier.get() + "-Shuttle");
        this.shuttleThread = shuttle;
        this.started = true;
        // Last step: everything the shuttle loop depends on is in place by now.
        shuttle.start();
        return this;
    }

    /**
     * Looks up a setting under its package-qualified key.
     *
     * <p>The key is built with {@link #configKey(String)} so lookup and published key
     * names cannot drift apart. A key that is absent, or present but mapped to
     * {@code null}, yields the default, so callers never receive {@code null} when the
     * default is non-null.
     *
     * @param map the configuration to read
     * @param str the unqualified key
     * @param str2 the value to return when the key has no usable value
     * @return the configured value, or {@code str2}
     */
    private static String getConfigValue(Map<String, String> map, String str, String str2) {
        final String configured = map.get(configKey(str));
        return configured != null ? configured : str2;
    }

    /**
     * Applies settings from a configuration map. Reads the package-qualified forms of
     * {@link #CONFIG_KEY_MAX_PENDING} and {@link #CONFIG_KEY_DESERIALIZATION_THREADS},
     * falling back to their defaults, and parses both as integers.
     *
     * @param map configuration keyed by package-qualified setting names
     * @return this model, for chaining
     * @throws NumberFormatException if a configured value is not a valid integer
     */
    public OrderedPerContainerThreadingModel configure(Map<String, String> map) {
        final String maxPending = getConfigValue(map, CONFIG_KEY_MAX_PENDING, DEFAULT_MAX_PENDING);
        setMaxNumberOfQueuedLimitedTasks(Integer.parseInt(maxPending));

        final String deserThreads = getConfigValue(map, CONFIG_KEY_DESERIALIZATION_THREADS, DEFAULT_DESERIALIZATION_THREADS);
        setDeserializationThreadCount(Integer.parseInt(deserThreads));

        return this;
    }

    /**
     * Returns the configured maximum number of pending limited tasks.
     *
     * <p>The setting is stored as a {@code long}. Values outside the {@code int} range
     * are clamped to {@link Integer#MIN_VALUE} or {@link Integer#MAX_VALUE} rather than
     * narrowed, because a plain cast would silently wrap a large limit into an
     * unrelated, possibly negative, number.
     *
     * @return the limit, clamped to the {@code int} range
     */
    public int getMaxNumberOfQueuedLimitedTasks() {
        final long limit = this.maxNumWaitingLimitedTasks;
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, limit));
    }

    /**
     * Sets the maximum number of pending limited tasks.
     *
     * @param j the new limit
     * @return this model, for chaining
     */
    public OrderedPerContainerThreadingModel setMaxNumberOfQueuedLimitedTasks(final long j) {
        maxNumWaitingLimitedTasks = j;
        return this;
    }

    /**
     * Stops the model: raises the stop flag polled by the shuttle and container-worker
     * loops, waits up to ten seconds for the shuttle thread to exit, then shuts down
     * the container-calculation pool.
     *
     * <p>Safe to call on a model that was never started; the thread and pool are only
     * touched if {@link #start()} created them.
     */
    @Override // net.dempsy.threading.ThreadingModel, java.lang.AutoCloseable
    public void close() {
        this.isStopped.set(true);

        final Thread shuttle = this.shuttleThread;
        if (shuttle != null) {
            Functional.ignore(() -> {
                shuttle.join(10000L);
            });
            if (shuttle.isAlive()) {
                LOGGER.warn("Couldn't stop the dequeing thread.");
            }
        }

        // Shut the pool down after the shuttle has had its chance to exit, so the shuttle does
        // not race against a pool that is already refusing work. Without this the pool's
        // threads would outlive the model. Once the shuttle is stopped nothing consumes
        // calculation results, so outstanding work is abandoned rather than awaited.
        final ExecutorService deserializers = this.calcContainersWork;
        if (deserializers != null) {
            deserializers.shutdownNow();
        }
    }

    /**
     * Returns the current value of the shared pending-limited counter, narrowed to
     * {@code int}.
     */
    @Override // net.dempsy.threading.ThreadingModel
    public int getNumberLimitedPending() {
        final long pending = numLimitedX.get();
        return (int) pending;
    }

    /**
     * Queues a job that does not count against the pending-limited counter.
     *
     * @param messageDeliveryJob the job to queue; it is rejected if it cannot be queued
     */
    @Override // net.dempsy.threading.ThreadingModel
    public void submit(MessageDeliveryJob messageDeliveryJob) {
        enqueue(messageDeliveryJob, false);
    }

    /**
     * Queues a job that counts against the pending-limited counter until all of its
     * per-container pieces have left their queues, or until it is rejected.
     *
     * @param messageDeliveryJob the job to queue; it is rejected if it cannot be queued
     */
    @Override // net.dempsy.threading.ThreadingModel
    public void submitLimited(MessageDeliveryJob messageDeliveryJob) {
        enqueue(messageDeliveryJob, true);
    }

    /**
     * Shared body of {@link #submit} and {@link #submitLimited}: wraps the job in a
     * holder and offers it to the inbound queue. If the offer fails the job is rejected
     * and the failure is logged with the job's destination cluster ids.
     */
    private void enqueue(MessageDeliveryJob messageDeliveryJob, boolean limited) {
        // NOTE(review): the configured maximum is handed to the holder, but nothing visible in this
        // file compares the pending count against it - confirm where the limit is enforced.
        final MessageDeliveryJobHolder holder =
            new MessageDeliveryJobHolder(messageDeliveryJob, limited, this.numLimitedX, this.maxNumWaitingLimitedTasks);
        if (this.inqueue.offer(holder)) {
            return;
        }
        holder.reject();
        LOGGER.error("Failed to queue message destined for {}", describeDestinations(messageDeliveryJob));
    }

    /**
     * Lists the cluster ids of the job's non-null target containers, or a single
     * {@code "null"} entry when the job has no container data.
     */
    private static List<String> describeDestinations(MessageDeliveryJob messageDeliveryJob) {
        final ContainerJobMetadata[] targets = messageDeliveryJob.containerData();
        if (targets == null) {
            return List.of("null");
        }
        return Arrays.stream(targets)
            .map(metadata -> metadata.container)
            .filter(container -> container != null)
            .map(Container::getClusterId)
            .map(Object::toString)
            .collect(Collectors.toList());
    }
}
