package com.microsoft.reef.io.network.group.config;

import com.microsoft.reef.exception.evaluator.NetworkException;
import com.microsoft.reef.io.network.Connection;
import com.microsoft.reef.io.network.Message;
import com.microsoft.reef.io.network.group.config.TaskTree;
import com.microsoft.reef.io.network.group.impl.GCMCodec;
import com.microsoft.reef.io.network.group.impl.operators.faulty.BroadRedHandler;
import com.microsoft.reef.io.network.group.impl.operators.faulty.BroadReduceConfig;
import com.microsoft.reef.io.network.group.impl.operators.faulty.ExceptionHandler;
import com.microsoft.reef.io.network.group.operators.Reduce;
import com.microsoft.reef.io.network.impl.MessagingTransportFactory;
import com.microsoft.reef.io.network.impl.NetworkService;
import com.microsoft.reef.io.network.impl.NetworkServiceParameters;
import com.microsoft.reef.io.network.naming.NameServerParameters;
import com.microsoft.reef.io.network.proto.ReefNetworkGroupCommProtos;
import com.microsoft.reef.io.network.util.StringIdentifierFactory;
import com.microsoft.reef.io.network.util.Utils;
import com.microsoft.reef.io.serialization.SerializableCodec;
import com.microsoft.tang.Configuration;
import com.microsoft.tang.JavaConfigurationBuilder;
import com.microsoft.tang.Tang;
import com.microsoft.tang.exceptions.BindException;
import com.microsoft.wake.ComparableIdentifier;
import com.microsoft.wake.EventHandler;
import com.microsoft.wake.Identifier;
import com.microsoft.wake.impl.LoggingEventHandler;
import com.microsoft.wake.impl.SingleThreadStage;
import com.microsoft.wake.impl.ThreadPoolStage;
import com.microsoft.wake.remote.Codec;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/microsoft/reef/io/network/group/config/BRManager.class */
/**
 * Driver-side manager for the broadcast/reduce operators.
 *
 * <p>The manager keeps a {@link TaskTree} of task identifiers, produces the Tang
 * configurations handed to the controller and compute tasks, and sends
 * {@code SourceAdd} / {@code SourceDead} control messages to tasks whose status in
 * the tree is {@code SCHEDULED}.
 *
 * <p>NOTE(review): {@code tree} is created lazily by {@link #add} and is read both
 * from synchronized and unsynchronized methods as well as from stage threads; every
 * method other than {@code add} assumes {@code add} has already been called.
 */
public class BRManager {
    private static final Logger LOG = Logger.getLogger(BRManager.class.getName());
    private static final Tang tang = Tang.Factory.getTang();

    /** Number of threads / capacity argument passed to the stages created by this class. */
    private static final int STAGE_SIZE = 5;

    private final Configuration reduceBaseConf;
    private final Class<? extends Codec<?>> brDataCodecClass;
    private final Class<? extends Codec<?>> redDataCodecClass;
    private final Class<? extends Reduce.ReduceFunction<?>> redFuncClass;
    private final String nameServiceAddr;
    private final int nameServicePort;
    private final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> ns;
    private final ThreadPoolStage<ReefNetworkGroupCommProtos.GroupCommMessage> senderStage;
    private final StringIdentifierFactory idFac = new StringIdentifierFactory();
    private final ComparableIdentifier driverId = this.idFac.getNewInstance("driver");

    /** SourceAdd messages held back per source task until a request for them arrives. */
    private final ConcurrentHashMap<Identifier, BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage>> srcAdds = new ConcurrentHashMap<>();
    private TaskTree tree = null;

