package org.apache.giraph.comm;

import java.util.Iterator;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/SendMessageCache.class */
/**
 * Buffers outgoing vertex messages per destination worker and ships them as
 * {@link SendWorkerMessagesRequest}s through a
 * {@link NettyWorkerClientRequestProcessor}.
 *
 * <p>Messages are accumulated with the superclass's {@code addData}; when the
 * value it returns reaches {@link #maxMessagesSizePerWorker}, everything
 * buffered for that worker is removed and sent. {@link #flush()} sends
 * whatever is still buffered. The class also keeps running counters of the
 * messages and request bytes sent, which callers read-and-reset with
 * {@link #resetMessageCount()} and {@link #resetMessageBytesCount()}.
 *
 * <p>NOTE(review): the counters are plain {@code long} fields with no
 * synchronization in this class; presumably each instance is used by a single
 * thread - confirm against callers.
 *
 * @param <I> vertex id type
 * @param <M> message type
 */
public class SendMessageCache<I extends WritableComparable, M extends Writable> extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
    /** Class logger. */
    private static final Logger LOG = Logger.getLogger(SendMessageCache.class);
    /** Messages handed to {@link #sendMessageRequest} since the last {@link #resetMessageCount()}. */
    protected long totalMsgsSentInSuperstep;
    /** Serialized bytes of the requests sent since the last {@link #resetMessageBytesCount()}. */
    protected long totalMsgBytesSentInSuperstep;
    /** Threshold compared against the value returned by {@code addMessage} to trigger a send. */
    protected final int maxMessagesSizePerWorker;
    /** Processor used to issue the requests to the destination workers. */
    protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;

    /**
     * Iterator adapter that walks a vertex's edges and yields the target
     * vertex id of each edge.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/giraph/comm/SendMessageCache$TargetVertexIdIterator.class */
    public class TargetVertexIdIterator implements Iterator<I> {
        /**
         * Underlying edge iterator. Wildcard-typed because the vertex's edge
         * value type is unknown here and only the target id is ever read.
         */
        private final Iterator<? extends Edge<I, ?>> edgesIterator;

        /**
         * @param vertex vertex whose edges are iterated
         */
        private TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
            this.edgesIterator = vertex.getEdges().iterator();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.edgesIterator.hasNext();
        }

        @Override // java.util.Iterator
        public I next() {
            return this.edgesIterator.next().getTargetVertexId();
        }

        /**
         * Removal is not supported by this read-only view.
         *
         * @throws UnsupportedOperationException always
         */
        @Override // java.util.Iterator
        public void remove() {
            // Fail loudly instead of silently ignoring the call, as the
            // java.util.Iterator contract requires for unsupported removal.
            throw new UnsupportedOperationException("remove");
        }
    }

    /**
     * Creates the cache. The maximum and additional request sizes passed to
     * the superclass are read from {@link GiraphConstants#MAX_MSG_REQUEST_SIZE}
     * and {@link GiraphConstants#ADDITIONAL_MSG_REQUEST_SIZE}.
     *
     * @param immutableClassesGiraphConfiguration job configuration
     * @param centralizedServiceWorker service worker, used to look up partition owners
     * @param nettyWorkerClientRequestProcessor processor used to send the requests
     * @param i value stored as {@link #maxMessagesSizePerWorker}
     */
    public SendMessageCache(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, CentralizedServiceWorker<?, ?, ?> centralizedServiceWorker, NettyWorkerClientRequestProcessor<I, ?, ?> nettyWorkerClientRequestProcessor, int i) {
        super(immutableClassesGiraphConfiguration, centralizedServiceWorker, GiraphConstants.MAX_MSG_REQUEST_SIZE.get(immutableClassesGiraphConfiguration), GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE.get(immutableClassesGiraphConfiguration));
        // The two counters start at 0 by default field initialization.
        this.maxMessagesSizePerWorker = i;
        this.clientProcessor = nettyWorkerClientRequestProcessor;
    }

    /**
     * Creates an empty message container backed by the configuration's
     * outgoing message value factory.
     *
     * @return a new {@link ByteArrayVertexIdMessages}
     */
    @Override // org.apache.giraph.comm.SendVertexIdDataCache
    public VertexIdMessages<I, M> createVertexIdData() {
        return new ByteArrayVertexIdMessages<I, M>(getConf().<M>getOutgoingMessageValueFactory());
    }

    /**
     * Buffers a message for a destination vertex; delegates to {@code addData}.
     *
     * @param workerInfo worker that owns the destination partition
     * @param i destination partition id
     * @param i2 destination vertex id
     * @param m message to buffer
     * @return the value returned by {@code addData} (compared by callers
     *         against {@link #maxMessagesSizePerWorker})
     */
    public int addMessage(WorkerInfo workerInfo, int i, I i2, M m) {
        return addData(workerInfo, i, i2, m);
    }

    /**
     * Buffers a message for a destination vertex given as raw bytes;
     * delegates to the matching {@code addData} overload.
     *
     * @param workerInfo worker that owns the destination partition
     * @param i destination partition id
     * @param bArr byte array passed through to {@code addData}
     *        (presumably the serialized vertex id - confirm in superclass)
     * @param i2 int passed through to {@code addData}
     *        (presumably the number of valid bytes in {@code bArr} - confirm)
     * @param m message to buffer
     * @return the value returned by {@code addData}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public int addMessage(WorkerInfo workerInfo, int i, byte[] bArr, int i2, M m) {
        return addData(workerInfo, i, bArr, i2, m);
    }

    /**
     * Removes and returns everything buffered for one worker; delegates to
     * {@code removeWorkerData}.
     *
     * @param workerInfo worker whose buffered messages are removed
     * @return pairs of partition id and buffered messages
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public PairList<Integer, VertexIdMessages<I, M>> removeWorkerMessages(WorkerInfo workerInfo) {
        return (PairList<Integer, VertexIdMessages<I, M>>) removeWorkerData(workerInfo);
    }

    /**
     * Removes and returns everything buffered for all workers; delegates to
     * {@code removeAllData}.
     *
     * @return pairs of worker and that worker's (partition id, messages) list
     */
    private PairList<WorkerInfo, PairList<Integer, VertexIdMessages<I, M>>> removeAllMessages() {
        return (PairList<WorkerInfo, PairList<Integer, VertexIdMessages<I, M>>>) removeAllData();
    }

    /**
     * Wraps the given messages in a request, adds the request's serialized
     * size to the byte counter and sends it to the worker.
     *
     * @param workerInfo destination worker
     * @param workerMessages messages to send, grouped by partition id
     */
    private void sendWorkerMessages(WorkerInfo workerInfo, PairList<Integer, VertexIdMessages<I, M>> workerMessages) {
        SendWorkerMessagesRequest<I, M> request = new SendWorkerMessagesRequest<I, M>(workerMessages);
        this.totalMsgBytesSentInSuperstep += request.getSerializedSize();
        this.clientProcessor.doRequest(workerInfo, request);
    }

    /**
     * Buffers one message for a destination vertex and, if the buffer for the
     * owning worker has reached {@link #maxMessagesSizePerWorker}, sends that
     * worker's buffered messages and notifies the graph task manager.
     *
     * @param i destination vertex id
     * @param m message to send
     */
    public void sendMessageRequest(I i, M m) {
        PartitionOwner vertexPartitionOwner = getServiceWorker().getVertexPartitionOwner(i);
        WorkerInfo workerInfo = vertexPartitionOwner.getWorkerInfo();
        int partitionId = vertexPartitionOwner.getPartitionId();
        if (LOG.isTraceEnabled()) {
            LOG.trace("sendMessageRequest: Send bytes (" + m.toString() + ") to " + i + " on worker " + workerInfo);
        }
        // Counted when buffered, not when the request actually goes out.
        this.totalMsgsSentInSuperstep++;
        if (addMessage(workerInfo, partitionId, i, m) >= this.maxMessagesSizePerWorker) {
            sendWorkerMessages(workerInfo, removeWorkerMessages(workerInfo));
            getServiceWorker().getGraphTaskManager().notifySentMessages();
        }
    }

    /**
     * Sends the message to the target vertex of every edge of the vertex.
     *
     * @param vertex vertex whose edge targets receive the message
     * @param m message to send
     */
    public void sendMessageToAllRequest(Vertex<I, ?, ?> vertex, M m) {
        sendMessageToAllRequest(new TargetVertexIdIterator(vertex), m);
    }

    /**
     * Sends the message to every vertex id produced by the iterator, one
     * {@link #sendMessageRequest} call per id.
     *
     * @param it destination vertex ids
     * @param m message to send
     */
    public void sendMessageToAllRequest(Iterator<I> it, M m) {
        while (it.hasNext()) {
            sendMessageRequest(it.next(), m);
        }
    }

    /**
     * Sends everything that is still buffered, one request per worker.
     * Unlike {@link #sendMessageRequest}, this does not notify the graph
     * task manager.
     */
    public void flush() {
        PairList<WorkerInfo, PairList<Integer, VertexIdMessages<I, M>>>.Iterator iterator = removeAllMessages().getIterator();
        while (iterator.hasNext()) {
            // next() only advances; the current pair is read via the getters.
            iterator.next();
            sendWorkerMessages(iterator.getCurrentFirst(), iterator.getCurrentSecond());
        }
    }

    /**
     * Returns the message counter and resets it to zero.
     *
     * @return number of messages counted since the previous reset
     */
    public long resetMessageCount() {
        long j = this.totalMsgsSentInSuperstep;
        this.totalMsgsSentInSuperstep = 0L;
        return j;
    }

    /**
     * Returns the sent-bytes counter and resets it to zero.
     *
     * @return serialized request bytes counted since the previous reset
     */
    public long resetMessageBytesCount() {
        long j = this.totalMsgBytesSentInSuperstep;
        this.totalMsgBytesSentInSuperstep = 0L;
        return j;
    }
}
