package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.query.MorphiumIterator;
import de.caluga.morphium.query.Query;
import de.caluga.morphium.replicaset.OplogListener;
import de.caluga.morphium.replicaset.OplogMonitor;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/caluga/morphium/messaging/Messaging.class */
public class Messaging extends Thread implements ShutdownListener {
    private static Logger log = LoggerFactory.getLogger(Messaging.class);
    private Morphium morphium;
    private boolean running;
    private int pause;
    private String id;
    private boolean autoAnswer;
    private String hostname;
    private boolean processMultiple;
    private List<MessageListener> listeners;
    private Map<String, List<MessageListener>> listenerByName;
    private String queueName;
    private ThreadPoolExecutor threadPool;
    private boolean multithreadded;
    private int windowSize;
    private boolean useOplogMonitor;
    private OplogMonitor oplogMonitor;

    public Messaging(Morphium morphium, int i, boolean z) {
        this(morphium, null, i, z);
    }

    public Messaging(Morphium morphium, int i, boolean z, boolean z2, int i2) {
        this(morphium, null, i, z, z2, i2);
    }

    public Messaging(Morphium morphium, String str, int i, boolean z) {
        this(morphium, str, i, z, false, 1000);
    }

    public Messaging(Morphium morphium, String str, int i, boolean z, boolean z2, int i2) {
        this(morphium, str, i, z, z2, i2, morphium.isReplicaSet());
    }

