package com.microsoft.reef.io.network.group.impl;

import com.microsoft.reef.io.network.Message;
import com.microsoft.reef.io.network.group.impl.operators.basic.config.GroupParameters;
import com.microsoft.reef.io.network.proto.ReefNetworkGroupCommProtos;
import com.microsoft.reef.io.network.util.Utils;
import com.microsoft.tang.annotations.Name;
import com.microsoft.tang.annotations.NamedParameter;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.wake.EventHandler;
import com.microsoft.wake.Identifier;
import com.microsoft.wake.IdentifierFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.inject.Inject;

/* loaded from: input_file:com/microsoft/reef/io/network/group/impl/GroupCommNetworkHandler.class */
public class GroupCommNetworkHandler implements EventHandler<Message<ReefNetworkGroupCommProtos.GroupCommMessage>> {
    Map<ReefNetworkGroupCommProtos.GroupCommMessage.Type, GCMHandler> handlerMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/reef/io/network/group/impl/GroupCommNetworkHandler$GCMHandler.class */
    public static class GCMHandler implements Handler {
        Map<Identifier, BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage>> idQues;
        IdentifierFactory idFactory;
        int capacity;

        public GCMHandler(List<Identifier> list, IdentifierFactory identifierFactory, int i) {
            this.idQues = new HashMap(list.size());
            this.idFactory = identifierFactory;
            this.capacity = i;
            Iterator<Identifier> it = list.iterator();
            while (it.hasNext()) {
                this.idQues.put(it.next(), new LinkedBlockingQueue(this.capacity));
            }
        }

        @Override // com.microsoft.reef.io.network.group.impl.Handler
        public ReefNetworkGroupCommProtos.GroupCommMessage getData(Identifier identifier) throws InterruptedException {
            return this.idQues.get(identifier).take();
        }

        public void onNext(ReefNetworkGroupCommProtos.GroupCommMessage groupCommMessage) {
            this.idQues.get(this.idFactory.getNewInstance(groupCommMessage.getSrcid())).add(groupCommMessage);
        }
    }

    @NamedParameter(doc = "List of Identifiers on which the handler should listen")
    /* loaded from: input_file:com/microsoft/reef/io/network/group/impl/GroupCommNetworkHandler$IDs.class */
    public static class IDs implements Name<String> {
    }

    @NamedParameter(doc = "Queue Capacity for the handler of each operation type", default_value = "5")
    /* loaded from: input_file:com/microsoft/reef/io/network/group/impl/GroupCommNetworkHandler$QueueCapacity.class */
    public static class QueueCapacity implements Name<Integer> {
    }

    @Inject
    public GroupCommNetworkHandler(@Parameter(IDs.class) String str, @Parameter(GroupParameters.IDFactory.class) IdentifierFactory identifierFactory, @Parameter(QueueCapacity.class) int i) {
        this(Utils.parseList(str, identifierFactory), identifierFactory, i);
    }

    public GroupCommNetworkHandler(List<Identifier> list, IdentifierFactory identifierFactory, int i) {
        this.handlerMap = new HashMap();
        this.handlerMap.put(ReefNetworkGroupCommProtos.GroupCommMessage.Type.AllGather, new GCMHandler(list, identifierFactory, i));
        this.handlerMap.put(ReefNetworkGroupCommProtos.GroupCommMessage.Type.AllReduce, new GCMHandler(list, identifierFactory, i));
        this.handlerMap.put(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, new GCMHandler(list, identifierFactory, i));
        this.handlerMap.put(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather, new GCMHandler(list, identifierFactory, i));
        this.handlerMap.put(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, new GCMHandler(list, identifierFactory, i));
        this.handlerMap.put(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ReduceScatter, new GCMHandler(list, identifierFactory, i));
        this.handlerMap.put(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter, new GCMHandler(list, identifierFactory, i));
    }

    public Handler getHandler(ReefNetworkGroupCommProtos.GroupCommMessage.Type type) {
        return this.handlerMap.get(type);
    }

    public void onNext(Message<ReefNetworkGroupCommProtos.GroupCommMessage> message) {
        ReefNetworkGroupCommProtos.GroupCommMessage next = message.getData().iterator().next();
        getHandler(next.getType()).onNext(next);
    }
}
