package com.microsoft.reef.io.network.group.impl.operators.faulty;

import com.microsoft.reef.exception.evaluator.NetworkException;
import com.microsoft.reef.io.network.Message;
import com.microsoft.reef.io.network.group.impl.operators.faulty.BroadRedHandler;
import com.microsoft.reef.io.network.group.impl.operators.faulty.BroadReduceConfig;
import com.microsoft.reef.io.network.proto.ReefNetworkGroupCommProtos;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.wake.EventHandler;
import com.microsoft.wake.Identifier;
import com.microsoft.wake.IdentifierFactory;
import com.microsoft.wake.remote.Codec;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;

/* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/faulty/ReduceHandler.class */
/**
 * Receiving side of the reduce operator: an {@link EventHandler} that sorts incoming
 * group-communication messages into per-sender queues from which {@link #get} reads.
 *
 * <p>Two kinds of queues are maintained:
 * <ul>
 *   <li>one data queue per sender this handler currently accepts messages from, and</li>
 *   <li>a single control queue holding {@code SourceAdd} / {@code SourceDead} notifications
 *       until they are folded into a liveness map by {@link #sync(Map)}.</li>
 * </ul>
 */
public final class ReduceHandler implements EventHandler<Message<ReefNetworkGroupCommProtos.GroupCommMessage>> {
    private static final Logger LOG = Logger.getLogger(ReduceHandler.class.getName());

    /** Pending messages, keyed by the sender they came from. Only registered senders have an entry. */
    private final ConcurrentHashMap<Identifier, BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage>> id2dataQue = new ConcurrentHashMap<>();

    /** {@code SourceAdd} / {@code SourceDead} messages waiting to be processed by {@link #sync(Map)}. */
    private final BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage> ctrlQue = new LinkedBlockingQueue<>();

    /** Turns the source-id strings carried by messages into {@link Identifier}s. */
    private final IdentifierFactory idFac;

    /**
     * Creates a handler that initially accepts messages from the given senders.
     *
     * @param set               string ids of the senders to register
     * @param identifierFactory factory used to convert string ids into {@link Identifier}s
     */
    @Inject
    public ReduceHandler(@Parameter(BroadRedHandler.IDs.class) Set<String> set, @Parameter(BroadReduceConfig.IdFactory.class) IdentifierFactory identifierFactory) {
        this.idFac = identifierFactory;
        LOG.log(Level.FINEST, "\t\tI can listen from:");
        for (String senderName : set) {
            Identifier sender = identifierFactory.getNewInstance(senderName);
            addChild(sender);
            LOG.log(Level.FINEST, "\t\t{0}", sender);
        }
    }

    /**
     * Registers a sender by installing a fresh, empty data queue for it.
     *
     * <p>If a queue is already registered for {@code identifier} it is replaced, so messages
     * still pending in the previous queue are no longer reachable through this handler.
     * NOTE(review): {@link #sync(Map)} calls this for every id with a positive count, including
     * ids that are already registered - confirm that discarding their pending messages is intended.
     *
     * @param identifier the sender to start accepting messages from
     */
    public synchronized void addChild(Identifier identifier) {
        LOG.log(Level.FINEST, "Adding {0} as one of the senders to which I can listen from", identifier);
        this.id2dataQue.put(identifier, new LinkedBlockingQueue<>());
    }

    /**
     * Unregisters a sender and drops its data queue.
     *
     * @param identifier the sender to stop accepting messages from
     */
    public synchronized void removeChild(Identifier identifier) {
        LOG.log(Level.FINEST, "Removing {0} as one of the senders to which I can listen from", identifier);
        this.id2dataQue.remove(identifier);
    }

