package org.fiware.kiara.ps.subscriber;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.fiware.kiara.ps.qos.policies.HistoryQosPolicy;
import org.fiware.kiara.ps.qos.policies.HistoryQosPolicyKind;
import org.fiware.kiara.ps.qos.policies.OwnershipQosPolicyKind;
import org.fiware.kiara.ps.qos.policies.ResourceLimitsQosPolicy;
import org.fiware.kiara.ps.rtps.attributes.HistoryCacheAttributes;
import org.fiware.kiara.ps.rtps.common.TopicKind;
import org.fiware.kiara.ps.rtps.history.CacheChange;
import org.fiware.kiara.ps.rtps.history.ReaderHistoryCache;
import org.fiware.kiara.ps.rtps.messages.common.types.ChangeKind;
import org.fiware.kiara.ps.rtps.messages.elements.InstanceHandle;
import org.fiware.kiara.ps.rtps.reader.WriterProxy;
import org.fiware.kiara.serialization.impl.BinaryInputStream;
import org.fiware.kiara.serialization.impl.Serializable;
import org.fiware.kiara.util.Pair;
import org.fiware.kiara.util.ReturnParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fiware/kiara/ps/subscriber/SubscriberHistory.class */
/**
 * Reader-side history cache used by a {@link Subscriber}.
 *
 * <p>On top of the base {@link ReaderHistoryCache} this class keeps a count of
 * unread changes, groups the stored changes per instance handle for topics of
 * kind {@code WITH_KEY}, and applies the history QoS (keep-all / keep-last
 * with a depth) and the resource limits (max samples, max instances, max
 * samples per instance) when a new change is received.
 *
 * <p>{@link #receivedChange}, {@link #removeChangeSub}, {@link #readNextData}
 * and {@link #takeNextData} run under a fair {@link ReentrantLock}. The lock is
 * acquired once per call and released exactly once in a {@code finally} block,
 * so re-entrant calls (for example {@code receivedChange} calling
 * {@code removeChangeSub}) leave the caller's hold intact. The unread counter
 * accessors do not take the lock themselves.
 *
 * @param <T> type of the user data delivered by the subscriber
 */
public class SubscriberHistory<T> extends ReaderHistoryCache {
    private static final Logger logger = LoggerFactory.getLogger(SubscriberHistory.class);

    /** Number of stored changes that have not been read yet. */
    private long m_unreadCacheCount;
    /** Per-instance lists of stored changes (only used for WITH_KEY topics). */
    private final List<Pair<InstanceHandle, List<CacheChange>>> m_keyedChanges;
    private final HistoryQosPolicy m_historyQos;
    private final ResourceLimitsQosPolicy m_resourceLimitsQos;
    private final Subscriber<T> m_subscriber;
    /** Scratch object the payload is deserialized into when a key has to be computed. */
    private final T m_getKeyObject;
    private final Lock m_mutex;

    /**
     * Creates the history for the given subscriber.
     *
     * @param subscriber subscriber that owns this history; its type is used to create the key scratch object
     * @param i first argument forwarded to {@link HistoryCacheAttributes}
     * @param historyQosPolicy history kind and depth applied when changes are received
     * @param resourceLimitsQosPolicy resource limits applied when changes are received
     */
    public SubscriberHistory(Subscriber<T> subscriber, int i, HistoryQosPolicy historyQosPolicy, ResourceLimitsQosPolicy resourceLimitsQosPolicy) {
        super(new HistoryCacheAttributes(i, resourceLimitsQosPolicy.allocatedSamples, resourceLimitsQosPolicy.maxSamples));
        this.m_mutex = new ReentrantLock(true);
        this.m_unreadCacheCount = 0L;
        this.m_historyQos = historyQosPolicy;
        this.m_resourceLimitsQos = resourceLimitsQosPolicy;
        this.m_subscriber = subscriber;
        this.m_getKeyObject = this.m_subscriber.getType().createData();
        this.m_keyedChanges = new ArrayList<>();
    }

    /**
     * Tries to store a newly received change.
     *
     * <p>The change is rejected when the history is full, when a change with
     * the same sequence number and writer GUID is already stored, or when the
     * history QoS / resource limits leave no room for it.
     *
     * @param cacheChange change received from a writer
     * @return {@code true} if the change was stored, {@code false} otherwise
     */
    @Override
    public boolean receivedChange(CacheChange cacheChange) {
        this.m_mutex.lock();
        try {
            if (this.m_isHistoryFull) {
                logger.warn("Attempting to add Data to full ReaderHistory");
                return false;
            }
            if (isAlreadyStored(cacheChange)) {
                logger.debug("Change (seqNum: {}) already in ReaderHistory", cacheChange.getSequenceNumber().toLong());
                return false;
            }
            if (this.m_subscriber.getAttributes().topic.topicKind == TopicKind.NO_KEY) {
                return receiveUnkeyedChange(cacheChange);
            }
            if (this.m_subscriber.getAttributes().topic.topicKind == TopicKind.WITH_KEY) {
                return receiveKeyedChange(cacheChange);
            }
            return false;
        } finally {
            // Single release point: unlocking before each return as well would
            // release the lock twice and throw IllegalMonitorStateException.
            this.m_mutex.unlock();
        }
    }