    /**
     * Builds the shared base configuration, starts the driver's network service,
     * registers the driver identifier with it and creates the control-message sender stage.
     *
     * @param cls  codec class bound to {@code BroadcastConfig.DataCodec}
     * @param cls2 codec class bound to {@code ReduceConfig.DataCodec}
     * @param cls3 reduce function class bound to {@code ReduceConfig.ReduceFunction}
     * @param str  name server address
     * @param i    name server port
     * @throws BindException if one of the named-parameter bindings is rejected
     */
    public BRManager(Class<? extends Codec<?>> cls, Class<? extends Codec<?>> cls2, Class<? extends Reduce.ReduceFunction<?>> cls3, String str, int i) throws BindException {
        this.brDataCodecClass = cls;
        this.redDataCodecClass = cls2;
        this.redFuncClass = cls3;
        this.nameServiceAddr = str;
        this.nameServicePort = i;
        this.reduceBaseConf = tang.newConfigurationBuilder()
                .bindNamedParameter(BroadReduceConfig.BroadcastConfig.DataCodec.class, this.brDataCodecClass)
                .bindNamedParameter(BroadReduceConfig.ReduceConfig.DataCodec.class, this.redDataCodecClass)
                .bindNamedParameter(BroadReduceConfig.ReduceConfig.ReduceFunction.class, this.redFuncClass)
                .bindNamedParameter(NetworkServiceParameters.NetworkServiceCodec.class, GCMCodec.class)
                .bindNamedParameter(NetworkServiceParameters.NetworkServiceHandler.class, BroadRedHandler.class)
                .bindNamedParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, ExceptionHandler.class)
                .bindNamedParameter(NameServerParameters.NameServerAddr.class, str)
                .bindNamedParameter(NameServerParameters.NameServerPort.class, Integer.toString(i))
                .build();

        this.ns = new NetworkService<>(this.idFac, 0, str, i, new GCMCodec(), new MessagingTransportFactory(),
                new EventHandler<Message<ReefNetworkGroupCommProtos.GroupCommMessage>>() {
                    @Override
                    public void onNext(Message<ReefNetworkGroupCommProtos.GroupCommMessage> message) {
                        BRManager.this.handleSrcAddRequest(message);
                    }
                }, new LoggingEventHandler<Exception>());
        this.ns.registerId(this.driverId);

