package net.sf.jrtps.rtps;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import net.sf.jrtps.Configuration;
import net.sf.jrtps.QualityOfService;
import net.sf.jrtps.builtin.SubscriptionData;
import net.sf.jrtps.message.AckNack;
import net.sf.jrtps.message.Data;
import net.sf.jrtps.message.DataEncapsulation;
import net.sf.jrtps.message.Heartbeat;
import net.sf.jrtps.message.InfoTimestamp;
import net.sf.jrtps.message.Message;
import net.sf.jrtps.message.parameter.ParameterList;
import net.sf.jrtps.message.parameter.QosDurability;
import net.sf.jrtps.message.parameter.QosReliability;
import net.sf.jrtps.message.parameter.StatusInfo;
import net.sf.jrtps.types.EntityId;
import net.sf.jrtps.types.Guid;
import net.sf.jrtps.types.GuidPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sf/jrtps/rtps/RTPSWriter.class */
/**
 * Writer endpoint that distributes the samples held in a {@link WriterCache} to a set of
 * matched remote readers.
 *
 * <p>For every matched reader a {@link ReaderProxy} is kept in {@code readerProxies}. Readers are
 * notified of changes either with a {@link Heartbeat} alone, or with {@link Data} submessages
 * (followed by a Heartbeat for reliable readers), depending on the reader's reliability and on the
 * configured push mode. When the writer itself is reliable, a periodic task announces the state of
 * the cache to all matched readers using heartbeats.
 *
 * @param <T> type of the application data carried by the samples in the writer cache
 */
public class RTPSWriter<T> extends Endpoint {
    private static final Logger log = LoggerFactory.getLogger(RTPSWriter.class);

    /** Matched readers, keyed by the Guid obtained from their SubscriptionData. */
    private final Map<Guid, ReaderProxy> readerProxies;
    /** Source of the samples and of the min/max sequence numbers advertised in heartbeats. */
    private final WriterCache<T> writer_cache;
    /** Delay applied before answering an AckNack; logged as milliseconds in {@link #onAckNack}. */
    private final int nackResponseDelay;
    /** Period handed to the participant's scheduler for the heartbeat announcement task. */
    private final int heartbeatPeriod;
    /** Default "push" flag used by the public notify methods. */
    private final boolean pushMode;
    /** Guards {@link #hbCount}; heartbeats are created from the scheduler thread and from callers. */
    private final Object hbCountLock = new Object();
    /** Count placed into the next Heartbeat; incremented once per created Heartbeat. */
    private int hbCount;
    /** Periodic heartbeat announcement; {@code null} when this writer is not reliable. */
    private ScheduledFuture<?> hbAnnounceTask;