    /** Increments the number of unread changes. */
    public void increaseUnreadCount() {
        this.m_unreadCacheCount++;
    }

    /** Decrements the number of unread changes; the counter never goes below zero. */
    public void decreaseUnreadCount() {
        if (this.m_unreadCacheCount > 0) {
            this.m_unreadCacheCount--;
        }
    }

    /**
     * Removes a change from the history and, for keyed topics, from the list
     * of its instance.
     *
     * @param cacheChange change to remove
     * @param pair key entry the change belongs to, or {@code null} to look it up
     * @return {@code true} if the change was removed, {@code false} otherwise
     */
    public boolean removeChangeSub(CacheChange cacheChange, Pair<InstanceHandle, List<CacheChange>> pair) {
        this.m_mutex.lock();
        try {
            if (this.m_subscriber.getAttributes().topic.topicKind == TopicKind.NO_KEY) {
                return removeChange(cacheChange);
            }
            Pair<InstanceHandle, List<CacheChange>> keyEntry = (pair != null) ? pair : findKey(cacheChange);
            if (keyEntry == null) {
                return false;
            }
            List<CacheChange> instanceChanges = keyEntry.getSecond();
            for (int idx = 0; idx < instanceChanges.size(); idx++) {
                CacheChange candidate = instanceChanges.get(idx);
                if (candidate.getSequenceNumber().equals(cacheChange.getSequenceNumber())
                        && candidate.getWriterGUID().equals(cacheChange.getWriterGUID())
                        && removeChange(cacheChange)) {
                    // Remove by position so exactly the matched element goes away,
                    // independently of how CacheChange implements equals().
                    instanceChanges.remove(idx);
                    return true;
                }
            }
            logger.error("Change not found, something went wrong");
            return false;
        } finally {
            this.m_mutex.unlock();
        }
    }

    /**
     * Returns the number of unread changes.
     *
     * @return current value of the unread counter
     */
    public long getUnreadCount() {
        return this.m_unreadCacheCount;
    }

    /**
     * Reads the next unread change without removing it from the history.
     *
     * <p>The change is marked as read and the unread counter is decreased.
     *
     * @param sampleInfo object filled with information about the sample; may be {@code null}
     * @return the deserialized sample, or {@code null} when there is no unread
     *         change, the change is not {@code ALIVE}, or deserialization failed
     */
    public T readNextData(SampleInfo sampleInfo) {
        this.m_mutex.lock();
        try {
            ReturnParam<CacheChange> change = new ReturnParam<>();
            ReturnParam<WriterProxy> writerProxy = new ReturnParam<>();
            if (!this.m_reader.nextUnreadCache(change, writerProxy)) {
                return null;
            }
            change.value.setRead(true);
            decreaseUnreadCount();
            logger.info("{}: reading {}", this.m_reader.getGuid().getEntityId(), change.value.getSequenceNumber().toLong());
            T data = deserializeIfAlive(change.value);
            fillSampleInfo(sampleInfo, change.value, writerProxy.value);
            return data;
        } finally {
            this.m_mutex.unlock();
        }
    }

    /**
     * Takes the next untaken change: returns its data and removes it from the history.
     *
     * <p>If the change had not been read yet, the unread counter is decreased.
     *
     * @param sampleInfo object filled with information about the sample; may be {@code null}
     * @return the deserialized sample, or {@code null} when there is no untaken
     *         change, the change is not {@code ALIVE}, or deserialization failed
     */
    public T takeNextData(SampleInfo sampleInfo) {
        this.m_mutex.lock();
        try {
            ReturnParam<CacheChange> change = new ReturnParam<>();
            ReturnParam<WriterProxy> writerProxy = new ReturnParam<>();
            if (!this.m_reader.nextUntakenCache(change, writerProxy)) {
                return null;
            }
            if (!change.value.isRead()) {
                decreaseUnreadCount();
            }
            change.value.setRead(true);
            logger.debug("Taking seqNum {} from writer {}", change.value.getSequenceNumber().toLong(), change.value.getWriterGUID());
            T data = deserializeIfAlive(change.value);
            if (sampleInfo != null) {
                // A keyed change whose handle still equals a default-constructed
                // handle gets its key computed from the sample. Skipped when no
                // sample is available (not ALIVE or deserialization failed).
                if (data != null
                        && this.m_subscriber.getAttributes().topic.topicKind == TopicKind.WITH_KEY
                        && change.value.getInstanceHandle().equals(new InstanceHandle())) {
                    this.m_subscriber.getType().getKey(data, change.value.getInstanceHandle());
                }
                fillSampleInfo(sampleInfo, change.value, writerProxy.value);
            }
            removeChangeSub(change.value, null);
            return data;
        } finally {
            this.m_mutex.unlock();
        }
    }