        this.senderStage = new ThreadPoolStage<>("SrcCtrlMsgSender",
                new EventHandler<ReefNetworkGroupCommProtos.GroupCommMessage>() {
                    @Override
                    public void onNext(ReefNetworkGroupCommProtos.GroupCommMessage groupCommMessage) {
                        BRManager.this.sendCtrlMsg(groupCommMessage);
                    }
                }, STAGE_SIZE);
    }

    /**
     * Handles a message received by the driver's network service: takes the first
     * payload (asserted to be of type {@code SourceAdd}) and processes it on a
     * separate stage, because processing blocks until the queued messages are available.
     */
    private void handleSrcAddRequest(Message<ReefNetworkGroupCommProtos.GroupCommMessage> message) {
        final ReefNetworkGroupCommProtos.GroupCommMessage request = message.getData().iterator().next();
        assert request.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.SourceAdd;
        // NOTE(review): a new stage is created for every request and is never closed.
        new SingleThreadStage<ReefNetworkGroupCommProtos.GroupCommMessage>(
                new EventHandler<ReefNetworkGroupCommProtos.GroupCommMessage>() {
                    @Override
                    public void onNext(ReefNetworkGroupCommProtos.GroupCommMessage groupCommMessage) {
                        BRManager.this.forwardQueuedSrcAdds(groupCommMessage);
                    }
                }, STAGE_SIZE).onNext(request);
    }

    /**
     * For every task number contained in the request bodies, waits for the SourceAdd
     * message queued for {@code "ComputeGradientTask" + number} and hands it to the sender stage.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    private void forwardQueuedSrcAdds(ReefNetworkGroupCommProtos.GroupCommMessage request) {
        @SuppressWarnings("rawtypes")
        final SerializableCodec codec = new SerializableCodec();
        for (final ReefNetworkGroupCommProtos.GroupMessageBody body : request.getMsgsList()) {
            // Each body is decoded as a set of Integer task numbers; the cast cannot be checked at runtime.
            @SuppressWarnings("unchecked")
            final Set<Integer> taskNumbers = (Set<Integer>) codec.decode(body.getData().toByteArray());
            LOG.log(Level.FINEST, "Received req to send srcAdd for {0}", taskNumbers);
            for (final int taskNumber : taskNumbers) {
                final Identifier srcId = this.idFac.getNewInstance("ComputeGradientTask" + taskNumber);
                this.srcAdds.putIfAbsent(srcId, new LinkedBlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage>(1));
                final BlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage> pending = this.srcAdds.get(srcId);
                try {
                    LOG.log(Level.FINEST, "Waiting for srcAdd msg from: {0}", srcId);
                    final ReefNetworkGroupCommProtos.GroupCommMessage srcAdd = pending.take();
                    LOG.log(Level.FINEST, "Found a srcAdd msg from: {0}", srcId);
                    this.senderStage.onNext(srcAdd);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    LOG.log(Level.WARNING, "Interrupted wait for: " + srcId, e);
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Sends a control message to its destination, but only if the destination is
     * currently marked {@code SCHEDULED} in the tree; otherwise the message is dropped.
     *
     * @throws RuntimeException wrapping the {@link NetworkException} if opening or writing fails
     */
    private void sendCtrlMsg(ReefNetworkGroupCommProtos.GroupCommMessage groupCommMessage) {
        final Identifier destId = this.idFac.getNewInstance(groupCommMessage.getDestid());
        if (this.tree.getStatus((ComparableIdentifier) destId) != TaskTree.Status.SCHEDULED) {
            return;
        }
        // NOTE(review): the connection is never closed here; confirm NetworkService owns its lifecycle.
        final Connection<ReefNetworkGroupCommProtos.GroupCommMessage> link = this.ns.newConnection(destId);
        try {
            link.open();
            LOG.log(Level.FINEST, "Sending source ctrl msg {0} for {1} to {2}", new Object[]{groupCommMessage.getType(), groupCommMessage.getSrcid(), destId});
            link.write(groupCommMessage);
        } catch (NetworkException e) {
            LOG.log(Level.WARNING, "Unable to send ctrl task msg to parent " + destId, e);
            throw new RuntimeException("Unable to send ctrl task msg to parent " + destId, e);
        }
    }

    /**
     * Closes the sender stage and the network service. The network service is closed
     * even when closing the sender stage fails.
     *
     * @throws Exception if either resource fails to close
     */
    public void close() throws Exception {
        try {
            this.senderStage.close();
        } finally {
            this.ns.close();
        }
    }

    /**
     * @return the value reported by the tree's {@code childrenSupported} for the given task
     */
    public int childrenSupported(ComparableIdentifier comparableIdentifier) {
        return this.tree.childrenSupported(comparableIdentifier);
    }

    /**
     * Adds a task to the tree, creating the tree on the first call.
     */
    public synchronized void add(ComparableIdentifier comparableIdentifier) {
        if (this.tree == null) {
            LOG.log(Level.FINEST, "Adding controller");
            this.tree = new TaskTreeImpl();
        } else {
            LOG.log(Level.FINEST, "Adding Compute task. First updating tree");
        }
        this.tree.add(comparableIdentifier);
    }

    /**
     * @return the base configuration merged with a network-service configuration
     *         listing the tree neighbors of the given task
     * @throws BindException if a binding is rejected
     */
    public Configuration getControllerContextConf(ComparableIdentifier comparableIdentifier) throws BindException {
        return buildContextConf(comparableIdentifier);
    }

    /**
     * Builds the controller task configuration: the task's own id as reduce receiver and
     * broadcast sender, plus one child-id entry per scheduled child.
     *
     * @throws BindException if a binding is rejected
     */
    public Configuration getControllerActConf(ComparableIdentifier comparableIdentifier) throws BindException {
        final JavaConfigurationBuilder builder = tang.newConfigurationBuilder();
        final String selfId = comparableIdentifier.toString();
        builder.bindNamedParameter(BroadReduceConfig.ReduceConfig.Receiver.SelfId.class, selfId);
        builder.bindNamedParameter(BroadReduceConfig.BroadcastConfig.Sender.SelfId.class, selfId);
        for (final ComparableIdentifier child : this.tree.scheduledChildren(comparableIdentifier)) {
            final String childId = child.toString();
            builder.bindSetEntry(BroadReduceConfig.ReduceConfig.Receiver.ChildIds.class, childId);
            builder.bindSetEntry(BroadReduceConfig.BroadcastConfig.Sender.ChildIds.class, childId);
        }
        return builder.build();
    }

    /**
     * @return the base configuration merged with a network-service configuration
     *         listing the tree neighbors of the given task
     * @throws BindException if a binding is rejected
     */
    public Configuration getComputeContextConf(ComparableIdentifier comparableIdentifier) throws BindException {
        return buildContextConf(comparableIdentifier);
    }

    /**
     * Builds the compute task configuration: the task's own id as reduce sender and
     * broadcast receiver, the parent id when the parent exists and is scheduled, and
     * one child-id entry per scheduled child.
     *
     * @throws BindException if a binding is rejected
     */
    public Configuration getComputeActConf(ComparableIdentifier comparableIdentifier) throws BindException {
        final JavaConfigurationBuilder builder = tang.newConfigurationBuilder();
        final String selfId = comparableIdentifier.toString();
        builder.bindNamedParameter(BroadReduceConfig.ReduceConfig.Sender.SelfId.class, selfId);
        builder.bindNamedParameter(BroadReduceConfig.BroadcastConfig.Receiver.SelfId.class, selfId);
        final ComparableIdentifier parent = this.tree.parent(comparableIdentifier);
        if (parent != null && TaskTree.Status.SCHEDULED == this.tree.getStatus(parent)) {
            // Bind the parent that was just null-checked instead of looking it up again.
            final String parentId = parent.toString();
            builder.bindNamedParameter(BroadReduceConfig.ReduceConfig.Sender.ParentId.class, parentId);
            builder.bindNamedParameter(BroadReduceConfig.BroadcastConfig.Receiver.ParentId.class, parentId);
        }
        for (final ComparableIdentifier child : this.tree.scheduledChildren(comparableIdentifier)) {
            final String childId = child.toString();
            builder.bindSetEntry(BroadReduceConfig.ReduceConfig.Sender.ChildIds.class, childId);
            builder.bindSetEntry(BroadReduceConfig.BroadcastConfig.Receiver.ChildIds.class, childId);
        }
        return builder.build();
    }

    /** Shared implementation of the controller and compute context configurations. */
    private Configuration buildContextConf(ComparableIdentifier taskId) throws BindException {
        final JavaConfigurationBuilder builder = tang.newConfigurationBuilder(this.reduceBaseConf);
        builder.addConfiguration(createNetworkServiceConf(this.nameServiceAddr, this.nameServicePort, this.tree.neighbors(taskId), 0));
        return builder.build();
    }

    /** Binds every given identifier as an entry of {@code BroadRedHandler.IDs}. */
    private Configuration createHandlerConf(List<ComparableIdentifier> ids) throws BindException {
        final JavaConfigurationBuilder builder = tang.newConfigurationBuilder();
        for (final ComparableIdentifier id : ids) {
            builder.bindSetEntry(BroadRedHandler.IDs.class, id.toString());
        }
        return builder.build();
    }

    /**
     * Binds the network service port and the handler ids.
     * The name server address and port arguments are accepted but not used by this method.
     */
    private Configuration createNetworkServiceConf(String nameServerAddr, int nameServerPort, List<ComparableIdentifier> ids, int networkServicePort) throws BindException {
        final JavaConfigurationBuilder builder = tang.newConfigurationBuilder()
                .bindNamedParameter(NetworkServiceParameters.NetworkServicePort.class, Integer.toString(networkServicePort));
        builder.addConfiguration(createHandlerConf(ids));
        return builder.build();
    }

    /** Removes the given task from the tree. */
    public void remove(ComparableIdentifier comparableIdentifier) {
        this.tree.remove(comparableIdentifier);
    }

    /**
     * Marks a task as scheduled and announces it to its scheduled neighbors. Does nothing
     * if the task is already scheduled.
     *
     * <p>When the task has no scheduled neighbors but has a parent, a SourceDead message
     * naming the parent as source is sent to the task instead.
     *
     * @param comparableIdentifier the task being scheduled
     * @param z when {@code true} the SourceAdd messages are queued until requested;
     *          when {@code false} they are sent immediately
     */
    public synchronized void schedule(ComparableIdentifier comparableIdentifier, boolean z) {
        if (TaskTree.Status.SCHEDULED == this.tree.getStatus(comparableIdentifier)) {
            return;
        }
        this.tree.setStatus(comparableIdentifier, TaskTree.Status.SCHEDULED);
        final List<ComparableIdentifier> scheduledNeighbors = this.tree.scheduledNeighbors(comparableIdentifier);
        if (scheduledNeighbors.isEmpty()) {
            final ComparableIdentifier parent = this.tree.parent(comparableIdentifier);
            if (parent != null) {
                LOG.log(Level.FINEST, "Parent {0} was alive while submitting.\nWhile scheduling found that parent is not scheduled.\nSending Source Dead message.", parent);
                sendSrcDeadMsg(parent, comparableIdentifier);
            }
            return;
        }
        for (final ComparableIdentifier neighbor : scheduledNeighbors) {
            LOG.log(Level.FINEST, "Adding {0} as neighbor of {1}", new Object[]{comparableIdentifier, neighbor});
            sendSrcAddMsg(comparableIdentifier, neighbor, z);
        }
    }

    /**
     * Builds a SourceAdd message from {@code from} to {@code to} and either sends it right
     * away or, when {@code deferred} is set, places it in the per-source queue.
     * The queue has capacity 1, so queueing a second message for the same source before the
     * first is taken makes {@code add} throw {@link IllegalStateException}.
     */
    private void sendSrcAddMsg(ComparableIdentifier from, ComparableIdentifier to, boolean deferred) {
        final ReefNetworkGroupCommProtos.GroupCommMessage srcAdd =
                Utils.bldGCM(ReefNetworkGroupCommProtos.GroupCommMessage.Type.SourceAdd, from, to, new byte[][]{new byte[0]});
        if (!deferred) {
            this.senderStage.onNext(srcAdd);
            return;
        }
        LOG.log(Level.FINEST, "SrcAdd from: {0} queued up", from);
        this.srcAdds.putIfAbsent(from, new LinkedBlockingQueue<ReefNetworkGroupCommProtos.GroupCommMessage>(1));
        this.srcAdds.get(from).add(srcAdd);
    }

    /**
     * Returns the children of the given task that are not marked {@code COMPLETED}.
     * The completed children are removed from the list obtained from the tree, and that
     * same list instance is returned.
     */
    public List<ComparableIdentifier> tasksToSchedule(ComparableIdentifier comparableIdentifier) {
        final List<ComparableIdentifier> children = this.tree.children(comparableIdentifier);
        final List<ComparableIdentifier> completed = new ArrayList<>();
        for (final ComparableIdentifier child : children) {
            if (TaskTree.Status.COMPLETED == this.tree.getStatus(child)) {
                completed.add(child);
            }
        }
        children.removeAll(completed);
        return children;
    }

    /**
     * Marks a task as unscheduled and sends a SourceDead message about it to each of
     * its scheduled neighbors.
     */
    public synchronized void unschedule(ComparableIdentifier comparableIdentifier) {
        LOG.log(Level.FINEST, "BRManager unscheduling: {0}", comparableIdentifier);
        this.tree.setStatus(comparableIdentifier, TaskTree.Status.UNSCHEDULED);
        for (final ComparableIdentifier neighbor : this.tree.scheduledNeighbors(comparableIdentifier)) {
            sendSrcDeadMsg(comparableIdentifier, neighbor);
        }
    }

    /** Hands a SourceDead message from {@code from} to {@code to} to the sender stage. */
    private void sendSrcDeadMsg(ComparableIdentifier from, ComparableIdentifier to) {
        this.senderStage.onNext(
                Utils.bldGCM(ReefNetworkGroupCommProtos.GroupCommMessage.Type.SourceDead, from, to, new byte[][]{new byte[0]}));
    }

    /**
     * @return {@code true} when the task has a parent and that parent is marked {@code SCHEDULED}
     */
    public boolean canReschedule(ComparableIdentifier comparableIdentifier) {
        final ComparableIdentifier parent = this.tree.parent(comparableIdentifier);
        return parent != null && TaskTree.Status.SCHEDULED == this.tree.getStatus(parent);
    }

    /** Marks the given task as {@code COMPLETED} in the tree. */
    public void complete(ComparableIdentifier comparableIdentifier) {
        this.tree.setStatus(comparableIdentifier, TaskTree.Status.COMPLETED);
    }
}