    /**
     * Routes the first payload of {@code message} to the matching queue.
     *
     * <ul>
     *   <li>{@code SourceAdd} goes to the control queue.</li>
     *   <li>{@code SourceDead} goes to the control queue and, if the source is registered, also
     *       to its data queue so that a reader blocked in {@link #get} is released.</li>
     *   <li>Any other type goes to the source's data queue, or is ignored if the source is not
     *       registered.</li>
     * </ul>
     *
     * <p>A message that carries no payload is logged and ignored.
     *
     * @param message the received network message
     * @throws RuntimeException if the thread is interrupted while enqueueing; the interrupt
     *                          status is restored before the exception is thrown
     */
    public void onNext(Message<ReefNetworkGroupCommProtos.GroupCommMessage> message) {
        Iterator<? extends ReefNetworkGroupCommProtos.GroupCommMessage> payloads = message.getData().iterator();
        if (!payloads.hasNext()) {
            // Nothing to route; dereferencing a missing payload would only produce an NPE.
            LOG.log(Level.WARNING, "Ignoring message that carries no payload");
            return;
        }
        ReefNetworkGroupCommProtos.GroupCommMessage next = payloads.next();
        Identifier srcId = this.idFac.getNewInstance(next.getSrcid());
        try {
            LOG.log(Level.FINEST, "\t\t{0} from: {1}", new Object[]{next.getType(), srcId});
            switch (next.getType()) {
                case SourceAdd:
                    this.ctrlQue.put(next);
                    break;
                case SourceDead: {
                    this.ctrlQue.put(next);
                    // Single lookup: the queue may be removed concurrently by removeChild().
                    BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage> dataQue = this.id2dataQue.get(srcId);
                    if (dataQue != null) {
                        dataQue.put(next);
                    }
                    break;
                }
                default: {
                    BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage> dataQue = this.id2dataQue.get(srcId);
                    if (dataQue == null) {
                        LOG.log(Level.FINEST, "Ignoring msg as I am not configured to recv from {0}", srcId);
                        return;
                    }
                    dataQue.put(next);
                    break;
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack before converting the exception.
            Thread.currentThread().interrupt();
            String str = "Could not put " + next + " into the queue of " + srcId;
            LOG.log(Level.WARNING, str, (Throwable) e);
            throw new RuntimeException(str, e);
        }
    }

    /**
     * Drains the control queue into {@code map} and then (un)registers senders accordingly.
     *
     * <p>Each drained {@code SourceAdd} increments the count stored for its source and every
     * other control message decrements it; a source without an entry starts at 0. Afterwards
     * every id in {@code map} with a negative count is removed from this handler and every id
     * with a positive count is added; ids whose count is 0 are left untouched.
     *
     * @param map per-source counter, updated in place
     */
    public void sync(Map<Identifier, Integer> map) {
        LOG.log(Level.FINEST, "Synching any control messages");
        ReefNetworkGroupCommProtos.GroupCommMessage ctrlMsg;
        // poll() returning null is the only reliable "queue is empty" signal under concurrency.
        while ((ctrlMsg = this.ctrlQue.poll()) != null) {
            Identifier srcId = this.idFac.getNewInstance(ctrlMsg.getSrcid());
            Integer previous = map.get(srcId);
            int count = previous == null ? 0 : previous.intValue();
            if (ctrlMsg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.SourceAdd) {
                map.put(srcId, Integer.valueOf(count + 1));
            } else {
                map.put(srcId, Integer.valueOf(count - 1));
            }
        }
        LOG.log(Level.FINEST, "Id to life status: {0}", map);
        for (Map.Entry<Identifier, Integer> entry : map.entrySet()) {
            Identifier identifier = entry.getKey();
            int status = entry.getValue().intValue();
            if (status < 0) {
                LOG.log(Level.FINEST, "{0} is dead({1}). Removing from handler", new Object[]{identifier, Integer.valueOf(status)});
                removeChild(identifier);
            } else if (status > 0) {
                LOG.log(Level.FINEST, "{0} is alive({1}). Adding to handler", new Object[]{identifier, Integer.valueOf(status)});
                addChild(identifier);
            }
        }
    }

    /**
     * Blocks until a message from {@code identifier} is available and decodes it.
     *
     * @param identifier the registered sender to receive from
     * @param codec      codec used to decode each message body
     * @param <T>        decoded value type
     * @return the value decoded from the last body of the received message; {@code null} if the
     *         message is a {@code SourceDead} notification or contains no bodies
     * @throws RuntimeException     if {@code identifier} is not a registered sender
     * @throws InterruptedException if interrupted while waiting for a message
     * @throws NetworkException     declared for callers; not thrown directly by this method
     */
    public <T> T get(Identifier identifier, Codec<T> codec) throws InterruptedException, NetworkException {
        LOG.log(Level.FINEST, "\t\tget from {0}", identifier);
        // Single lookup: avoids an NPE if the sender is removed between a check and the read.
        BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage> dataQue = this.id2dataQue.get(identifier);
        if (dataQue == null) {
            LOG.log(Level.FINEST, "\t\tCan't receive from a non-child");
            throw new RuntimeException("Can't receive from a non-child");
        }
        ReefNetworkGroupCommProtos.GroupCommMessage received = dataQue.take();
        if (received.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.SourceDead) {
            LOG.log(Level.FINEST, "\t\tGot src dead msg from driver. Terminating wait and returning null");
            return null;
        }
        T decoded = null;
        // Every body is decoded, but only the last decoded value is returned.
        for (ReefNetworkGroupCommProtos.GroupMessageBody body : received.getMsgsList()) {
            decoded = codec.decode(body.getData().toByteArray());
        }
        return decoded;
    }
}