    /**
     * Tells whether a change with the same sequence number and writer GUID is
     * already stored. The scan only happens when the sequence number is lower
     * than the current maximum. It visits every stored change and does not
     * stop early, so the result does not depend on the order of the list.
     */
    private boolean isAlreadyStored(CacheChange cacheChange) {
        if (!cacheChange.getSequenceNumber().isLowerThan(this.m_maxSeqCacheChange.getSequenceNumber())) {
            return false;
        }
        for (CacheChange stored : this.m_changes) {
            if (stored.getSequenceNumber().equals(cacheChange.getSequenceNumber())
                    && stored.getWriterGUID().equals(cacheChange.getWriterGUID())) {
                return true;
            }
        }
        return false;
    }

    /** Applies the history QoS for a NO_KEY topic and stores the change when there is room. */
    private boolean receiveUnkeyedChange(CacheChange cacheChange) {
        if (this.m_historyQos.kind == HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS) {
            // At depth: the change with the lowest sequence number has to make room.
            if (this.m_changes.size() >= this.m_historyQos.depth && !evictOldest(this.m_minSeqCacheChange, null)) {
                return false;
            }
        } else if (this.m_historyQos.kind != HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS) {
            return false;
        }
        if (!storeNewChange(cacheChange)) {
            return false;
        }
        logger.debug("Change added from {} ", cacheChange.getWriterGUID());
        return true;
    }

    /** Resolves the instance of a WITH_KEY change, applies the per-instance limits and stores it. */
    private boolean receiveKeyedChange(CacheChange cacheChange) {
        if (!cacheChange.getInstanceHandle().isDefined() && !computeInstanceHandle(cacheChange)) {
            return false;
        }
        Pair<InstanceHandle, List<CacheChange>> keyEntry = findKey(cacheChange);
        if (keyEntry == null) {
            return false;
        }
        List<CacheChange> instanceChanges = keyEntry.getSecond();
        if (this.m_historyQos.kind == HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS) {
            if (instanceChanges.size() >= this.m_resourceLimitsQos.maxSamplesPerInstance) {
                logger.warn("Change not added due to maximum number of samples per instance");
                return false;
            }
        } else if (this.m_historyQos.kind == HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS) {
            // At depth: the first change of this instance has to make room.
            if (instanceChanges.size() >= this.m_historyQos.depth && !evictOldest(instanceChanges.get(0), keyEntry)) {
                return false;
            }
        } else {
            return false;
        }
        if (!storeNewChange(cacheChange)) {
            return false;
        }
        // Appending keeps the list ordered only when the new sequence number is
        // higher than the last one; otherwise re-sort after inserting.
        boolean inOrder = instanceChanges.isEmpty()
                || instanceChanges.get(instanceChanges.size() - 1).getSequenceNumber().isLowerThan(cacheChange.getSequenceNumber());
        instanceChanges.add(cacheChange);
        if (!inOrder) {
            Collections.sort(instanceChanges);
        }
        logger.debug("Change {} added from {} with key {}", new Object[]{cacheChange.getSequenceNumber().toLong(), cacheChange.getWriterGUID(), cacheChange.getInstanceHandle()});
        return true;
    }

    /**
     * Computes the instance handle of a change that arrived without one by
     * deserializing its payload into the scratch object and asking the topic
     * type for the key.
     *
     * @return {@code true} if the handle was computed, {@code false} if there
     *         is no topic type, the payload cannot be deserialized or the type
     *         cannot provide a key
     */
    private boolean computeInstanceHandle(CacheChange cacheChange) {
        if (this.m_subscriber.getType() == null) {
            logger.warn("NO KEY in topic {} and no method to obtain it", this.m_subscriber.getAttributes().topic.topicName);
            return false;
        }
        logger.debug("Getting Key of change with no Key transmitted");
        try {
            ((Serializable) this.m_getKeyObject).deserialize(cacheChange.getSerializedPayload().getSerializer(), new BinaryInputStream(cacheChange.getSerializedPayload().getBuffer()), "");
        } catch (IOException e) {
            // The scratch object may now hold data of an earlier sample; a key
            // derived from it would file this change under the wrong instance.
            logger.error("Could not deserialize the payload to obtain the key of the change", e);
            return false;
        }
        return this.m_subscriber.getType().getKey(this.m_getKeyObject, cacheChange.getInstanceHandle());
    }

