package com.microsoft.reef.io.network.group.impl.operators.faulty;

import com.microsoft.reef.exception.evaluator.NetworkException;
import com.microsoft.reef.io.network.Message;
import com.microsoft.reef.io.network.group.impl.operators.faulty.AllReduceConfig;
import com.microsoft.reef.io.network.proto.ReefNetworkGroupCommProtos;
import com.microsoft.tang.annotations.Name;
import com.microsoft.tang.annotations.NamedParameter;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.wake.EventHandler;
import com.microsoft.wake.Identifier;
import com.microsoft.wake.IdentifierFactory;
import com.microsoft.wake.remote.Codec;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;

/* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/faulty/AllReduceHandler.class */
public class AllReduceHandler implements EventHandler<Message<ReefNetworkGroupCommProtos.GroupCommMessage>> {
    private static final Logger LOG = Logger.getLogger(AllReduceHandler.class.getName());
    private final ConcurrentHashMap<Identifier, BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage>> id2que = new ConcurrentHashMap<>();
    private final IdentifierFactory idFac;

    @NamedParameter(doc = "List of Identifiers on which the handler should listen")
    /* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/faulty/AllReduceHandler$IDs.class */
    public static class IDs implements Name<Set<String>> {
    }

    @Inject
    public AllReduceHandler(@Parameter(IDs.class) Set<String> set, @Parameter(AllReduceConfig.IdFactory.class) IdentifierFactory identifierFactory) {
        this.idFac = identifierFactory;
        Iterator<String> it = set.iterator();
        while (it.hasNext()) {
            Identifier newInstance = identifierFactory.getNewInstance(it.next());
            this.id2que.put(newInstance, new LinkedBlockingQueue());
            LOG.log(Level.FINEST, "Listen from: {0}", newInstance);
        }
    }

    public void onNext(Message<ReefNetworkGroupCommProtos.GroupCommMessage> message) {
        ReefNetworkGroupCommProtos.GroupCommMessage next = message.getData().iterator().next();
        Identifier newInstance = this.idFac.getNewInstance(next.getSrcid());
        try {
            LOG.log(Level.FINEST, "Message {0} from: {1}", new Object[]{next, newInstance});
            this.id2que.get(newInstance).put(next);
        } catch (InterruptedException e) {
            String str = "Could not put " + next + " into the queue of " + newInstance;
            LOG.log(Level.WARNING, str, (Throwable) e);
            throw new RuntimeException(str, e);
        }
    }

    public <T> T get(Identifier identifier, Codec<T> codec) throws InterruptedException, NetworkException {
        LOG.log(Level.FINEST, "Get from {0}", identifier);
        if (!this.id2que.containsKey(identifier)) {
            RuntimeException runtimeException = new RuntimeException("Can't receive from a non-child");
            LOG.log(Level.WARNING, "Can't receive from a non-child", (Throwable) runtimeException);
            throw runtimeException;
        }
        ReefNetworkGroupCommProtos.GroupCommMessage take = this.id2que.get(identifier).take();
        if (take.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.SourceDead) {
            LOG.log(Level.FINEST, "Got src dead msg from driver. Terminating wait and returning null");
            return null;
        }
        Object obj = null;
        Iterator<ReefNetworkGroupCommProtos.GroupMessageBody> it = take.getMsgsList().iterator();
        while (it.hasNext()) {
            obj = codec.decode(it.next().getData().toByteArray());
        }
        LOG.log(Level.FINEST, "\t\tReturning {0}", obj);
        return (T) obj;
    }
}
