package asia.stampy.server.listener.subscription;

import asia.stampy.client.message.ack.AckHeader;
import asia.stampy.client.message.ack.AckMessage;
import asia.stampy.client.message.nack.NackHeader;
import asia.stampy.client.message.nack.NackMessage;
import asia.stampy.common.StampyLibrary;
import asia.stampy.common.gateway.AbstractStampyMessageGateway;
import asia.stampy.common.gateway.HostPort;
import asia.stampy.common.gateway.StampyMessageListener;
import asia.stampy.common.message.StampyMessage;
import asia.stampy.common.message.StompMessageType;
import asia.stampy.common.message.interceptor.AbstractOutgoingMessageInterceptor;
import asia.stampy.common.message.interceptor.InterceptException;
import asia.stampy.server.message.message.MessageHeader;
import asia.stampy.server.message.message.MessageMessage;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang.StringUtils;

/**
 * Tracks outgoing MESSAGE frames that carry an {@code ack} header and matches them against the
 * ACK / NACK frames subsequently received from the same {@link HostPort}.
 *
 * <p>As an outgoing interceptor it records the {@code ack} header value of every matching MESSAGE
 * frame and schedules a timeout for it. As a message listener it consumes the recorded value when
 * the corresponding ACK or NACK arrives and forwards the event to the configured
 * {@link StampyAcknowledgementHandler}. If neither arrives within {@link #getAckTimeoutMillis()}
 * the handler's {@code noAcknowledgementReceived} callback is invoked instead.
 *
 * <p>Each recorded id is consumed with a single atomic queue removal, so for a given id at most
 * one of "acknowledged" and "timed out" is reported.
 *
 * @param <SVR> the gateway type this interceptor is attached to
 */
@StampyLibrary(libraryName = "stampy-client-server")
public abstract class AbstractAcknowledgementListenerAndInterceptor<SVR extends AbstractStampyMessageGateway> extends AbstractOutgoingMessageInterceptor<SVR> implements StampyMessageListener {
    private static final StompMessageType[] TYPES = {StompMessageType.ACK, StompMessageType.NACK, StompMessageType.MESSAGE};

    private StampyAcknowledgementHandler handler;

    /** Outstanding acknowledgement ids, keyed by the remote endpoint they were sent to. */
    protected Map<HostPort, Queue<String>> messages = new ConcurrentHashMap<HostPort, Queue<String>>();

    /** Daemon timer that fires the "no acknowledgement received" callbacks. */
    private final Timer ackTimer = new Timer("Stampy Acknowledgement Timer", true);

    private long ackTimeoutMillis = 60000;

    /**
     * Returns the message types this listener/interceptor is interested in: ACK, NACK and MESSAGE.
     *
     * @return the shared array of handled message types
     */
    @Override
    public StompMessageType[] getMessageTypes() {
        return TYPES;
    }

    /**
     * Decides whether the given message should be processed by this class.
     *
     * @param stampyMessage the message to inspect
     * @return {@code true} for every ACK and NACK, and for a MESSAGE whose {@code ack} header is
     *         non-empty; {@code false} otherwise
     */
    @Override
    public boolean isForMessage(StampyMessage<?> stampyMessage) {
        switch (stampyMessage.getMessageType()) {
            case MESSAGE:
                return StringUtils.isNotEmpty(((MessageHeader) ((MessageMessage) stampyMessage).getHeader()).getAck());
            case ACK:
            case NACK:
                return true;
            default:
                return false;
        }
    }

    /**
     * Handles an incoming ACK or NACK frame; any other message type is ignored.
     *
     * @param stampyMessage the received message
     * @param hostPort the remote endpoint the message came from
     * @throws Exception if no acknowledgement with the frame's id was outstanding for
     *         {@code hostPort} ({@link UnexpectedAcknowledgementException}), or if the handler
     *         callback fails
     */
    @Override
    public void messageReceived(StampyMessage<?> stampyMessage, HostPort hostPort) throws Exception {
        switch (stampyMessage.getMessageType()) {
            case ACK:
                evaluateAck(((AckMessage) stampyMessage).getHeader(), hostPort);
                return;
            case NACK:
                evaluateNack(((NackMessage) stampyMessage).getHeader(), hostPort);
                return;
            default:
                return;
        }
    }

    /**
     * Records the {@code ack} header value of an outgoing MESSAGE frame as outstanding for
     * {@code hostPort} and starts its acknowledgement timeout.
     *
     * @param stampyMessage the outgoing message; cast to {@link MessageMessage}
     * @param hostPort the remote endpoint the message is being sent to
     * @throws InterceptException declared by the interceptor contract
     */
    @Override
    public void interceptMessage(StampyMessage<?> stampyMessage, HostPort hostPort) throws InterceptException {
        String ack = ((MessageHeader) ((MessageMessage) stampyMessage).getHeader()).getAck();
        pendingAcksFor(hostPort).add(ack);
        startTimerTask(hostPort, ack);
    }

