package com.microsoft.reef.io.network.group.impl.operators.faulty;

import com.microsoft.reef.exception.evaluator.NetworkException;
import com.microsoft.reef.io.network.Connection;
import com.microsoft.reef.io.network.group.impl.operators.faulty.AllReduceConfig;
import com.microsoft.reef.io.network.group.operators.Reduce;
import com.microsoft.reef.io.network.impl.NetworkService;
import com.microsoft.reef.io.network.proto.ReefNetworkGroupCommProtos;
import com.microsoft.reef.io.network.util.Utils;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.wake.Identifier;
import com.microsoft.wake.IdentifierFactory;
import com.microsoft.wake.remote.Codec;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;

/* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/faulty/AllReduceOp.class */
/**
 * All-reduce operator for one node of a communication tree, with deliberate fault injection.
 *
 * <p>A node that has children waits for each child's value, reduces those values together with
 * its own, sends the reduced value to its parent (if it has one), waits for the parent's reply and
 * finally forwards the result to every child that answered. A node without children sends its own
 * value to the parent and waits for the reply.
 *
 * <p>A leaf that has a parent throws {@link NetworkFault} with probability
 * {@link #FAULT_PROBABILITY} instead of taking part in the reduction.
 *
 * @param <V> type of the values being reduced
 */
public class AllReduceOp<V> {
    private static final Logger LOG = Logger.getLogger(AllReduceOp.class.getName());

    /** Configuration value that this class treats as "no identifier". */
    private static final String NULL_ID = "NULL";

    /** Probability with which a leaf node throws a fault instead of participating. */
    private static final double FAULT_PROBABILITY = 0.7d;

    /** Exclusive upper bound passed to the random delay (in ms) taken before a fault is thrown. */
    private static final int MAX_FAULT_DELAY_MS = 100;

    private final Random toss = new Random();
    private final Identifier self;
    private final Identifier parent;
    /** Child identifiers, or {@code null} when this node has no children. Dead children are removed. */
    private final List<Identifier> children;
    private final Codec<V> codec;
    private final Reduce.ReduceFunction<V> redFunc;
    private final AllReduceHandler handler;
    private final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> netService;

    /**
     * Creates the operator from injected configuration.
     *
     * @param networkService    service used to open connections to the parent and the children
     * @param allReduceHandler  handler from which values received from other nodes are read
     * @param codec             codec used to encode outgoing values and passed to the handler on receive
     * @param reduceFunction    function that combines this node's value with its children's values
     * @param str               identifier of this node, or {@code "NULL"} for none
     * @param str2              identifier of the parent node, or {@code "NULL"} for none
     * @param set               identifiers of the child nodes
     * @param identifierFactory factory that turns identifier strings into {@link Identifier}s
     */
    @Inject
    public AllReduceOp(NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> networkService, AllReduceHandler allReduceHandler, @Parameter(AllReduceConfig.DataCodec.class) Codec<V> codec, @Parameter(AllReduceConfig.ReduceFunction.class) Reduce.ReduceFunction<V> reduceFunction, @Parameter(AllReduceConfig.SelfId.class) String str, @Parameter(AllReduceConfig.ParentId.class) String str2, @Parameter(AllReduceConfig.ChildIds.class) Set<String> set, @Parameter(AllReduceConfig.IdFactory.class) IdentifierFactory identifierFactory) {
        this.netService = networkService;
        this.handler = allReduceHandler;
        this.codec = codec;
        this.redFunc = reduceFunction;
        this.parent = str2.equals(NULL_ID) ? null : identifierFactory.getNewInstance(str2);
        this.self = str.equals(NULL_ID) ? null : identifierFactory.getNewInstance(str);

        List<Identifier> childIds = new ArrayList<Identifier>();
        for (String childId : set) {
            LOG.log(Level.FINEST, "Add child ID: {0}", childId);
            if (childId.equals(NULL_ID)) {
                // NOTE(review): stops at the first "NULL" entry, so any entries iterated after it
                // are dropped. Presumably "NULL" only appears as the sole element - confirm.
                break;
            }
            childIds.add(identifierFactory.getNewInstance(childId));
        }
        this.children = childIds.isEmpty() ? null : childIds;
    }

