package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.query.MorphiumIterator;
import de.caluga.morphium.query.Query;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/caluga/morphium/messaging/Messaging.class */
public class Messaging extends Thread implements ShutdownListener {
    /** Logger shared by all instances of this class. */
    private static Logger log = LoggerFactory.getLogger(Messaging.class);
    /** Morphium instance used for every database operation of this messaging instance. */
    private Morphium morphium;
    /** Loop flag of the polling thread; also the value returned from the change stream listener callback. */
    private boolean running;
    /** Sleep time in milliseconds between polling rounds and between retries. */
    private int pause;
    /** Sender id of this instance; a random UUID unless replaced via {@link #setSenderId(String)}. */
    private String id;
    /** When true, a default "received" answer is sent if a listener returns no answer. */
    private boolean autoAnswer;
    /** Host name written into outgoing messages as sender host. */
    private String hostname;
    /** Whether a polling round may fetch and lock more than one message. */
    private boolean processMultiple;
    /** Listeners that receive every message regardless of its name. */
    private List<MessageListener> listeners;
    /** Message name -> timestamp (ms) at which processing of that name was paused. */
    private Map<String, Long> pauseMessages;
    /** Message name -> listeners registered for exactly that name. */
    private Map<String, List<MessageListener>> listenerByName;
    /** Optional queue name; determines the collection name, see {@link #getCollectionName()}. */
    private String queueName;
    /** Worker pool for message processing; only created when multithreadded is true. */
    private ThreadPoolExecutor threadPool;
    /** Single-threaded scheduler that removes message ids from {@link #processing} after the message TTL. */
    private ScheduledThreadPoolExecutor decouplePool;
    /** Whether work is handed to {@link #threadPool} instead of running on the calling thread. */
    private boolean multithreadded;
    /** Iterator window size for fetched messages; also the upper bound of active pool threads before submitters wait. */
    private int windowSize;
    /** Whether messages are received through a change stream instead of polling. */
    private boolean useChangeStream;
    /** Change stream monitor, created in {@link #run()} when change streams are used. */
    private ChangeStreamMonitor changeStreamMonitor;
    /** Id of the original message -> answer that arrived for it (filled in processMessages). */
    private Map<MorphiumId, Msg> waitingForAnswers;
    /** Looked up by an incoming message's inAnswerTo id; presumably filled by send-and-wait code outside this view. */
    private Map<MorphiumId, Msg> waitingForMessages;
    /** Ids of messages currently handed to processing (kept until the decouple pool removes them). */
    private List<MorphiumId> processing;

    /**
     * Creates a messaging instance without a queue name.
     *
     * @param morphium the Morphium instance to use
     * @param i        pause in milliseconds between polling rounds
     * @param z        whether several messages may be processed per round
     */
    public Messaging(Morphium morphium, int i, boolean z) {
        this(morphium, null, i, z);
    }

    /**
     * Creates a messaging instance with defaults: no queue name, 500 ms pause,
     * single message per round, single-threaded, window size 100.
     *
     * @param morphium the Morphium instance to use
     */
    public Messaging(Morphium morphium) {
        this(morphium, null, 500, false, false, 100);
    }

    /**
     * Creates a messaging instance without a queue name.
     *
     * @param morphium the Morphium instance to use
     * @param i        pause in milliseconds between polling rounds
     * @param z        whether several messages may be processed per round
     * @param z2       whether messages are processed on a thread pool
     * @param i2       window size
     */
    public Messaging(Morphium morphium, int i, boolean z, boolean z2, int i2) {
        this(morphium, null, i, z, z2, i2);
    }

    /**
     * Creates a single-threaded messaging instance with a window size of 1000.
     *
     * @param morphium the Morphium instance to use
     * @param str      queue name, may be null
     * @param i        pause in milliseconds between polling rounds
     * @param z        whether several messages may be processed per round
     */
    public Messaging(Morphium morphium, String str, int i, boolean z) {
        this(morphium, str, i, z, false, 1000);
    }

    /**
     * Creates a messaging instance that uses change streams exactly when
     * {@code morphium.isReplicaSet()} returns true.
     *
     * @param morphium the Morphium instance to use
     * @param str      queue name, may be null
     * @param i        pause in milliseconds between polling rounds
     * @param z        whether several messages may be processed per round
     * @param z2       whether messages are processed on a thread pool
     * @param i2       window size
     */
    public Messaging(Morphium morphium, String str, int i, boolean z, boolean z2, int i2) {
        this(morphium, str, i, z, z2, i2, morphium.isReplicaSet());
    }