    /**
     * Creates the writer and, if it is reliable, schedules the periodic heartbeat announcement.
     *
     * @param rTPSParticipant  participant that owns this writer and provides scheduling
     * @param entityId         entity id of this writer
     * @param str              passed through to the {@link Endpoint} constructor
     *                         (presumably the topic name - TODO confirm against Endpoint)
     * @param writerCache      cache holding the samples this writer distributes
     * @param qualityOfService QoS of this writer
     * @param configuration    source of nack response delay, heartbeat period and push mode
     */
    public RTPSWriter(RTPSParticipant rTPSParticipant, EntityId entityId, String str, WriterCache<T> writerCache, QualityOfService qualityOfService, Configuration configuration) {
        super(rTPSParticipant, entityId, str, qualityOfService, configuration);
        this.readerProxies = new ConcurrentHashMap<Guid, ReaderProxy>();
        this.writer_cache = writerCache;
        this.nackResponseDelay = configuration.getNackResponseDelay();
        this.heartbeatPeriod = configuration.getHeartbeatPeriod();
        this.pushMode = configuration.getPushMode();
        if (isReliable()) {
            this.hbAnnounceTask = rTPSParticipant.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    RTPSWriter.log.debug("[{}] Starting periodical notification", RTPSWriter.this.getEntityId());
                    try {
                        // Periodic announcements never push data; reliable readers get a heartbeat.
                        RTPSWriter.this.notifyReaders(false);
                    } catch (Exception e) {
                        // Catch everything so that one failure does not end the periodic task.
                        RTPSWriter.log.error("Got exception while doing periodical notification", e);
                    }
                }
            }, this.heartbeatPeriod);
        }
    }

    /**
     * Returns the endpoint set id of this writer's entity id.
     *
     * @return {@code getEntityId().getEndpointSetId()}
     */
    public int endpointSetId() {
        return getEntityId().getEndpointSetId();
    }

    /** Notifies every matched reader of changes, using the configured push mode. */
    public void notifyReaders() {
        notifyReaders(this.pushMode);
    }

    /**
     * Notifies every matched reader of changes in the writer cache.
     *
     * @param z {@code true} to push data to all readers; {@code false} to send only a heartbeat
     *          to reliable readers (non-reliable readers always get data)
     */
    public void notifyReaders(boolean z) {
        if (this.readerProxies.isEmpty()) {
            return;
        }
        log.debug("[{}] Notifying {} matched readers of changes in history cache", getEntityId(), this.readerProxies.size());
        for (ReaderProxy proxy : this.readerProxies.values()) {
            // Looked up again by key inside notifyReader, so a reader removed in the meantime is skipped.
            notifyReader(proxy.getSubscriptionData().getKey(), z);
        }
    }

    /**
     * Notifies a single matched reader of changes, using the configured push mode.
     *
     * @param guid Guid of the reader to notify
     */
    public void notifyReader(Guid guid) {
        notifyReader(guid, this.pushMode);
    }

    /**
     * Notifies a single matched reader. Logs a warning and does nothing if no proxy exists for
     * {@code guid}.
     *
     * @param guid Guid of the reader to notify
     * @param z    {@code true} to send data even to a reliable reader
     */
    private void notifyReader(Guid guid, boolean z) {
        ReaderProxy readerProxy = this.readerProxies.get(guid);
        if (readerProxy == null) {
            log.warn("Will not notify, no proxy for {}", guid);
            return;
        }
        if (readerProxy.isReliable() && !z) {
            // Reliable reader without push: announce only, the reader requests data via AckNack.
            sendHeartbeat(readerProxy);
            return;
        }
        sendData(readerProxy, readerProxy.getReadersHighestSeqNum());
        if (!readerProxy.isReliable()) {
            // A non-reliable reader sends no AckNack, so its position is advanced here.
            // NOTE(review): the cache maximum is read after sendData; confirm that samples added
            // in between cannot be skipped for this reader.
            readerProxy.setReadersHighestSeqNum(this.writer_cache.getSeqNumMax());
        }
    }

    /** Sends a Heartbeat with the liveliness flag set to every matched reader. */
    public void assertLiveliness() {
        for (ReaderProxy proxy : this.readerProxies.values()) {
            sendHeartbeat(proxy, true);
        }
    }

    /** Cancels the periodic heartbeat task, if one was scheduled, and forgets all matched readers. */
    public void close() {
        if (this.hbAnnounceTask != null) {
            this.hbAnnounceTask.cancel(true);
        }
        this.readerProxies.clear();
    }

    /**
     * Registers a matched reader. A reader with VOLATILE durability is positioned at the current
     * maximum sequence number of the cache; any other reader is notified immediately.
     *
     * @param subscriptionData description of the remote reader
     * @return the proxy created for the reader
     */
    public ReaderProxy addMatchedReader(SubscriptionData subscriptionData) {
        ReaderProxy readerProxy = new ReaderProxy(subscriptionData, getLocators(subscriptionData), false, getConfiguration().getNackSuppressionDuration());
        readerProxy.preferMulticast(getConfiguration().preferMulticast());
        this.readerProxies.put(subscriptionData.getKey(), readerProxy);
        if (QosDurability.Kind.VOLATILE == subscriptionData.getQualityOfService().getDurability().getKind()) {
            // Read once so that the logged value is the value actually applied.
            long seqNumMax = this.writer_cache.getSeqNumMax();
            log.trace("[{}] Setting highest seqNum to {} for VOLATILE reader", getEntityId(), seqNumMax);
            readerProxy.setReadersHighestSeqNum(seqNumMax);
        } else {
            notifyReader(readerProxy.getGuid());
        }
        log.debug("[{}] Added matchedReader {}", getEntityId(), subscriptionData);
        return readerProxy;
    }

    /**
     * Removes every matched reader whose Guid has the given prefix.
     *
     * @param guidPrefix prefix identifying the readers to remove
     */
    public void removeMatchedReaders(GuidPrefix guidPrefix) {
        // readerProxies is a ConcurrentHashMap, so removing entries while iterating is permitted.
        for (ReaderProxy readerProxy : this.readerProxies.values()) {
            if (guidPrefix.equals(readerProxy.getGuid().getPrefix())) {
                removeMatchedReader(readerProxy.getSubscriptionData());
            }
        }
    }

    /**
     * Removes a single matched reader.
     *
     * @param subscriptionData description of the reader to remove; its key identifies the proxy
     */
    public void removeMatchedReader(SubscriptionData subscriptionData) {
        this.readerProxies.remove(subscriptionData.getKey());
        // The format string previously had one placeholder more than there were arguments.
        log.debug("[{}] Removed matchedReader {}", getEntityId(), subscriptionData.getKey());
    }

    /**
     * Returns all matched readers.
     *
     * @return the values view of the internal proxy map (not a copy)
     */
    public Collection<ReaderProxy> getMatchedReaders() {
        return this.readerProxies.values();
    }

    /**
     * Returns the matched readers whose Guid has the given prefix.
     *
     * @param guidPrefix prefix to match against the Guid of each reader
     * @return a new collection holding the matching proxies; never contains {@code null}
     */
    public Collection<ReaderProxy> getMatchedReaders(GuidPrefix guidPrefix) {
        LinkedList<ReaderProxy> matched = new LinkedList<ReaderProxy>();
        // Iterate entries rather than keySet() + get(): a concurrent removal between the two
        // calls would otherwise put a null into the result.
        for (Map.Entry<Guid, ReaderProxy> entry : this.readerProxies.entrySet()) {
            if (entry.getKey().getPrefix().equals(guidPrefix)) {
                matched.add(entry.getValue());
            }
        }
        return matched;
    }

    /**
     * Handles an AckNack received from a remote reader. AckNacks from unknown readers are
     * discarded with a warning. If the proxy accepts the AckNack, the writer waits for the nack
     * response delay and then sends the samples following {@code bitmapBase - 1}.
     *
     * @param guidPrefix prefix of the participant that sent the AckNack
     * @param ackNack    the received AckNack
     */
    public void onAckNack(GuidPrefix guidPrefix, AckNack ackNack) {
        log.debug("[{}] Got AckNack: #{} {}, F:{} from {}", new Object[]{getEntityId(), ackNack.getCount(), ackNack.getReaderSNState(), ackNack.finalFlag(), guidPrefix});
        ReaderProxy readerProxy = this.readerProxies.get(new Guid(guidPrefix, ackNack.getReaderId()));
        if (readerProxy == null) {
            log.warn("[{}] Discarding AckNack from unknown reader {}", getEntityId(), ackNack.getReaderId());
            return;
        }
        if (readerProxy.ackNackReceived(ackNack)) {
            log.trace("[{}] Wait for nack response delay: {} ms", getEntityId(), this.nackResponseDelay);
            getParticipant().waitFor(this.nackResponseDelay);
            sendData(readerProxy, ackNack.getReaderSNState().getBitmapBase() - 1);
        }
    }

    /**
     * Sends the samples the cache returns for {@code j} to the given reader. For a reliable
     * reader a non-final Heartbeat is appended to the same message.
     *
     * @param readerProxy reader to send to
     * @param j           sequence number passed to {@code WriterCache.getSamplesSince}
     */
    private void sendData(ReaderProxy readerProxy, long j) {
        Message message = new Message(getGuid().getPrefix());
        LinkedList<Sample<T>> samples = this.writer_cache.getSamplesSince(j);
        if (samples.isEmpty()) {
            // Placeholders added so that the reader and sequence number arguments are logged.
            log.debug("[{}] Remote reader {} already has all the data since {}", new Object[]{getEntityId(), readerProxy, j});
            return;
        }
        long previousTimestamp = 0;
        EntityId readerId = readerProxy.getEntityId();
        for (Sample<T> sample : samples) {
            try {
                long timestamp = sample.getTimestamp();
                if (timestamp > previousTimestamp) {
                    // Emit a new InfoTimestamp only when the timestamp advances.
                    message.addSubMessage(new InfoTimestamp(timestamp));
                }
                previousTimestamp = timestamp;
                log.trace("Marshalling {}", sample.getData());
                message.addSubMessage(createData(readerId, sample));
            } catch (IOException e) {
                // Best effort: skip the sample that failed and continue with the rest.
                log.warn("[{}] Failed to add Sample to message", getEntityId(), e);
            }
        }
        if (readerProxy.isReliable()) {
            Heartbeat heartbeat = createHeartbeat(readerId);
            heartbeat.finalFlag(false);
            message.addSubMessage(heartbeat);
        }
        log.debug("[{}] Sending Data: {}-{} to {}", new Object[]{getEntityId(), samples.getFirst().getSequenceNumber(), samples.getLast().getSequenceNumber(), readerProxy});
        if (sendMessage(message, readerProxy)) {
            // sendMessage returned true, which this class treats as an overflow of the message.
            log.trace("Sending of Data overflowed. Sending HeartBeat to notify reader.");
            sendHeartbeat(readerProxy);
        }
    }

    /**
     * Sends a Heartbeat without the liveliness flag to the given reader.
     *
     * @param readerProxy reader to send to
     */
    private void sendHeartbeat(ReaderProxy readerProxy) {
        sendHeartbeat(readerProxy, false);
    }

    /**
     * Sends a Heartbeat to the given reader.
     *
     * @param readerProxy reader to send to
     * @param z           value of the liveliness flag; when {@code false} the proxy is also told
     *                    that a heartbeat was sent
     */
    private void sendHeartbeat(ReaderProxy readerProxy, boolean z) {
        Message message = new Message(getGuid().getPrefix());
        Heartbeat heartbeat = createHeartbeat(readerProxy.getEntityId());
        heartbeat.livelinessFlag(z);
        message.addSubMessage(heartbeat);
        log.debug("[{}] Sending Heartbeat: #{} {}-{}, F:{}, L:{} to {}", new Object[]{getEntityId(), heartbeat.getCount(), heartbeat.getFirstSequenceNumber(), heartbeat.getLastSequenceNumber(), heartbeat.finalFlag(), heartbeat.livelinessFlag(), readerProxy.getGuid()});
        sendMessage(message, readerProxy);
        if (!z) {
            readerProxy.heartbeatSent();
        }
    }

    /**
     * Builds a Heartbeat advertising the cache's current minimum and maximum sequence numbers.
     * Each call consumes one heartbeat count.
     *
     * @param entityId id of the target reader; {@code null} is replaced by
     *                 {@link EntityId#UNKNOWN_ENTITY}
     * @return the new Heartbeat
     */
    private Heartbeat createHeartbeat(EntityId entityId) {
        if (entityId == null) {
            entityId = EntityId.UNKNOWN_ENTITY;
        }
        EntityId writerId = getEntityId();
        long seqNumMin = this.writer_cache.getSeqNumMin();
        long seqNumMax = this.writer_cache.getSeqNumMax();
        int count;
        // The increment is guarded so that concurrent callers never obtain the same count.
        synchronized (this.hbCountLock) {
            count = this.hbCount;
            this.hbCount = count + 1;
        }
        return new Heartbeat(entityId, writerId, seqNumMin, seqNumMax, count);
    }

    /**
     * Builds the Data submessage for one sample. The inline parameter list carries the sample's
     * key when it has one, and a StatusInfo when the sample's kind is not {@code WRITE}.
     *
     * @param entityId id of the target reader
     * @param sample   sample to marshal
     * @return the Data submessage
     * @throws IOException declared by the original; raised by the calls made here if marshalling fails
     */
    private Data createData(EntityId entityId, Sample<T> sample) throws IOException {
        DataEncapsulation dataEncapsulation = sample.getDataEncapsulation();
        ParameterList inlineQos = new ParameterList();
        if (sample.hasKey()) {
            inlineQos.add(sample.getKey());
        }
        if (!sample.getKind().equals(ChangeKind.WRITE)) {
            inlineQos.add(new StatusInfo(sample.getKind()));
        }
        return new Data(entityId, getEntityId(), sample.getSequenceNumber(), inlineQos, dataEncapsulation);
    }

    /**
     * Checks whether every active matched reader has reached the given sequence number.
     *
     * @param j sequence number to check
     * @return {@code false} if any active reader's highest sequence number is below {@code j};
     *         {@code true} otherwise, including when there are no matched readers
     */
    public boolean isAcknowledgedByAll(long j) {
        for (ReaderProxy readerProxy : this.readerProxies.values()) {
            if (readerProxy.isActive() && readerProxy.getReadersHighestSeqNum() < j) {
                return false;
            }
        }
        return true;
    }

    /**
     * Checks whether the described reader is currently matched with this writer.
     *
     * @param subscriptionData description of the reader; its key is used for the lookup
     * @return {@code true} if a proxy exists for the reader
     */
    public boolean isMatchedWith(SubscriptionData subscriptionData) {
        return this.readerProxies.get(subscriptionData.getKey()) != null;
    }

    /**
     * Tells whether this writer's own QoS requests reliable delivery.
     *
     * @return {@code true} if the reliability kind is {@code RELIABLE}
     */
    boolean isReliable() {
        return getQualityOfService().getReliability().getKind() == QosReliability.Kind.RELIABLE;
    }
}
