package com.microsoft.reef.io.network.group.impl.operators;

import com.google.protobuf.ByteString;
import com.microsoft.reef.exception.evaluator.NetworkException;
import com.microsoft.reef.io.network.Connection;
import com.microsoft.reef.io.network.impl.NetworkService;
import com.microsoft.reef.io.network.proto.ReefNetworkGroupCommProtos;
import com.microsoft.reef.io.network.util.ListCodec;
import com.microsoft.tang.annotations.Name;
import com.microsoft.tang.annotations.NamedParameter;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.wake.Identifier;
import com.microsoft.wake.remote.Codec;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import javax.inject.Inject;

/* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/SenderHelperImpl.class */
public class SenderHelperImpl<T> implements SenderHelper<T> {
    final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> netService;
    final Codec<T> codec;

    @NamedParameter(doc = "codec for the network service", short_name = "nscodec")
    /* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/SenderHelperImpl$SenderCodec.class */
    public static class SenderCodec implements Name<Codec<?>> {
    }

    @Inject
    public SenderHelperImpl(NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> networkService, @Parameter(SenderCodec.class) Codec<T> codec) {
        this.netService = networkService;
        this.codec = codec;
    }

    @Override // com.microsoft.reef.io.network.group.impl.operators.SenderHelper
    public void send(Identifier identifier, Identifier identifier2, T t, ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws NetworkException {
        send(identifier, identifier2, (List) Arrays.asList(t), type);
    }

    @Override // com.microsoft.reef.io.network.group.impl.operators.SenderHelper
    public void send(Identifier identifier, Identifier identifier2, List<T> list, ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws NetworkException {
        ReefNetworkGroupCommProtos.GroupCommMessage.Builder newBuilder = ReefNetworkGroupCommProtos.GroupCommMessage.newBuilder();
        newBuilder.setType(type);
        newBuilder.setSrcid(identifier.toString());
        newBuilder.setDestid(identifier2.toString());
        ReefNetworkGroupCommProtos.GroupMessageBody.Builder newBuilder2 = ReefNetworkGroupCommProtos.GroupMessageBody.newBuilder();
        Iterator<T> it = list.iterator();
        while (it.hasNext()) {
            newBuilder2.setData(ByteString.copyFrom(this.codec.encode(it.next())));
            newBuilder.addMsgs(newBuilder2.m123build());
        }
        netServiceSend(identifier2, newBuilder.m90build());
    }

    @Override // com.microsoft.reef.io.network.group.impl.operators.SenderHelper
    public void sendListOfList(Identifier identifier, Identifier identifier2, List<List<T>> list, ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws NetworkException {
        ReefNetworkGroupCommProtos.GroupCommMessage.Builder newBuilder = ReefNetworkGroupCommProtos.GroupCommMessage.newBuilder();
        newBuilder.setType(type);
        newBuilder.setSrcid(identifier.toString());
        newBuilder.setDestid(identifier2.toString());
        ReefNetworkGroupCommProtos.GroupMessageBody.Builder newBuilder2 = ReefNetworkGroupCommProtos.GroupMessageBody.newBuilder();
        ListCodec listCodec = new ListCodec(this.codec);
        Iterator<List<T>> it = list.iterator();
        while (it.hasNext()) {
            newBuilder2.setData(ByteString.copyFrom(listCodec.encode((List) it.next())));
            newBuilder.addMsgs(newBuilder2.m123build());
        }
        netServiceSend(identifier2, newBuilder.m90build());
    }

    private void netServiceSend(Identifier identifier, ReefNetworkGroupCommProtos.GroupCommMessage groupCommMessage) throws NetworkException {
        Connection<ReefNetworkGroupCommProtos.GroupCommMessage> newConnection = this.netService.newConnection(identifier);
        newConnection.open();
        newConnection.write(groupCommMessage);
    }

    @Override // com.microsoft.reef.io.network.group.impl.operators.SenderHelper
    public void send(Identifier identifier, List<? extends Identifier> list, List<T> list2, List<Integer> list3, ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws NetworkException {
        int i = 0;
        Iterator<? extends Identifier> it = list.iterator();
        Iterator<Integer> it2 = list3.iterator();
        while (it2.hasNext()) {
            int intValue = it2.next().intValue();
            send(identifier, it.next(), (List) list2.subList(i, i + intValue), type);
            i += intValue;
        }
    }
}
