package com.microsoft.reef.services.network;

import com.microsoft.reef.exception.evaluator.NetworkException;
import com.microsoft.reef.io.network.Connection;
import com.microsoft.reef.io.network.Message;
import com.microsoft.reef.io.network.impl.MessagingTransportFactory;
import com.microsoft.reef.io.network.impl.NetworkService;
import com.microsoft.reef.io.network.naming.NameServer;
import com.microsoft.reef.io.network.util.StringIdentifierFactory;
import com.microsoft.reef.services.network.util.Monitor;
import com.microsoft.reef.services.network.util.StringCodec;
import com.microsoft.wake.EventHandler;
import com.microsoft.wake.remote.NetUtils;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:com/microsoft/reef/services/network/NetworkServiceTest.class */
public class NetworkServiceTest {
    private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName());

    @Rule
    public TestName name = new TestName();

    /* loaded from: input_file:com/microsoft/reef/services/network/NetworkServiceTest$ExceptionHandler.class */
    class ExceptionHandler implements EventHandler<Exception> {
        ExceptionHandler() {
        }

        public void onNext(Exception exc) {
            System.err.println(exc);
        }
    }

    /* loaded from: input_file:com/microsoft/reef/services/network/NetworkServiceTest$MessageHandler.class */
    class MessageHandler<T> implements EventHandler<Message<T>> {
        private final String name;
        private final int expected;
        private final Monitor monitor;
        private AtomicInteger count = new AtomicInteger(0);

        MessageHandler(String str, Monitor monitor, int i) {
            this.name = str;
            this.monitor = monitor;
            this.expected = i;
        }

        public void onNext(Message<T> message) {
            this.count.incrementAndGet();
            for (T t : message.getData()) {
            }
            if (this.count.get() == this.expected) {
                this.monitor.mnotify();
            }
        }
    }

    @Test
    public void testMessagingNetworkService() throws Exception {
        LOG.log(Level.FINEST, this.name.getMethodName());
        StringIdentifierFactory stringIdentifierFactory = new StringIdentifierFactory();
        String localAddress = NetUtils.getLocalAddress();
        NameServer nameServer = new NameServer(0, stringIdentifierFactory);
        int port = nameServer.getPort();
        Monitor monitor = new Monitor();
        LOG.log(Level.FINEST, "=== Test network service receiver start");
        NetworkService networkService = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task2", monitor, 10), new ExceptionHandler());
        networkService.registerId(stringIdentifierFactory.getNewInstance("task2"));
        nameServer.register(stringIdentifierFactory.getNewInstance("task2"), new InetSocketAddress(localAddress, networkService.getTransport().getListeningPort()));
        LOG.log(Level.FINEST, "=== Test network service sender start");
        NetworkService networkService2 = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task1", null, 0), new ExceptionHandler());
        networkService2.registerId(stringIdentifierFactory.getNewInstance("task1"));
        nameServer.register(stringIdentifierFactory.getNewInstance("task1"), new InetSocketAddress(localAddress, networkService2.getTransport().getListeningPort()));
        Connection newConnection = networkService2.newConnection(stringIdentifierFactory.getNewInstance("task2"));
        try {
            newConnection.open();
            for (int i = 0; i < 10; i++) {
                newConnection.write("hello! " + i);
            }
            monitor.mwait();
        } catch (NetworkException e) {
            e.printStackTrace();
        }
        newConnection.close();
        networkService2.close();
        networkService.close();
        nameServer.close();
    }

    @Test
    public void testMessagingNetworkServiceRate() throws Exception {
        LOG.log(Level.FINEST, this.name.getMethodName());
        StringIdentifierFactory stringIdentifierFactory = new StringIdentifierFactory();
        String localAddress = NetUtils.getLocalAddress();
        NameServer nameServer = new NameServer(0, stringIdentifierFactory);
        int port = nameServer.getPort();
        for (int i : new int[]{1, 16, 32, 64, 512, 65536, 1048576}) {
            int max = 300000 / Math.max(1, i / 512);
            Monitor monitor = new Monitor();
            LOG.log(Level.FINEST, "=== Test network service receiver start");
            NetworkService networkService = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task2", monitor, max), new ExceptionHandler());
            networkService.registerId(stringIdentifierFactory.getNewInstance("task2"));
            nameServer.register(stringIdentifierFactory.getNewInstance("task2"), new InetSocketAddress(localAddress, networkService.getTransport().getListeningPort()));
            LOG.log(Level.FINEST, "=== Test network service sender start");
            NetworkService networkService2 = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task1", null, 0), new ExceptionHandler());
            networkService2.registerId(stringIdentifierFactory.getNewInstance("task1"));
            nameServer.register(stringIdentifierFactory.getNewInstance("task1"), new InetSocketAddress(localAddress, networkService2.getTransport().getListeningPort()));
            Connection newConnection = networkService2.newConnection(stringIdentifierFactory.getNewInstance("task2"));
            StringBuilder sb = new StringBuilder();
            for (int i2 = 0; i2 < i; i2++) {
                sb.append("1");
            }
            String sb2 = sb.toString();
            long currentTimeMillis = System.currentTimeMillis();
            for (int i3 = 0; i3 < max; i3++) {
                try {
                    newConnection.open();
                    newConnection.write(sb2);
                } catch (NetworkException e) {
                    e.printStackTrace();
                }
            }
            monitor.mwait();
            double currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / 1000.0d;
            LOG.log(Level.FINEST, "size: " + i + "; messages/s: " + (max / currentTimeMillis2) + " bandwidth(bytes/s): " + (((max * 2.0d) * i) / currentTimeMillis2));
            newConnection.close();
            networkService2.close();
            networkService.close();
        }
        nameServer.close();
    }

    @Test
    public void testMessagingNetworkServiceRateDisjoint() throws Exception {
        LOG.log(Level.FINEST, this.name.getMethodName());
        final StringIdentifierFactory stringIdentifierFactory = new StringIdentifierFactory();
        final String localAddress = NetUtils.getLocalAddress();
        final NameServer nameServer = new NameServer(0, stringIdentifierFactory);
        final int port = nameServer.getPort();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final int max = 300000 / Math.max(1, 3);
        int i = max * 4;
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i2 = 0; i2 < 4; i2++) {
            final int i3 = i2;
            newCachedThreadPool.submit(new Runnable() { // from class: com.microsoft.reef.services.network.NetworkServiceTest.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Monitor monitor = new Monitor();
                        NetworkServiceTest.LOG.log(Level.FINEST, "=== Test network service receiver start");
                        String str = "task2-" + i3;
                        NetworkService networkService = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler(str, monitor, max), new ExceptionHandler());
                        networkService.registerId(stringIdentifierFactory.getNewInstance(str));
                        nameServer.register(stringIdentifierFactory.getNewInstance(str), new InetSocketAddress(localAddress, networkService.getTransport().getListeningPort()));
                        NetworkServiceTest.LOG.log(Level.FINEST, "=== Test network service sender start");
                        String str2 = "task1-" + i3;
                        NetworkService networkService2 = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler(str2, null, 0), new ExceptionHandler());
                        networkService2.registerId(stringIdentifierFactory.getNewInstance(str2));
                        nameServer.register(stringIdentifierFactory.getNewInstance(str2), new InetSocketAddress(localAddress, networkService2.getTransport().getListeningPort()));
                        Connection newConnection = networkService2.newConnection(stringIdentifierFactory.getNewInstance(str));
                        StringBuilder sb = new StringBuilder();
                        for (int i4 = 0; i4 < 2000; i4++) {
                            sb.append("1");
                        }
                        String sb2 = sb.toString();
                        for (int i5 = 0; i5 < max; i5++) {
                            try {
                                newConnection.open();
                                newConnection.write(sb2);
                            } catch (NetworkException e) {
                                e.printStackTrace();
                            }
                        }
                        monitor.mwait();
                        newConnection.close();
                        networkService2.close();
                        networkService.close();
                    } catch (Exception e2) {
                        e2.printStackTrace();
                    }
                }
            });
        }
        long currentTimeMillis = System.currentTimeMillis();
        Object obj = new Object();
        for (int i4 = 0; i4 < 4; i4++) {
            linkedBlockingQueue.add(obj);
        }
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(100L, TimeUnit.SECONDS);
        double currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / 1000.0d;
        LOG.log(Level.FINEST, "size: 2000; messages/s: " + (i / currentTimeMillis2) + " bandwidth(bytes/s): " + (((i * 2.0d) * 2000.0d) / currentTimeMillis2));
        nameServer.close();
    }

    @Test
    public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
        LOG.log(Level.FINEST, this.name.getMethodName());
        StringIdentifierFactory stringIdentifierFactory = new StringIdentifierFactory();
        String localAddress = NetUtils.getLocalAddress();
        NameServer nameServer = new NameServer(0, stringIdentifierFactory);
        int port = nameServer.getPort();
        for (int i : new int[]{2000}) {
            final int max = 300000 / Math.max(1, i / 512);
            int i2 = max * 2;
            Monitor monitor = new Monitor();
            LOG.log(Level.FINEST, "=== Test network service receiver start");
            NetworkService networkService = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task2", monitor, i2), new ExceptionHandler());
            networkService.registerId(stringIdentifierFactory.getNewInstance("task2"));
            nameServer.register(stringIdentifierFactory.getNewInstance("task2"), new InetSocketAddress(localAddress, networkService.getTransport().getListeningPort()));
            LOG.log(Level.FINEST, "=== Test network service sender start");
            NetworkService networkService2 = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task1", null, 0), new ExceptionHandler());
            networkService2.registerId(stringIdentifierFactory.getNewInstance("task1"));
            nameServer.register(stringIdentifierFactory.getNewInstance("task1"), new InetSocketAddress(localAddress, networkService2.getTransport().getListeningPort()));
            final Connection newConnection = networkService2.newConnection(stringIdentifierFactory.getNewInstance("task2"));
            newConnection.open();
            StringBuilder sb = new StringBuilder();
            for (int i3 = 0; i3 < i; i3++) {
                sb.append("1");
            }
            final String sb2 = sb.toString();
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            long currentTimeMillis = System.currentTimeMillis();
            for (int i4 = 0; i4 < 2; i4++) {
                newCachedThreadPool.submit(new Runnable() { // from class: com.microsoft.reef.services.network.NetworkServiceTest.2
                    @Override // java.lang.Runnable
                    public void run() {
                        for (int i5 = 0; i5 < max; i5++) {
                            try {
                                newConnection.write(sb2);
                            } catch (NetworkException e) {
                                e.printStackTrace();
                                return;
                            }
                        }
                    }
                });
            }
            newCachedThreadPool.shutdown();
            newCachedThreadPool.awaitTermination(30L, TimeUnit.SECONDS);
            monitor.mwait();
            double currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / 1000.0d;
            LOG.log(Level.FINEST, "size: " + i + "; messages/s: " + (i2 / currentTimeMillis2) + " bandwidth(bytes/s): " + (((i2 * 2.0d) * i) / currentTimeMillis2));
            newConnection.close();
            networkService2.close();
            networkService.close();
        }
        nameServer.close();
    }

    @Test
    public void testMessagingNetworkServiceBatchingRate() throws Exception {
        LOG.log(Level.FINEST, this.name.getMethodName());
        StringIdentifierFactory stringIdentifierFactory = new StringIdentifierFactory();
        String localAddress = NetUtils.getLocalAddress();
        NameServer nameServer = new NameServer(0, stringIdentifierFactory);
        int port = nameServer.getPort();
        for (int i : new int[]{32, 64, 512}) {
            int max = 300 / Math.max(1, i / 512);
            Monitor monitor = new Monitor();
            LOG.log(Level.FINEST, "=== Test network service receiver start");
            NetworkService networkService = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task2", monitor, max), new ExceptionHandler());
            networkService.registerId(stringIdentifierFactory.getNewInstance("task2"));
            nameServer.register(stringIdentifierFactory.getNewInstance("task2"), new InetSocketAddress(localAddress, networkService.getTransport().getListeningPort()));
            LOG.log(Level.FINEST, "=== Test network service sender start");
            NetworkService networkService2 = new NetworkService(stringIdentifierFactory, 0, localAddress, port, new StringCodec(), new MessagingTransportFactory(), new MessageHandler("task1", null, 0), new ExceptionHandler());
            networkService2.registerId(stringIdentifierFactory.getNewInstance("task1"));
            nameServer.register(stringIdentifierFactory.getNewInstance("task1"), new InetSocketAddress(localAddress, networkService2.getTransport().getListeningPort()));
            Connection newConnection = networkService2.newConnection(stringIdentifierFactory.getNewInstance("task2"));
            StringBuilder sb = new StringBuilder();
            for (int i2 = 0; i2 < i; i2++) {
                sb.append("1");
            }
            String sb2 = sb.toString();
            long currentTimeMillis = System.currentTimeMillis();
            for (int i3 = 0; i3 < max; i3++) {
                try {
                    StringBuilder sb3 = new StringBuilder();
                    for (int i4 = 0; i4 < 1048576 / i; i4++) {
                        sb3.append(sb2);
                    }
                    newConnection.open();
                    newConnection.write(sb3.toString());
                } catch (NetworkException e) {
                    e.printStackTrace();
                }
            }
            monitor.mwait();
            double currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / 1000.0d;
            long j = (max * 1048576) / i;
            LOG.log(Level.FINEST, "size: " + i + "; messages/s: " + (j / currentTimeMillis2) + " bandwidth(bytes/s): " + (((j * 2.0d) * i) / currentTimeMillis2));
            newConnection.close();
            networkService2.close();
            networkService.close();
        }
        nameServer.close();
    }
}