    /**
     * Runs one all-reduce round with {@code v} as this node's contribution.
     *
     * @param v this node's value
     * @return the value received from the parent if one arrived; otherwise the locally reduced
     *         value (node with children) or {@code v} itself (leaf)
     * @throws NetworkFault         when fault injection triggers on a leaf, or as raised by the handler
     * @throws NetworkException     if sending to another node fails
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     */
    public V apply(V v) throws NetworkFault, NetworkException, InterruptedException {
        LOG.log(Level.FINEST, "I am {0}", this.self);
        V result = this.children == null ? applyLeaf(v) : applyMidNode(v);
        LOG.log(Level.FINEST, "{0} returns {1}", new Object[]{this.self, result});
        return result;
    }

    /**
     * All-reduce step for a node that has children: gather, reduce, exchange with the parent and
     * push the result back down.
     */
    private V applyMidNode(V v) throws NetworkFault, NetworkException, InterruptedException {
        List<Identifier> deadChildren = new ArrayList<Identifier>();
        List<V> values = new ArrayList<V>();
        values.add(v);
        for (Identifier child : this.children) {
            LOG.log(Level.FINEST, "Waiting for child: {0}", child);
            V received = receive(child);
            LOG.log(Level.FINEST, "Received: {0}", received);
            if (received != null) {
                values.add(received);
            } else {
                // A null value is treated as the child being dead; it is excluded from now on.
                LOG.log(Level.FINEST, "Marking {0} as dead", child);
                deadChildren.add(child);
            }
        }
        this.children.removeAll(deadChildren);

        V result = this.redFunc.apply(values);
        LOG.log(Level.FINEST, "Local Reduced value: {0}", result);

        if (this.parent != null) {
            LOG.log(Level.FINEST, "Sending {0} to parent: {1}", new Object[]{result, this.parent});
            send(result, this.parent);
            LOG.log(Level.FINEST, "Waiting for {0}", this.parent);
            V fromParent = receive(this.parent);
            LOG.log(Level.FINEST, "Received {0} from {1}", new Object[]{fromParent, this.parent});
            // Keep the locally reduced value when the parent did not deliver one.
            if (fromParent != null) {
                result = fromParent;
            }
        }

        for (Identifier child : this.children) {
            LOG.log(Level.FINEST, "Sending {0} to child: {1}", new Object[]{result, child});
            send(result, child);
        }
        return result;
    }

    /**
     * All-reduce step for a node without children. Returns {@code v} unchanged when there is no
     * parent either; otherwise may inject a fault before exchanging values with the parent.
     */
    private V applyLeaf(V v) throws NetworkFault, NetworkException, InterruptedException {
        if (this.parent == null) {
            return v;
        }
        if (this.toss.nextFloat() < FAULT_PROBABILITY) {
            // Fault injection: wait a random 1..MAX_FAULT_DELAY_MS milliseconds, then fail.
            Thread.sleep(this.toss.nextInt(MAX_FAULT_DELAY_MS) + 1);
            LOG.log(Level.FINEST, "I am marked to terminate and throw fault");
            throw new NetworkFault();
        }
        LOG.log(Level.FINEST, "I am leaf. Sending {0} to my parent: {1}", new Object[]{v, this.parent});
        send(v, this.parent);
        LOG.log(Level.FINEST, "Waiting for my parent: {0}", this.parent);
        V fromParent = receive(this.parent);
        LOG.log(Level.FINEST, "Received: {0}", fromParent);
        return fromParent == null ? v : fromParent;
    }

    /**
     * Reads the value sent by {@code from} out of the handler, decoded with this operator's codec.
     *
     * @return the received value; callers treat {@code null} as "nothing was delivered"
     */
    @SuppressWarnings("unchecked") // the handler is asked to decode with a Codec<V>
    private V receive(Identifier from) throws NetworkFault, NetworkException, InterruptedException {
        return (V) this.handler.get(from, this.codec);
    }

    /**
     * Encodes {@code v} and writes it as an {@code AllReduce} message to {@code identifier}.
     *
     * @param v          value to send
     * @param identifier destination node
     * @throws NetworkException if the connection cannot be opened or written to
     */
    public void send(V v, Identifier identifier) throws NetworkException {
        Connection<ReefNetworkGroupCommProtos.GroupCommMessage> link = this.netService.newConnection(identifier);
        link.open();
        // The payload is passed as an array of byte arrays holding the single encoded value.
        byte[][] payload = new byte[][]{this.codec.encode(v)};
        link.write(Utils.bldGCM(ReefNetworkGroupCommProtos.GroupCommMessage.Type.AllReduce, this.self, identifier, payload));
    }
}