    /**
     * Returns the queue of outstanding ids for {@code hostPort}, creating it on first use.
     *
     * <p>Creation is double-checked under a lock on the map so that two threads intercepting
     * messages for the same endpoint cannot each install their own queue and lose an id.
     */
    private Queue<String> pendingAcksFor(HostPort hostPort) {
        Queue<String> queue = this.messages.get(hostPort);
        if (queue == null) {
            synchronized (this.messages) {
                queue = this.messages.get(hostPort);
                if (queue == null) {
                    queue = new ConcurrentLinkedQueue<String>();
                    this.messages.put(hostPort, queue);
                }
            }
        }
        return queue;
    }

    /**
     * Schedules a one-shot task that reports {@code str} as unacknowledged if it is still
     * outstanding for {@code hostPort} once {@link #getAckTimeoutMillis()} has elapsed.
     */
    private void startTimerTask(final HostPort hostPort, final String str) {
        this.ackTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                Queue<String> queue = messages.get(hostPort);
                // remove() is the single atomic "claim": if an ACK/NACK already consumed the id
                // (or the endpoint's queue is gone) there is nothing to report.
                if (queue == null || !queue.remove(str)) {
                    return;
                }
                try {
                    getHandler().noAcknowledgementReceived(str);
                } catch (RuntimeException e) {
                    // An exception escaping a TimerTask terminates the Timer thread and makes every
                    // later schedule() call fail. Report it through the thread's uncaught-exception
                    // handler instead, so the failure stays visible but the timer survives.
                    Thread current = Thread.currentThread();
                    current.getUncaughtExceptionHandler().uncaughtException(current, e);
                }
            }
        }, getAckTimeoutMillis());
    }

    /**
     * Consumes the outstanding id named by a NACK frame and notifies the handler.
     *
     * @throws UnexpectedAcknowledgementException if the id was not outstanding for {@code hostPort}
     * @throws Exception if the handler callback fails
     */
    private void evaluateNack(NackHeader nackHeader, HostPort hostPort) throws Exception {
        String id = nackHeader.getId();
        if (!removePendingAck(id, hostPort)) {
            throw new UnexpectedAcknowledgementException("No NACK message expected, yet received id " + id + " from " + hostPort);
        }
        getHandler().nackReceived(id, nackHeader.getReceipt(), nackHeader.getTransaction());
    }

    /**
     * Consumes the outstanding id named by an ACK frame and notifies the handler.
     *
     * @throws UnexpectedAcknowledgementException if the id was not outstanding for {@code hostPort}
     * @throws Exception if the handler callback fails
     */
    private void evaluateAck(AckHeader ackHeader, HostPort hostPort) throws Exception {
        String id = ackHeader.getId();
        if (!removePendingAck(id, hostPort)) {
            throw new UnexpectedAcknowledgementException("No ACK message expected, yet received id " + id + " from " + hostPort);
        }
        getHandler().ackReceived(id, ackHeader.getReceipt(), ackHeader.getTransaction());
    }

    /**
     * Atomically removes {@code str} from the outstanding ids of {@code hostPort}.
     *
     * <p>Doing the existence check and the removal in one queue operation prevents the same id
     * from being accepted twice (e.g. by a concurrent duplicate frame or by the timeout task).
     *
     * @return {@code true} if the id was outstanding and has now been removed; {@code false} if
     *         the endpoint has no queue or the queue did not contain the id
     */
    private boolean removePendingAck(String str, HostPort hostPort) {
        Queue<String> queue = this.messages.get(hostPort);
        return queue != null && queue.remove(str);
    }

    /**
     * @return the delay, in milliseconds, after which an unacknowledged message is reported
     */
    public long getAckTimeoutMillis() {
        return this.ackTimeoutMillis;
    }

    /**
     * Sets the acknowledgement timeout. The value is read when a message is intercepted, so it
     * applies to timeouts scheduled after this call.
     *
     * @param j the timeout in milliseconds
     */
    public void setAckTimeoutMillis(long j) {
        this.ackTimeoutMillis = j;
    }

    /**
     * Stores the gateway via the superclass and then invokes {@link #ensureCleanup()}.
     *
     * @param svr the gateway this interceptor is attached to
     */
    @Override
    public void setGateway(SVR svr) {
        super.setGateway(svr);
        ensureCleanup();
    }

    /**
     * Hook invoked from {@link #setGateway} after the gateway has been stored. Judging by its
     * name it is presumably meant to arrange removal of {@link #messages} entries for endpoints
     * that go away; confirm against the concrete subclasses.
     */
    protected abstract void ensureCleanup();

    /**
     * @return the handler notified of ACK, NACK and timeout events; may be {@code null} if none
     *         has been set
     */
    public StampyAcknowledgementHandler getHandler() {
        return this.handler;
    }

    /**
     * @param stampyAcknowledgementHandler the handler to notify of ACK, NACK and timeout events
     */
    public void setHandler(StampyAcknowledgementHandler stampyAcknowledgementHandler) {
        this.handler = stampyAcknowledgementHandler;
    }
}