    /**
     * Removes {@code oldest} to make room for a new change and keeps the
     * unread counter consistent.
     *
     * @return {@code true} if the change was removed
     */
    private boolean evictOldest(CacheChange oldest, Pair<InstanceHandle, List<CacheChange>> keyEntry) {
        // Capture the read flag before the change is handed to removal.
        boolean wasRead = oldest.isRead();
        if (!removeChangeSub(oldest, keyEntry)) {
            return false;
        }
        if (!wasRead) {
            decreaseUnreadCount();
        }
        return true;
    }

    /**
     * Adds the change to the base history and updates the unread counter,
     * ordering, min/max sequence numbers and the history-full flag.
     *
     * @return {@code true} if the base history accepted the change
     */
    private boolean storeNewChange(CacheChange cacheChange) {
        if (!addChange(cacheChange)) {
            return false;
        }
        increaseUnreadCount();
        if (cacheChange.getSequenceNumber().isLowerThan(this.m_maxSeqCacheChange.getSequenceNumber())) {
            sortCacheChanges();
        }
        updateMaxMinSeqNum();
        if (this.m_changes.size() == this.m_resourceLimitsQos.maxSamples) {
            this.m_isHistoryFull = true;
        }
        return true;
    }

    /**
     * Finds the key entry for the instance handle of the change, creating it
     * when it does not exist yet. When the maximum number of instances has
     * been reached, the first entry without changes is dropped to make room.
     *
     * @return the entry for the instance, or {@code null} when a new entry is
     *         needed but no room could be made
     */
    private Pair<InstanceHandle, List<CacheChange>> findKey(CacheChange cacheChange) {
        for (Pair<InstanceHandle, List<CacheChange>> entry : this.m_keyedChanges) {
            if (cacheChange.getInstanceHandle().equals(entry.getFirst())) {
                return entry;
            }
        }
        if (this.m_keyedChanges.size() >= this.m_resourceLimitsQos.maxInstances) {
            int reusable = -1;
            for (int idx = 0; idx < this.m_keyedChanges.size(); idx++) {
                if (this.m_keyedChanges.get(idx).getSecond().isEmpty()) {
                    reusable = idx;
                    break;
                }
            }
            if (reusable < 0) {
                return null;
            }
            // Remove by position so exactly the empty entry that was found is dropped.
            this.m_keyedChanges.remove(reusable);
        }
        Pair<InstanceHandle, List<CacheChange>> created =
                new Pair<InstanceHandle, List<CacheChange>>(cacheChange.getInstanceHandle(), new ArrayList<CacheChange>());
        this.m_keyedChanges.add(created);
        return created;
    }

    /**
     * Deserializes the payload of an {@code ALIVE} change.
     *
     * @return the sample, or {@code null} when the change is not {@code ALIVE}
     *         or the sample could not be instantiated
     */
    private T deserializeIfAlive(CacheChange change) {
        if (change.getKind() != ChangeKind.ALIVE) {
            return null;
        }
        try {
            return this.m_subscriber.getType().deserialize(change.getSerializedPayload());
        } catch (IllegalAccessException | InstantiationException e) {
            logger.error("Could not deserialize the payload of the change", e);
            return null;
        }
    }

    /**
     * Copies kind, writer GUID, source timestamp, instance handle and (for
     * exclusive ownership) the writer's ownership strength into {@code sampleInfo}.
     * Does nothing when {@code sampleInfo} is {@code null}.
     */
    private void fillSampleInfo(SampleInfo sampleInfo, CacheChange change, WriterProxy writerProxy) {
        if (sampleInfo == null) {
            return;
        }
        sampleInfo.sampleKind = change.getKind();
        sampleInfo.writerGUID = change.getWriterGUID();
        sampleInfo.sourceTimestamp = change.getSourceTimestamp();
        if (this.m_subscriber.getAttributes().qos.ownership.kind == OwnershipQosPolicyKind.EXCLUSIVE_OWNERSHIP_QOS) {
            sampleInfo.ownershipStrength = writerProxy.att.ownershipStrength;
        }
        sampleInfo.handle = change.getInstanceHandle();
    }
}