    public Messaging(Morphium morphium, String str, int i, boolean z, boolean z2, int i2, boolean z3) {
        this.pause = 5000;
        this.autoAnswer = false;
        this.processMultiple = false;
        this.multithreadded = false;
        this.windowSize = 1000;
        this.useOplogMonitor = false;
        this.multithreadded = z2;
        this.windowSize = i2;
        this.morphium = morphium;
        this.useOplogMonitor = z3;
        if (z2) {
            this.threadPool = new ThreadPoolExecutor(this.morphium.getConfig().getThreadPoolMessagingCoreSize(), this.morphium.getConfig().getThreadPoolMessagingMaxSize(), this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
            this.threadPool.setThreadFactory(new ThreadFactory() { // from class: de.caluga.morphium.messaging.Messaging.1
                private AtomicInteger num = new AtomicInteger(1);

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "messaging " + this.num);
                    this.num.set(this.num.get() + 1);
                    thread.setDaemon(true);
                    return thread;
                }
            });
        }
        this.morphium.addShutdownListener(this);
        this.queueName = str;
        this.running = true;
        this.pause = i;
        this.processMultiple = z;
        this.id = UUID.randomUUID().toString();
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        morphium.ensureIndicesFor(Msg.class, str);
        this.listeners = new CopyOnWriteArrayList();
        this.listenerByName = new HashMap();
    }

    public long getMessageCount() {
        return this.morphium.createQueryFor(Msg.class).setCollectionName(getCollectionName()).countAll();
    }

    public void removeMessage(Msg msg) {
        this.morphium.delete(msg, getCollectionName());
    }

    public List<Msg> findMessages(Query<Msg> query) {
        try {
            query = query.clone();
        } catch (CloneNotSupportedException e) {
        }
        query.setCollectionName(getCollectionName());
        return query.asList();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " started");
        }
        if (this.useOplogMonitor) {
            log.info("init Messaging  using oplogmonitor");
            this.oplogMonitor = new OplogMonitor(this.morphium, getCollectionName(), false);
            this.oplogMonitor.addListener(new OplogListener() { // from class: de.caluga.morphium.messaging.Messaging.2
                @Override // de.caluga.morphium.replicaset.OplogListener
                public void incomingData(Map<String, Object> map) {
                    Msg msg;
                    if (!map.get("op").equals("i")) {
                        if (map.get("op").equals("u") && (msg = (Msg) Messaging.this.morphium.findById(Msg.class, new MorphiumId(((Map) map.get("o2")).get("_id").toString()))) != null && msg.isExclusive() && msg.getLockedBy() == null) {
                            if (msg.getRecipient() == null || msg.getRecipient().equals(Messaging.this.id)) {
                                Messaging.log.debug("Update of msg - trying to lock");
                                Messaging.this.lockAndProcess(msg);
                                return;
                            }
                            return;
                        }
                        return;
                    }
                    Msg msg2 = (Msg) Messaging.this.morphium.getMapper().unmarshall(Msg.class, (Map<String, Object>) map.get("o"));
                    if (msg2.getSender().equals(Messaging.this.id)) {
                        return;
                    }
                    if (msg2.getProcessedBy() == null || !msg2.getProcessedBy().contains(Messaging.this.id)) {
                        if (msg2.getRecipient() == null || msg2.getRecipient().equals(Messaging.this.id)) {
                            if (msg2.isExclusive() && msg2.getLockedBy() == null && ((msg2.getRecipient() == null || msg2.getRecipient().equals(Messaging.this.id)) && (msg2.getProcessedBy() == null || !msg2.getProcessedBy().contains(Messaging.this.id)))) {
                                Messaging.log.info("trying to lock exclusive message");
                                Messaging.this.lockAndProcess(msg2);
                                return;
                            }
                            if (msg2.isExclusive() && (msg2.getRecipient() == null || !msg2.getRecipient().equals(Messaging.this.id))) {
                                Messaging.log.debug("Message is not for me");
                                return;
                            }
                            ArrayList arrayList = new ArrayList();
                            arrayList.add(msg2);
                            try {
                                Messaging.this.processMessages(arrayList);
                            } catch (Exception e) {
                                Messaging.log.error("Error during message processing ", e);
                            }
                        }
                    }
                }
            });
            this.oplogMonitor.start();
            return;
        }
        HashMap hashMap = new HashMap();
        while (this.running) {
            hashMap.clear();
            try {
                try {
                    Query<?> createQueryFor = this.morphium.createQueryFor(Msg.class);
                    createQueryFor.setCollectionName(getCollectionName());
                    createQueryFor.or(createQueryFor.q().f(Msg.Fields.sender).ne(this.id).f(Msg.Fields.lockedBy).eq(null).f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(null), createQueryFor.q().f(Msg.Fields.sender).ne(this.id).f(Msg.Fields.lockedBy).eq(null).f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(this.id));
                    hashMap.put("locked_by", this.id);
                    hashMap.put("locked", Long.valueOf(System.currentTimeMillis()));
                    this.morphium.set(createQueryFor, (Map<String, Object>) hashMap, false, this.processMultiple);
                    Query<?> q = createQueryFor.q();
                    q.or(q.q().f(Msg.Fields.lockedBy).eq(this.id), q.q().f(Msg.Fields.lockedBy).eq("ALL").f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(this.id), q.q().f(Msg.Fields.lockedBy).eq("ALL").f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.recipient).eq(null));
                    q.sort(Msg.Fields.timestamp);
                    MorphiumIterator<?> asIterable = q.asIterable(this.windowSize);
                    asIterable.setMultithreaddedAccess(this.multithreadded);
                    processMessages(asIterable);
                    try {
                        sleep(this.pause);
                    } catch (InterruptedException e) {
                    }
                } catch (Throwable th) {
                    try {
                        sleep(this.pause);
                    } catch (InterruptedException e2) {
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                log.error("Unhandled exception " + th2.getMessage(), th2);
                try {
                    sleep(this.pause);
                } catch (InterruptedException e3) {
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " stopped!");
        }
        if (this.running) {
            return;
        }
        this.listeners.clear();
        this.listenerByName.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void lockAndProcess(Msg msg) {
        Query<?> createQueryFor = this.morphium.createQueryFor(Msg.class);
        createQueryFor.f("_id").eq(msg.getMsgId());
        HashMap hashMap = new HashMap();
        hashMap.put("locked_by", this.id);
        hashMap.put("locked", Long.valueOf(System.currentTimeMillis()));
        this.morphium.set(createQueryFor, (Map<String, Object>) hashMap, false, false);
        try {
            Thread.sleep(10L);
        } catch (InterruptedException e) {
        }
        Msg msg2 = (Msg) this.morphium.reread(msg);
        if (msg2 == null || msg2.getLockedBy() == null || !msg2.getLockedBy().equals(this.id)) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(msg2);
        try {
            processMessages(arrayList);
        } catch (Exception e2) {
            log.error("Error during message processing ", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processMessages(Iterable<Msg> iterable) throws InterruptedException {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (Msg msg : iterable) {
            if (msg != null) {
                queueOrRun(() -> {
                    if (msg.getProcessedBy() != null && msg.getProcessedBy().contains(this.id)) {
                        log.error("Was already processed - ERROR?");
                        throw new RuntimeException("was already processed - error on mongo query result!");
                    }
                    Msg msg2 = (Msg) this.morphium.reread(msg, getCollectionName());
                    if (msg2 == null) {
                        return;
                    }
                    if (msg2.getProcessedBy() != null && msg2.getProcessedBy().contains(this.id)) {
                        log.info("Was already processed - multithreadding?");
                        return;
                    }
                    if (msg2.getLockedBy().equals(this.id) || msg2.getLockedBy().equals("ALL")) {
                        if (msg2.getTtl() < System.currentTimeMillis() - msg2.getTimestamp()) {
                            log.info("Found outdated message - deleting it!");
                            this.morphium.delete(msg2, getCollectionName());
                            return;
                        }
                        try {
                            Iterator<MessageListener> it = this.listeners.iterator();
                            while (it.hasNext()) {
                                Msg onMessage = it.next().onMessage(this, msg2);
                                if (this.autoAnswer && onMessage == null) {
                                    onMessage = new Msg(msg2.getName(), "received", "");
                                }
                                if (onMessage != null) {
                                    msg2.sendAnswer(this, onMessage);
                                }
                            }
                            if (this.listenerByName.get(msg2.getName()) != null) {
                                Iterator<MessageListener> it2 = this.listenerByName.get(msg2.getName()).iterator();
                                while (it2.hasNext()) {
                                    Msg onMessage2 = it2.next().onMessage(this, msg2);
                                    if (this.autoAnswer && onMessage2 == null) {
                                        onMessage2 = new Msg(msg2.getName(), "received", "");
                                    }
                                    if (onMessage2 != null) {
                                        msg2.setDeleteAt(new Date(System.currentTimeMillis() + msg2.getTtl()));
                                        msg2.sendAnswer(this, onMessage2);
                                    }
                                }
                            }
                        } catch (MessageRejectedException e) {
                            log.error("Message rejected by listener: " + e.getMessage());
                            if (e.isSendAnswer()) {
                                Msg msg3 = new Msg(msg2.getName(), "message rejected by listener", e.getMessage());
                                msg2.setDeleteAt(new Date(System.currentTimeMillis() + msg2.getTtl()));
                                msg2.sendAnswer(this, msg3);
                            }
                            if (e.isContinueProcessing()) {
                                updateProcessedByAndReleaseLock(msg2);
                                return;
                            }
                        } catch (Throwable th) {
                            log.error("Processing failed", th);
                        }
                        if (msg2.getLockedBy().equals("ALL")) {
                            arrayList2.add(() -> {
                                updateProcessedByAndReleaseLock(msg2);
                            });
                        } else {
                            arrayList3.add(msg2);
                        }
                    }
                });
            }
        }
        if (this.multithreadded) {
            while (this.threadPool != null && this.threadPool.getActiveCount() > 0) {
                Thread.yield();
            }
        }
        this.morphium.storeList(arrayList, getCollectionName());
        this.morphium.delete((List) arrayList3, getCollectionName());
        arrayList2.forEach(this::queueOrRun);
        while (this.morphium.getWriteBufferCount() > 0) {
            Thread.sleep(100L);
        }
    }

    private void updateProcessedByAndReleaseLock(Msg msg) {
        Query<?> createQueryFor = this.morphium.createQueryFor(Msg.class);
        createQueryFor.setCollectionName(getCollectionName());
        createQueryFor.f(Msg.Fields.msgId).eq(msg.getMsgId());
        if (msg.getLockedBy().equals(this.id)) {
            this.morphium.set((Query) createQueryFor, (Enum) Msg.Fields.lockedBy, (Object) null);
        }
        this.morphium.push(createQueryFor, Msg.Fields.processedBy, this.id);
    }

    private void queueOrRun(Runnable runnable) {
        if (!this.multithreadded) {
            runnable.run();
            return;
        }
        boolean z = false;
        while (!z) {
            try {
                this.threadPool.execute(runnable);
                z = true;
            } catch (Throwable th) {
            }
        }
        while (this.threadPool.getActiveCount() > this.windowSize) {
            Thread.yield();
        }
    }

    public String getCollectionName() {
        return (this.queueName == null || this.queueName.isEmpty()) ? "msg" : "mmsg_" + this.queueName;
    }

    public void addListenerForMessageNamed(String str, MessageListener messageListener) {
        if (this.listenerByName.get(str) == null) {
            HashMap hashMap = (HashMap) ((HashMap) this.listenerByName).clone();
            hashMap.put(str, new ArrayList());
            this.listenerByName = hashMap;
        }
        this.listenerByName.get(str).add(messageListener);
    }

    public void removeListenerForMessageNamed(String str, MessageListener messageListener) {
        if (this.listenerByName.get(str) == null) {
            return;
        }
        HashMap hashMap = (HashMap) ((HashMap) this.listenerByName).clone();
        ((List) hashMap.get(str)).remove(messageListener);
        this.listenerByName = hashMap;
    }

    public String getSenderId() {
        return this.id;
    }

    public void setSenderId(String str) {
        this.id = str;
    }

    public int getPause() {
        return this.pause;
    }

    public void setPause(int i) {
        this.pause = i;
    }

    public boolean isRunning() {
        return this.useOplogMonitor ? this.oplogMonitor != null && this.oplogMonitor.isRunning() : this.running;
    }

    @Deprecated
    public void setRunning(boolean z) {
        if (!z && this.oplogMonitor != null) {
            this.oplogMonitor.stop();
        }
        this.running = z;
    }

    public void terminate() {
        this.running = false;
        if (this.oplogMonitor != null) {
            this.oplogMonitor.stop();
        }
    }

    public void addMessageListener(MessageListener messageListener) {
        this.listeners.add(messageListener);
    }

    public void removeMessageListener(MessageListener messageListener) {
        this.listeners.remove(messageListener);
    }

    public void queueMessage(Msg msg) {
        storeMsg(msg, true);
    }

    public void storeMessage(Msg msg) {
        storeMsg(msg, false);
    }

    public long getNumberOfMessages() {
        Query createQueryFor = this.morphium.createQueryFor(Msg.class);
        createQueryFor.setCollectionName(getCollectionName());
        return createQueryFor.countAll();
    }

    private void storeMsg(Msg msg, boolean z) {
        AsyncOperationCallback asyncOperationCallback = z ? new AsyncOperationCallback() { // from class: de.caluga.morphium.messaging.Messaging.3
            @Override // de.caluga.morphium.async.AsyncOperationCallback
            public void onOperationSucceeded(AsyncOperationType asyncOperationType, Query query, long j, List list, Object obj, Object... objArr) {
            }

            @Override // de.caluga.morphium.async.AsyncOperationCallback
            public void onOperationError(AsyncOperationType asyncOperationType, Query query, long j, String str, Throwable th, Object obj, Object... objArr) {
            }
        } : null;
        msg.setDeleteAt(new Date(System.currentTimeMillis() + msg.getTtl()));
        msg.setSender(this.id);
        msg.addProcessedId(this.id);
        msg.setSenderHost(this.hostname);
        if (msg.getTo() == null || msg.getTo().isEmpty()) {
            this.morphium.storeNoCache(msg, getCollectionName(), asyncOperationCallback);
            return;
        }
        for (String str : msg.getTo()) {
            Msg copy = msg.getCopy();
            copy.setRecipient(str);
            this.morphium.storeNoCache(copy, getCollectionName(), asyncOperationCallback);
        }
    }

    public boolean isAutoAnswer() {
        return this.autoAnswer;
    }

    public void setAutoAnswer(boolean z) {
        this.autoAnswer = z;
    }

    @Override // de.caluga.morphium.ShutdownListener
    public void onShutdown(Morphium morphium) {
        try {
            if (this.threadPool != null) {
                this.threadPool.shutdownNow();
                this.threadPool = null;
            }
            if (this.oplogMonitor != null) {
                this.oplogMonitor.stop();
            }
        } catch (Exception e) {
        }
    }
}