    /**
     * Creates a fully configured messaging instance. The thread is not started here;
     * call {@link #start()} to begin receiving messages.
     *
     * <p>Registers this instance as shutdown listener with Morphium and ensures the
     * indices for {@link Msg} on the target collection.
     *
     * @param morphium the Morphium instance to use
     * @param str      queue name, may be null or empty (see {@link #getCollectionName()})
     * @param i        pause in milliseconds between polling rounds and retries
     * @param z        whether several messages may be processed per round
     * @param z2       whether messages are processed on a thread pool
     * @param i2       window size
     * @param z3       whether a change stream is used instead of polling
     */
    public Messaging(Morphium morphium, String str, int i, boolean z, boolean z2, int i2, boolean z3) {
        // Initialise all state before "this" is handed to anything else (shutdown listener below).
        this.autoAnswer = false;
        this.pauseMessages = new ConcurrentHashMap<>();
        this.waitingForAnswers = new ConcurrentHashMap<>();
        this.waitingForMessages = new ConcurrentHashMap<>();
        this.processing = new Vector<>();
        this.listeners = new CopyOnWriteArrayList<>();
        this.listenerByName = new HashMap<>();
        this.multithreadded = z2;
        this.windowSize = i2;
        this.morphium = morphium;
        this.useChangeStream = z3;
        this.queueName = str;
        this.running = true;
        this.pause = i;
        this.processMultiple = z;
        this.id = UUID.randomUUID().toString();

        if (z2) {
            this.threadPool = new ThreadPoolExecutor(this.morphium.getConfig().getThreadPoolMessagingCoreSize(), this.morphium.getConfig().getThreadPoolMessagingMaxSize(), this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() {
                @Override
                public boolean offer(Runnable task) {
                    // Only queue the task when the pool cannot grow any more or has idle threads;
                    // returning false makes the executor try to start an additional thread.
                    int poolSize = Messaging.this.threadPool.getPoolSize();
                    if (poolSize >= Messaging.this.threadPool.getMaximumPoolSize() || poolSize > Messaging.this.threadPool.getActiveCount()) {
                        return super.offer(task);
                    }
                    return false;
                }
            });
            // A rejected task is put into the queue (blocking) instead of being dropped.
            this.threadPool.setRejectedExecutionHandler((task, executor) -> {
                try {
                    executor.getQueue().put(task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            this.threadPool.setThreadFactory(new ThreadFactory() {
                private final AtomicInteger num = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable task) {
                    Thread thread = new Thread(task, "messaging " + this.num.getAndIncrement());
                    thread.setDaemon(true);
                    return thread;
                }
            });
        }
        this.decouplePool = new ScheduledThreadPoolExecutor(1);
        this.decouplePool.setThreadFactory(new ThreadFactory() {
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable task) {
                Thread thread = new Thread(task, "decouple_thr_" + this.num.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        });
        this.morphium.addShutdownListener(this);

        // Host name: environment variable first, then the local host lookup, then a fixed fallback.
        String host = System.getenv("HOSTNAME");
        if (host == null) {
            try {
                host = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException ignored) {
                // no resolvable local host name - the fallback below is used
            }
        }
        this.hostname = host != null ? host : "unknown host";

        morphium.ensureIndicesFor(Msg.class, getCollectionName());
    }

    /**
     * Counts all messages currently stored in this instance's collection.
     *
     * @return the number of documents in the message collection
     */
    public long getMessageCount() {
        Query<?> allMessages = this.morphium.createQueryFor(Msg.class);
        allMessages.setCollectionName(getCollectionName());
        return allMessages.countAll();
    }

    /**
     * Deletes the given message from this instance's message collection.
     *
     * @param msg the message to delete
     */
    public void removeMessage(Msg msg) {
        this.morphium.delete(msg, getCollectionName());
    }

    /**
     * Runs the given query against this instance's message collection.
     *
     * <p>The query is cloned first so the caller's instance keeps its own collection
     * name. If cloning is not supported, the caller's query is used (and its collection
     * name changed) and a warning is logged.
     *
     * @param query the query to run; must not be null
     * @return the matching messages
     */
    public List<Msg> findMessages(Query<Msg> query) {
        try {
            query = query.clone();
        } catch (CloneNotSupportedException e) {
            log.warn("Could not clone query - collection name is set on the passed query instance", e);
        }
        query.setCollectionName(getCollectionName());
        return query.asList();
    }

    /**
     * Thread body of the messaging instance.
     *
     * <p>With change streams: processes messages already pending in the collection,
     * then registers a listener on a {@link ChangeStreamMonitor}, starts the monitor
     * and returns. Insert and update events are dispatched to the private handlers
     * below. Any exception raised while handling an event is logged, and the listener
     * always reports the current {@code running} flag back to the monitor.
     *
     * <p>Without change streams: polls for messages until {@code running} becomes
     * false, sleeping {@code pause} milliseconds after every round (also after a
     * failed one), and finally clears all registered listeners.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        setName("Msg " + this.id);
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " started");
        }
        if (this.useChangeStream) {
            log.debug("Before running the changestream monitor - check of already existing messages");
            try {
                findAndProcessPendingMessages(null);
                if (this.multithreadded) {
                    // wait until the pool has worked off the pending messages
                    while (this.threadPool.getActiveCount() > 0) {
                        Thread.yield();
                    }
                }
            } catch (Exception e) {
                log.error("Error processing existing messages in queue", e);
            }
            log.debug("init Messaging  using changestream monitor");
            this.changeStreamMonitor = new ChangeStreamMonitor(this.morphium, getCollectionName(), true);
            this.changeStreamMonitor.addListener(changeStreamEvent -> {
                try {
                    if (changeStreamEvent == null || changeStreamEvent.getOperationType() == null) {
                        return this.running;
                    }
                    if (changeStreamEvent.getOperationType().equals("insert")) {
                        handleInsertedMessage((Msg) this.morphium.getMapper().deserialize(Msg.class, changeStreamEvent.getFullDocument()));
                    } else if (changeStreamEvent.getOperationType().equals("update") && changeStreamEvent.getFullDocument() != null && changeStreamEvent.getFullDocument().get("_id") != null) {
                        handleUpdatedMessage((Msg) this.morphium.findById(Msg.class, new MorphiumId(changeStreamEvent.getFullDocument().get("_id").toString()), getCollectionName()));
                    }
                } catch (Exception ex) {
                    log.error("Error during event processing in changestream", ex);
                }
                return this.running;
            });
            this.changeStreamMonitor.start();
            return;
        }
        while (this.running) {
            try {
                findAndProcessMessages(this.processMultiple);
            } catch (Throwable th) {
                log.error("Unhandled exception " + th.getMessage(), th);
            } finally {
                try {
                    sleep(this.pause);
                } catch (InterruptedException ignored) {
                    // woken up early (e.g. by terminate()); the loop condition decides whether to go on
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " stopped!");
        }
        if (!this.running) {
            this.listeners.clear();
            this.listenerByName.clear();
        }
    }

    /** Handles a message reported by an "insert" change stream event. */
    private void handleInsertedMessage(Msg msg) {
        if (msg.getInAnswerTo() != null && this.waitingForMessages.containsKey(msg.getInAnswerTo())) {
            // an answer somebody in this instance is waiting for
            if (log.isDebugEnabled()) {
                log.debug("processing answer " + msg.getMsgId() + " in answer to " + msg.getInAnswerTo());
            }
            processSingleMessage(msg);
            return;
        }
        if (this.listenerByName.get(msg.getName()) == null && this.listeners.size() == 0) {
            return; // nobody is listening for this message
        }
        boolean sentByMe = msg.getSender().equals(this.id);
        boolean alreadyProcessedByMe = msg.getProcessedBy() != null && msg.getProcessedBy().contains(this.id);
        boolean addressedToMeOrAll = msg.getRecipient() == null || msg.getRecipient().equals(this.id);
        if (sentByMe || alreadyProcessedByMe || !addressedToMeOrAll) {
            return;
        }
        if (this.pauseMessages.containsKey(msg.getName())) {
            if (log.isDebugEnabled()) {
                log.debug("Not processing message - processing paused for " + msg.getName());
            }
            return;
        }
        // The recipient / processedBy conditions of the exclusive case are already
        // guaranteed by the early return above, so only the lock state remains to check.
        if (msg.isExclusive() && msg.getLockedBy() == null) {
            lockAndProcess(msg);
        } else if (msg.isExclusive() && (msg.getRecipient() == null || !msg.getRecipient().equals(this.id))) {
            log.debug("Message is not for me");
        } else if (!this.processing.contains(msg.getMsgId())) {
            processSingleMessage(msg);
        }
    }

    /** Handles a message reported by an "update" change stream event; {@code msg} is null if it no longer exists. */
    private void handleUpdatedMessage(Msg msg) {
        if (msg == null) {
            return;
        }
        boolean awaitedAnswer = msg.getInAnswerTo() != null && this.waitingForMessages.containsKey(msg.getInAnswerTo());
        if (awaitedAnswer) {
            if (msg.isExclusive()) {
                lockAndProcess(msg);
            } else {
                processSingleMessage(msg);
            }
        }
        if (this.listenerByName.get(msg.getName()) == null && this.listeners.size() == 0 && !awaitedAnswer) {
            return;
        }
        if (msg.isExclusive() && msg.getLockedBy() == null && !this.pauseMessages.containsKey(msg.getName()) && (msg.getRecipient() == null || msg.getRecipient().equals(this.id))) {
            log.debug("Update of msg - trying to lock");
            lockAndProcess(msg);
        }
    }

    /** Passes one message to processMessages and logs (instead of propagating) any failure. */
    private void processSingleMessage(Msg msg) {
        List<Msg> single = new ArrayList<>();
        single.add(msg);
        try {
            processMessages(single);
        } catch (Exception e) {
            log.error("Error during message processing ", e);
        }
    }

    /**
     * Pauses processing of messages with the given name. The time of the first
     * pause request is remembered; repeated calls do not overwrite it.
     *
     * @param str the message name to pause
     */
    public void pauseProcessingOfMessagesNamed(String str) {
        Long pausedAt = System.currentTimeMillis();
        this.pauseMessages.putIfAbsent(str, pausedAt);
    }

    /**
     * Processes the messages with the given name that are pending, then lifts the pause.
     *
     * @param str the message name to unpause
     * @return the pause duration in milliseconds, or null if the name was not paused
     */
    public Long unpauseProcessingOfMessagesNamed(String str) {
        processMessages(findMessages(str, this.processMultiple));
        Long pausedSince = this.pauseMessages.remove(str);
        if (pausedSince == null) {
            return null;
        }
        return System.currentTimeMillis() - pausedSince;
    }

    /**
     * Fetches and processes pending messages round by round until a fetch returns
     * nothing, sleeping {@code pause} milliseconds after each round. The work is
     * submitted through queueOrRun, so it runs on the thread pool when multithreading
     * is enabled and on the calling thread otherwise.
     *
     * @param str message name to restrict the search to, or null for all names
     */
    public void findAndProcessPendingMessages(String str) {
        queueOrRun(() -> {
            MorphiumIterator<Msg> pending = findMessages(str, this.processMultiple);
            while (pending.hasNext()) {
                processMessages(pending);
                try {
                    Thread.sleep(this.pause);
                } catch (InterruptedException e) {
                }
                pending = findMessages(str, this.processMultiple);
            }
        });
    }

    /**
     * Marks pending messages as locked by this instance and returns an iterator over
     * the messages this instance should process.
     *
     * @param str message name to restrict the search to, or null for all names
     *            (in which case paused names are excluded)
     * @param z   when false, both the lock update and the result are limited to one message
     * @return iterator over the messages to process
     */
    private MorphiumIterator<Msg> findMessages(String str, boolean z) {
        HashMap hashMap = new HashMap();
        Query createQueryFor = this.morphium.createQueryFor(Msg.class);
        createQueryFor.setCollectionName(getCollectionName());
        // eq: not sent by us, not locked, not yet processed by us, no recipient set
        Query eq = createQueryFor.q().f(Msg.Fields.sender).ne(this.id).f(Msg.Fields.lockedBy).eq(null).f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(null);
        // eq2: not sent by us, not yet processed by us, addressed to us (lock state is not checked here)
        Query eq2 = createQueryFor.q().f(Msg.Fields.sender).ne(this.id).f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(this.id);
        Set<String> keySet = this.pauseMessages.keySet();
        if (str != null) {
            eq.f(Msg.Fields.name).eq(str);
            eq2.f(Msg.Fields.name).eq(str);
        } else {
            if (!this.pauseMessages.isEmpty()) {
                eq.f(Msg.Fields.name).nin(keySet);
                eq2.f(Msg.Fields.name).nin(keySet);
            }
            if (this.listeners.isEmpty() && !this.listenerByName.isEmpty()) {
                // only name-specific listeners: restrict to the names somebody listens for
                eq.f(Msg.Fields.name).in(this.listenerByName.keySet());
                eq2.f(Msg.Fields.name).in(this.listenerByName.keySet());
            } else if (this.listenerByName.isEmpty() && this.listeners.isEmpty()) {
                // no listeners at all: only return answers we are waiting for; nothing is locked
                return createQueryFor.q().f(Msg.Fields.inAnswerTo).in(this.waitingForMessages.keySet()).asIterable();
            }
        }
        // snapshot of the ids currently being processed, excluded from both queries
        ArrayList arrayList = new ArrayList(this.processing);
        if (!this.processing.isEmpty()) {
            createQueryFor.f("_id").nin(arrayList);
        }
        createQueryFor.or(eq, eq2);
        createQueryFor.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        if (!z) {
            createQueryFor.limit(1);
        }
        // lock step: write our id and the current time into the matching messages
        // NOTE(review): "locked_by" is presumably the stored name of Msg.Fields.lockedBy - confirm against Msg
        hashMap.put("locked_by", this.id);
        hashMap.put("locked", Long.valueOf(System.currentTimeMillis()));
        // NOTE(review): the two boolean arguments are presumably (upsert, multiple) - confirm against Morphium.set
        this.morphium.set(createQueryFor.q().f("_id").in(createQueryFor.idList()), (Map<String, Object>) hashMap, false, z);
        // result query: messages locked by us, plus messages locked by "ALL" that we have not processed yet
        Query q = createQueryFor.q();
        if (str != null) {
            q.f(Msg.Fields.name).eq(str);
        } else if (!this.pauseMessages.isEmpty()) {
            q.f(Msg.Fields.name).nin(keySet);
        }
        q.f("_id").nin(arrayList);
        // NOTE(review): the same name filter is added a second time here - looks like a duplicate of the block above
        if (str != null) {
            q.f(Msg.Fields.name).eq(str);
        } else if (!this.pauseMessages.isEmpty()) {
            q.f(Msg.Fields.name).nin(keySet);
        }
        q.or(q.q().f(Msg.Fields.lockedBy).eq(this.id), q.q().f(Msg.Fields.lockedBy).eq("ALL").f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(this.id), q.q().f(Msg.Fields.lockedBy).eq("ALL").f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(null));
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        if (!z) {
            q.limit(1);
        }
        MorphiumIterator<Msg> asIterable = q.asIterable(this.windowSize);
        asIterable.setMultithreaddedAccess(this.multithreadded);
        return asIterable;
    }

    /**
     * Runs one polling round: fetches pending messages of any name and processes them.
     *
     * @param z whether more than one message may be fetched
     */
    private void findAndProcessMessages(boolean z) {
        processMessages(findMessages(null, z));
    }

    /**
     * Tries to lock the given message for this instance and, if the lock was
     * obtained, processes it. The lock attempt only matches a message that is
     * unlocked and not yet processed by this instance; success is verified by
     * re-reading the message after a short wait.
     *
     * @param msg the message to lock and process
     */
    private void lockAndProcess(Msg msg) {
        Query<?> lockQuery = this.morphium.createQueryFor(Msg.class);
        lockQuery.setCollectionName(getCollectionName());
        lockQuery.f("_id").eq(msg.getMsgId());
        lockQuery.f(Msg.Fields.processedBy).ne(this.id);
        lockQuery.f(Msg.Fields.lockedBy).eq(null);

        Map<String, Object> lockValues = new HashMap<>();
        lockValues.put("locked_by", this.id);
        lockValues.put("locked", Long.valueOf(System.currentTimeMillis()));
        this.morphium.set(lockQuery, lockValues, false, this.processMultiple);

        try {
            Thread.sleep(10L);
        } catch (InterruptedException e) {
        }

        Msg current = (Msg) this.morphium.reread(msg, getCollectionName());
        boolean lockedByMe = current != null && current.getLockedBy() != null && current.getLockedBy().equals(this.id);
        if (!lockedByMe) {
            return;
        }

        List<Msg> lockedMessages = new ArrayList<>();
        lockedMessages.add(current);
        if (log.isDebugEnabled()) {
            log.debug("locked messages: " + lockedMessages.size());
        }
        try {
            processMessages(lockedMessages);
        } catch (Exception e) {
            log.error("Error during message processing ", e);
        }
    }

    /**
     * Processes the given messages.
     *
     * <p>Each message is re-read from the database first; messages that no longer
     * exist are skipped. An answer somebody is waiting for is stored in
     * {@code waitingForAnswers}, deleted from the collection, and processing of the
     * remaining messages stops. If no listener is registered at all, the message is
     * marked as processed and processing stops as well. Every other message that is
     * not already in progress is dispatched to the listeners through queueOrRun.
     *
     * <p>Afterwards (unless processing stopped early) the method waits until
     * Morphium's write buffer is empty.
     *
     * @param iterable the messages to process; null elements are skipped
     */
    private void processMessages(Iterable<Msg> iterable) {
        for (Msg queued : iterable) {
            if (queued == null) {
                continue;
            }
            // work on the current database state of the message
            final Msg current = (Msg) this.morphium.reread(queued, getCollectionName());
            if (current == null) {
                continue;
            }
            if (current.getInAnswerTo() != null && this.waitingForMessages.get(current.getInAnswerTo()) != null) {
                if (log.isDebugEnabled()) {
                    log.debug(getSenderId() + ": Got a message, we are waiting for...");
                }
                this.waitingForAnswers.put((MorphiumId) current.getInAnswerTo(), current);
                this.processing.remove(queued.getMsgId());
                this.morphium.delete(current, getCollectionName());
                return;
            }
            if (this.listeners.isEmpty() && this.listenerByName.isEmpty()) {
                updateProcessedByAndReleaseLock(current);
                log.error(getSenderId() + ": should not be here. not processing message, as no listeners are defined " + current.getMsgId());
                return;
            }
            if (!this.processing.contains(queued.getMsgId())) {
                this.processing.add(queued.getMsgId());
                queueOrRun(() -> dispatchToListeners(queued, current));
            }
        }
        while (this.morphium.getWriteBufferCount() > 0) {
            Thread.yield();
        }
    }

    /**
     * Delivers one message to all matching listeners, sends answers, handles
     * rejections and finally deletes the message or marks it as processed.
     *
     * @param queued  the message as it was fetched (its id identifies the entry in {@code processing})
     * @param current the re-read, current state of the same message
     */
    private void dispatchToListeners(Msg queued, Msg current) {
        boolean queuedAlreadyProcessed = queued.getProcessedBy() != null && queued.getProcessedBy().contains(this.id);
        boolean currentAlreadyProcessed = current.getProcessedBy() != null && current.getProcessedBy().contains(this.id);
        if (queuedAlreadyProcessed || currentAlreadyProcessed) {
            this.processing.remove(queued.getMsgId());
            return;
        }
        // Only handle messages locked by us or by "ALL"; a missing lock counts as "not ours".
        String lockedBy = current.getLockedBy();
        if (lockedBy == null || (!lockedBy.equals(this.id) && !lockedBy.equals("ALL"))) {
            this.processing.remove(queued.getMsgId());
            return;
        }
        if (current.getTtl() < System.currentTimeMillis() - current.getTimestamp()) {
            if (log.isDebugEnabled()) {
                log.debug(getSenderId() + ": Found outdated message - deleting it!");
            }
            this.morphium.delete(current, getCollectionName());
            this.processing.remove(queued.getMsgId());
            return;
        }

        boolean processed = false;
        boolean rejected = false;
        List<MessageRejectedException> rejections = new ArrayList<>();
        List<MessageListener> receivers = new ArrayList<>(this.listeners);
        List<MessageListener> namedListeners = this.listenerByName.get(current.getName());
        if (namedListeners != null) {
            receivers.addAll(namedListeners);
        }
        if (receivers.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug(getSenderId() + ": Message did not have a listener registered");
            }
            processed = true;
        }
        for (MessageListener listener : receivers) {
            try {
                Msg answer = listener.onMessage(this, current);
                processed = true;
                if (this.autoAnswer && answer == null) {
                    answer = new Msg(current.getName(), "received", "");
                }
                if (answer != null) {
                    current.sendAnswer(this, answer);
                    if (log.isDebugEnabled()) {
                        log.debug("sent answer to " + answer.getInAnswerTo() + " recipient: " + answer.getRecipient() + " id: " + answer.getMsgId());
                    }
                    if (answer.getRecipient() == null) {
                        log.warn("Recipient of answer is null?!?!");
                    }
                }
            } catch (MessageRejectedException e) {
                log.warn("Message was rejected by listener", e);
                rejected = true;
                rejections.add(e);
            } catch (Exception e) {
                log.error("listener Processing failed", e);
            }
        }
        for (MessageRejectedException rejection : rejections) {
            if (rejection.isSendAnswer()) {
                current.sendAnswer(this, new Msg(current.getName(), "message rejected by listener", rejection.getMessage()));
            }
            if (rejection.isContinueProcessing()) {
                updateProcessedByAndReleaseLock(current);
                this.processing.remove(queued.getMsgId());
                log.info("Message will be re-processed by others");
            }
        }
        if (!processed && !rejected) {
            log.error("message was not processed");
        } else if (rejected) {
            log.debug("Message rejected");
        }

        boolean lockedForAll = current.getLockedBy() != null && current.getLockedBy().equals("ALL");
        boolean answerAddressedToMe = current.getRecipient() != null && current.getRecipient().equals(this.id) && current.getInAnswerTo() != null;
        if (!lockedForAll && !answerAddressedToMe) {
            this.morphium.delete(current, getCollectionName());
        } else {
            updateProcessedByAndReleaseLock(current);
        }
        scheduleProcessingRelease(queued);
    }

    /**
     * Schedules removal of the message id from {@code processing} after the message's
     * TTL. Retries after {@code pause} milliseconds while the scheduler rejects the
     * task; gives up without scheduling once the scheduler is shut down.
     */
    private void scheduleProcessingRelease(Msg queued) {
        Runnable release = () -> this.processing.remove(queued.getMsgId());
        while (!this.decouplePool.isTerminated() && !this.decouplePool.isTerminating() && !this.decouplePool.isShutdown()) {
            try {
                this.decouplePool.schedule(release, queued.getTtl(), TimeUnit.MILLISECONDS);
                return; // scheduled exactly once
            } catch (RejectedExecutionException e) {
                try {
                    Thread.sleep(this.pause);
                } catch (InterruptedException ignored) {
                    // retry right away; the loop ends as soon as the pool reports shutdown
                }
            }
        }
    }

    /**
     * Adds this instance's id to the message's processedBy list and, if the message
     * is currently locked by this instance, clears the lock first. A message without
     * any lock is only marked as processed.
     *
     * @param msg the message to update
     */
    private void updateProcessedByAndReleaseLock(Msg msg) {
        Query<?> createQueryFor = this.morphium.createQueryFor(Msg.class);
        createQueryFor.setCollectionName(getCollectionName());
        createQueryFor.f(Msg.Fields.msgId).eq(msg.getMsgId());
        // getLockedBy() may be null (e.g. answers are never locked) - treat that as "not locked by us"
        if (msg.getLockedBy() != null && msg.getLockedBy().equals(this.id)) {
            this.morphium.set((Query) createQueryFor, (Enum) Msg.Fields.lockedBy, (Object) null);
        }
        this.morphium.push(createQueryFor, Msg.Fields.processedBy, this.id);
    }

    /**
     * Executes the task directly when multithreading is off. Otherwise submits it to
     * the thread pool, retrying until the submission succeeds, and then waits while
     * more than {@code windowSize} pool threads are active.
     *
     * @param runnable the task to execute
     */
    private void queueOrRun(Runnable runnable) {
        if (this.multithreadded) {
            while (true) {
                try {
                    this.threadPool.execute(runnable);
                    break;
                } catch (Throwable th) {
                }
            }
            while (this.threadPool.getActiveCount() > this.windowSize) {
                Thread.yield();
            }
        } else {
            runnable.run();
        }
    }

    /**
     * Returns the name of the collection holding the messages of this instance.
     *
     * @return "msg" when no queue name is set, otherwise "mmsg_" followed by the queue name
     */
    public String getCollectionName() {
        if (this.queueName == null || this.queueName.isEmpty()) {
            return "msg";
        }
        return "mmsg_" + this.queueName;
    }

    /**
     * Registers a listener for messages with the given name.
     *
     * <p>The registry is replaced by a modified copy, and the listener list of the
     * name is copied as well, so a map or list that another thread obtained earlier
     * is never modified while it is being read.
     *
     * @param str             the message name
     * @param messageListener the listener to add
     */
    public void addListenerForMessageNamed(String str, MessageListener messageListener) {
        HashMap<String, List<MessageListener>> copy = new HashMap<>(this.listenerByName);
        List<MessageListener> updated = new ArrayList<>();
        List<MessageListener> existing = copy.get(str);
        if (existing != null) {
            updated.addAll(existing);
        }
        updated.add(messageListener);
        copy.put(str, updated);
        this.listenerByName = copy;
    }

    /**
     * Removes a listener registered for messages with the given name.
     *
     * <p>The registry is replaced by a modified copy and the listener list is copied
     * before the removal. When the last listener of a name is removed, the name is
     * dropped from the registry, so messages of that name are no longer treated as
     * having a listener.
     *
     * @param str             the message name
     * @param messageListener the listener to remove
     */
    public void removeListenerForMessageNamed(String str, MessageListener messageListener) {
        List<MessageListener> existing = this.listenerByName.get(str);
        if (existing == null) {
            return;
        }
        HashMap<String, List<MessageListener>> copy = new HashMap<>(this.listenerByName);
        List<MessageListener> remaining = new ArrayList<>(existing);
        remaining.remove(messageListener);
        if (remaining.isEmpty()) {
            copy.remove(str);
        } else {
            copy.put(str, remaining);
        }
        this.listenerByName = copy;
    }

    /**
     * @return the id this instance uses as sender, lock owner and recipient address
     */
    public String getSenderId() {
        return this.id;
    }

    /**
     * Replaces the id of this instance.
     *
     * @param str the new sender id
     * @return this instance, for chaining
     */
    public Messaging setSenderId(String str) {
        this.id = str;
        return this;
    }

    /**
     * @return the pause in milliseconds between polling rounds and retries
     */
    public int getPause() {
        return this.pause;
    }

    /**
     * Sets the pause between polling rounds and retries.
     *
     * @param i the pause in milliseconds
     * @return this instance, for chaining
     */
    public Messaging setPause(int i) {
        this.pause = i;
        return this;
    }

    /**
     * Reports whether this instance is receiving messages.
     *
     * @return with change streams: whether the monitor exists and is running;
     *         otherwise the state of the polling loop flag
     */
    public boolean isRunning() {
        if (!this.useChangeStream) {
            return this.running;
        }
        return this.changeStreamMonitor != null && this.changeStreamMonitor.isRunning();
    }

    /**
     * Sets the running flag. Passing false also stops the change stream monitor
     * if one exists.
     *
     * @param z the new value of the running flag
     */
    @Deprecated
    public void setRunning(boolean z) {
        boolean stopRequested = !z;
        if (stopRequested && this.changeStreamMonitor != null) {
            this.changeStreamMonitor.stop();
        }
        this.running = z;
    }

    /**
     * Shuts this messaging instance down: clears the running flag, shuts down both
     * executors, stops the change stream monitor, stores a final "going down"
     * message addressed to this instance and interrupts the messaging thread.
     *
     * <p>As a last resort the thread is stopped forcibly if it is still alive. On
     * runtimes where {@code Thread.stop()} is not supported, that step is skipped
     * and the thread ends on its own once it observes the cleared running flag.
     */
    public void terminate() {
        this.running = false;
        if (this.decouplePool != null) {
            int size = this.decouplePool.shutdownNow().size();
            if (log.isDebugEnabled()) {
                log.debug("Shutting down with " + size + " runnables still scheduled");
            }
        }
        if (this.threadPool != null) {
            int size2 = this.threadPool.shutdownNow().size();
            if (log.isDebugEnabled()) {
                log.debug("Shutting down with " + size2 + " runnables still pending in pool");
            }
        }
        if (this.changeStreamMonitor != null) {
            this.changeStreamMonitor.stop();
        }
        sendMessageToSelf(new Msg("info", "going down", "now"));
        if (isAlive()) {
            interrupt();
        }
        if (isAlive()) {
            try {
                stop();
            } catch (UnsupportedOperationException e) {
                // Thread.stop() is unavailable on this runtime; rely on the flag and the interrupt
                log.debug("Thread.stop() not supported - waiting for the messaging thread to end by itself");
            }
        }
    }

    /**
     * Registers a listener that receives messages of every name.
     *
     * @param messageListener the listener to add
     */
    public void addMessageListener(MessageListener messageListener) {
        this.listeners.add(messageListener);
    }

    /**
     * Removes a listener previously added with {@link #addMessageListener(MessageListener)}.
     *
     * @param messageListener the listener to remove
     */
    public void removeMessageListener(MessageListener messageListener) {
        this.listeners.remove(messageListener);
    }

    /**
     * Stores the message with an error-logging callback passed to the store call
     * (presumably making the store asynchronous - confirm against Morphium.storeNoCache).
     *
     * @param msg the message to send
     */
    public void queueMessage(Msg msg) {
        storeMsg(msg, true);
    }

    /**
     * Starts the messaging thread. With change streams enabled the caller is held
     * for 250 ms afterwards (presumably to give the change stream monitor time to
     * come up). If the caller is interrupted during that wait, the interrupt status
     * is restored.
     */
    @Override // java.lang.Thread
    public synchronized void start() {
        super.start();
        if (this.useChangeStream) {
            try {
                Thread.sleep(250L);
            } catch (InterruptedException e) {
                // keep the interrupt visible to the caller instead of swallowing it
                Thread.currentThread().interrupt();
                log.error("error:" + e.getMessage());
            }
        }
    }

    /**
     * Stores the message without a callback.
     *
     * @param msg the message to send
     */
    public void storeMessage(Msg msg) {
        storeMsg(msg, false);
    }

    /**
     * Counts all messages currently stored in this instance's collection.
     *
     * @return the number of documents in the message collection
     */
    public long getNumberOfMessages() {
        return this.morphium.createQueryFor(Msg.class).setCollectionName(getCollectionName()).countAll();
    }

    /**
     * Stamps the message with this instance's sender id and host name, marks it as
     * already processed by this instance and stores it. If the message has a list
     * of recipients, one copy per recipient is stored instead: a new instance of the
     * message's class is created, all fields are copied over, and the copy gets a
     * fresh id and the respective recipient.
     *
     * @param msg the message to store
     * @param z   when true, a callback that logs store errors is passed along;
     *            when false, no callback is used
     * @throws RuntimeException if creating or storing a per-recipient copy fails
     */
    private void storeMsg(Msg msg, boolean z) {
        AsyncOperationCallback callback = null;
        if (z) {
            callback = new AsyncOperationCallback() { // from class: de.caluga.morphium.messaging.Messaging.4
                @Override // de.caluga.morphium.async.AsyncOperationCallback
                public void onOperationSucceeded(AsyncOperationType asyncOperationType, Query query, long j, List list, Object obj, Object... objArr) {
                }

                @Override // de.caluga.morphium.async.AsyncOperationCallback
                public void onOperationError(AsyncOperationType asyncOperationType, Query query, long j, String str, Throwable th, Object obj, Object... objArr) {
                    Messaging.log.error("Error storing msg", th);
                }
            };
        }
        msg.setSender(this.id);
        msg.addProcessedId(this.id);
        msg.setSenderHost(this.hostname);

        boolean noRecipientList = msg.getTo() == null || msg.getTo().isEmpty();
        if (noRecipientList) {
            this.morphium.storeNoCache(msg, getCollectionName(), callback);
            return;
        }
        for (String recipient : msg.getTo()) {
            try {
                Msg copy = (Msg) msg.getClass().getDeclaredConstructor().newInstance();
                for (Field field : this.morphium.getARHelper().getAllFields(msg.getClass())) {
                    field.setAccessible(true);
                    Object value = field.get(msg);
                    field.set(copy, value);
                }
                copy.setMsgId(null);
                copy.setRecipient(recipient);
                this.morphium.storeNoCache(copy, getCollectionName(), callback);
            } catch (Exception e) {
                throw new RuntimeException("Sending of answer failed", e);
            }
        }
    }

    /**
     * Stores a message addressed to this messaging instance itself,
     * delegating to the two-argument overload with the flag set to {@code false}.
     *
     * @param msg the message to deliver to this instance
     */
    public void sendMessageToSelf(Msg msg) {
        final boolean queued = false;
        sendMessageToSelf(msg, queued);
    }

    /**
     * Stores a message addressed to this messaging instance itself,
     * delegating to {@code sendMessageToSelf(Msg, boolean)} with the flag
     * set to {@code true}.
     *
     * @param msg the message to deliver to this instance
     */
    public void queueMessagetoSelf(Msg msg) {
        final boolean queued = true;
        sendMessageToSelf(msg, queued);
    }

    /**
     * Marks the message as sent by "self" to this instance's own id, stamps
     * it with the local host name and stores it in the message collection.
     *
     * @param msg the message to deliver to this instance
     * @param z   currently has no effect: the message is stored the same way
     *            for both values.
     *            NOTE(review): presumably meant to select an asynchronous
     *            store, as in {@code storeMsg} - confirm before wiring it up.
     */
    private void sendMessageToSelf(Msg msg, boolean z) {
        msg.setSender("self");
        msg.setRecipient(this.id);
        msg.setSenderHost(this.hostname);
        // The message is passed as is; it must not be cast to Morphium, which it is not.
        this.morphium.storeNoCache(msg, getCollectionName());
    }

    /**
     * @return the current value of the auto-answer flag
     */
    public boolean isAutoAnswer() {
        return autoAnswer;
    }

    /**
     * Sets the auto-answer flag.
     *
     * @param z the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setAutoAnswer(boolean z) {
        autoAnswer = z;
        return this;
    }

    /**
     * Shutdown hook invoked by Morphium: clears the {@code running} flag,
     * shuts the worker pool down (waiting at most 200 ms for running tasks
     * before cancelling them) and stops the change stream monitor.
     *
     * <p>Cleanup is best effort: failures are logged and never propagated,
     * and each step runs even if an earlier one failed.
     *
     * @param morphium the Morphium instance that is shutting down (not used here)
     */
    @Override // de.caluga.morphium.ShutdownListener
    public void onShutdown(Morphium morphium) {
        this.running = false;

        // Read the field once so a concurrent reset to null cannot cause an NPE mid-way.
        ThreadPoolExecutor pool = this.threadPool;
        if (pool != null) {
            try {
                pool.shutdown();
                if (!pool.awaitTermination(200L, TimeUnit.MILLISECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                // Still cancel outstanding work, then hand the interrupt back to the caller.
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                log.error("Error shutting down messaging thread pool", e);
            }
            this.threadPool = null;
        }

        if (this.changeStreamMonitor != null) {
            try {
                this.changeStreamMonitor.stop();
            } catch (RuntimeException e) {
                log.error("Error stopping change stream monitor", e);
            }
        }
    }

    /**
     * Sends a message and blocks until the first answer for it is available
     * or the timeout elapses.
     *
     * <p>The message gets a fresh id and is registered in
     * {@code waitingForMessages} before it is stored. The calling thread then
     * polls {@code waitingForAnswers} for that id, yielding between checks.
     * The registration is always removed again, including when storing the
     * message fails or the wait times out.
     *
     * @param t   the message to send; its message id is overwritten
     * @param j   maximum time to wait for an answer, in milliseconds
     * @param <T> the concrete message type
     * @return the answer taken from {@code waitingForAnswers}
     * @throws RuntimeException if no answer arrived within {@code j} milliseconds
     *                          or the message could not be stored
     */
    public <T extends Msg> T sendAndAwaitFirstAnswer(T t, long j) {
        t.setMsgId(new MorphiumId());
        this.waitingForMessages.put(t.getMsgId(), t);
        try {
            storeMessage(t);
            long currentTimeMillis = System.currentTimeMillis();
            while (!this.waitingForAnswers.containsKey(t.getMsgId())) {
                if (System.currentTimeMillis() - currentTimeMillis > j) {
                    log.error("Did not receive answer " + t.getName() + "/" + t.getMsgId() + " in time (" + j + "ms)");
                    throw new RuntimeException("Did not receive answer for message " + t.getName() + "/" + t.getMsgId() + " in time (" + j + "ms)");
                }
                Thread.yield();
            }
            if (log.isDebugEnabled()) {
                log.debug("got message after: " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
            }
            return (T) this.waitingForAnswers.remove(t.getMsgId());
        } finally {
            // Always drop the registration so a failed store or a timeout cannot leak the entry.
            this.waitingForMessages.remove(t.getMsgId());
        }
    }

    /**
     * Sends a message and collects answers to it until enough have arrived
     * or the timeout elapses.
     *
     * <p>The message is registered in {@code waitingForMessages} before it is
     * stored, so an answer cannot arrive before the registration exists; if
     * the message has no id yet, a new one is assigned first. The calling
     * thread then repeatedly takes answers for that id out of
     * {@code waitingForAnswers}, yielding between checks. The registration is
     * always removed again.
     *
     * @param t   the message to send
     * @param i   number of answers to wait for; a negative value means
     *            "collect until the timeout", and {@code 0} returns after a
     *            single check
     * @param j   maximum time to wait, in milliseconds
     * @param <T> the concrete message type
     * @return the answers collected so far; possibly empty, never {@code null}
     */
    public <T extends Msg> List<T> sendAndAwaitAnswers(T t, int i, long j) {
        List<T> answers = new ArrayList<T>();
        if (t.getMsgId() == null) {
            t.setMsgId(new MorphiumId());
        }
        this.waitingForMessages.put(t.getMsgId(), t);
        try {
            storeMessage(t);
            long start = System.currentTimeMillis();
            while (true) {
                // A single remove() avoids adding null if the entry vanishes between get and remove.
                T answer = (T) this.waitingForAnswers.remove(t.getMsgId());
                if (answer != null) {
                    answers.add(answer);
                }
                boolean enoughAnswers = i >= 0 && answers.size() >= i;
                boolean timedOut = System.currentTimeMillis() - start > j;
                if (enoughAnswers || timedOut) {
                    break;
                }
                Thread.yield();
            }
        } finally {
            this.waitingForMessages.remove(t.getMsgId());
        }
        return answers;
    }

    /**
     * @return the current value of the process-multiple flag
     */
    public boolean isProcessMultiple() {
        return processMultiple;
    }

    /**
     * Sets the process-multiple flag.
     *
     * @param z the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setProcessMultiple(boolean z) {
        processMultiple = z;
        return this;
    }

    /**
     * @return the queue name currently configured for this instance
     */
    public String getQueueName() {
        return queueName;
    }

    /**
     * Sets the queue name.
     *
     * @param str the new queue name
     * @return this instance, so calls can be chained
     */
    public Messaging setQueueName(String str) {
        queueName = str;
        return this;
    }

    /**
     * @return the current value of the multithreadded flag
     */
    public boolean isMultithreadded() {
        return multithreadded;
    }

    /**
     * Sets the multithreadded flag.
     *
     * @param z the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setMultithreadded(boolean z) {
        multithreadded = z;
        return this;
    }

    /**
     * @return the window size currently configured for this instance
     */
    public int getWindowSize() {
        return windowSize;
    }

    /**
     * Sets the window size.
     *
     * @param i the new window size
     * @return this instance, so calls can be chained
     */
    public Messaging setWindowSize(int i) {
        windowSize = i;
        return this;
    }

    /**
     * @return the current value of the use-change-stream flag
     */
    public boolean isUseChangeStream() {
        return useChangeStream;
    }

    /**
     * Sets the use-change-stream flag.
     *
     * @param z the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setUseChangeStream(boolean z) {
        useChangeStream = z;
        return this;
    }
}
