package com.microsoft.reef.io.network.group.impl.operators.faulty;

import com.microsoft.reef.io.network.Message;
import com.microsoft.reef.io.network.group.impl.operators.faulty.BroadcastOp;
import com.microsoft.reef.io.network.group.impl.operators.faulty.ReduceOp;
import com.microsoft.reef.io.network.proto.ReefNetworkGroupCommProtos;
import com.microsoft.tang.annotations.Name;
import com.microsoft.tang.annotations.NamedParameter;
import com.microsoft.wake.EventHandler;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;

/* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/faulty/BroadRedHandler.class */
public class BroadRedHandler implements EventHandler<Message<ReefNetworkGroupCommProtos.GroupCommMessage>> {
    private static Object ctrlLock = new Object();
    private static AtomicBoolean firstSync = new AtomicBoolean(false);
    private static CountDownLatch srcAddLatch = new CountDownLatch(2);
    private final BroadcastHandler broadHandler;
    private final ReduceHandler redHandler;

    @NamedParameter(doc = "List of Identifiers on which the handler should listen")
    /* loaded from: input_file:com/microsoft/reef/io/network/group/impl/operators/faulty/BroadRedHandler$IDs.class */
    public static class IDs implements Name<Set<String>> {
    }

    @Inject
    public BroadRedHandler(BroadcastHandler broadcastHandler, ReduceHandler reduceHandler) {
        this.broadHandler = broadcastHandler;
        this.redHandler = reduceHandler;
    }

    public void onNext(Message<ReefNetworkGroupCommProtos.GroupCommMessage> message) {
        ReefNetworkGroupCommProtos.GroupCommMessage groupCommMessage = null;
        if (message.getData().iterator().hasNext()) {
            groupCommMessage = message.getData().iterator().next();
        }
        switch (groupCommMessage.getType()) {
            case Reduce:
                this.redHandler.onNext(message);
                return;
            case Broadcast:
                this.broadHandler.onNext(message);
                return;
            case SourceAdd:
                synchronized (ctrlLock) {
                    this.redHandler.onNext(message);
                    this.broadHandler.onNext(message);
                    srcAddLatch.countDown();
                }
                return;
            case SourceDead:
                synchronized (ctrlLock) {
                    this.redHandler.onNext(message);
                    this.broadHandler.onNext(message);
                }
                return;
            default:
                return;
        }
    }

    public static void waitForSrcAdd(BroadcastOp.Sender<?> sender, ReduceOp.Receiver<?> receiver) {
        if (firstSync.compareAndSet(false, true)) {
            try {
                srcAddLatch.await();
                synchronized (ctrlLock) {
                    sender.sync();
                    receiver.sync();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for src add", e);
            }
        }
    }

    public static void waitForSrcAdd(BroadcastOp.Receiver<?> receiver, ReduceOp.Sender<?> sender) {
        if (firstSync.compareAndSet(false, true)) {
            try {
                srcAddLatch.await();
                synchronized (ctrlLock) {
                    receiver.sync();
                    sender.sync();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for src add", e);
            }
        }
    }

    public static void sync(BroadcastOp.Sender<?> sender, ReduceOp.Receiver<?> receiver) {
        synchronized (ctrlLock) {
            sender.sync();
            receiver.sync();
        }
    }

    public static void sync(BroadcastOp.Receiver<?> receiver, ReduceOp.Sender<?> sender) {
        synchronized (ctrlLock) {
            receiver.sync();
            sender.sync();
        }
    }
}
