package com.microsoft.reef.io.network.impl;

import com.microsoft.reef.io.Tuple;
import com.microsoft.reef.io.naming.Naming;
import com.microsoft.reef.io.network.Connection;
import com.microsoft.reef.io.network.ConnectionFactory;
import com.microsoft.reef.io.network.Message;
import com.microsoft.reef.io.network.TransportFactory;
import com.microsoft.reef.io.network.impl.NetworkServiceParameters;
import com.microsoft.reef.io.network.naming.NameCache;
import com.microsoft.reef.io.network.naming.NameClient;
import com.microsoft.reef.io.network.naming.NameLookupClient;
import com.microsoft.reef.io.network.naming.NameServerParameters;
import com.microsoft.tang.Injector;
import com.microsoft.tang.Tang;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.tang.exceptions.InjectionException;
import com.microsoft.wake.EStage;
import com.microsoft.wake.EventHandler;
import com.microsoft.wake.Identifier;
import com.microsoft.wake.IdentifierFactory;
import com.microsoft.wake.Stage;
import com.microsoft.wake.impl.LoggingEventHandler;
import com.microsoft.wake.impl.SingleThreadStage;
import com.microsoft.wake.remote.Codec;
import com.microsoft.wake.remote.transport.LinkListener;
import com.microsoft.wake.remote.transport.Transport;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;

/* loaded from: input_file:com/microsoft/reef/io/network/impl/NetworkService.class */
public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
    private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());
    private static final int retryCount;
    private static final int retryTimeout;
    private Identifier myId;
    private final IdentifierFactory factory;
    private final Codec<T> codec;
    private final Transport transport;
    private final NameClient nameClient;
    private final ConcurrentMap<Identifier, Connection<T>> idToConnMap;
    private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
    private final EStage<Identifier> nameServiceUnregisteringStage;

    public NetworkService(IdentifierFactory identifierFactory, int i, String str, int i2, Codec<T> codec, TransportFactory transportFactory, EventHandler<Message<T>> eventHandler, EventHandler<Exception> eventHandler2) {
        this(identifierFactory, i, str, i2, retryCount, retryTimeout, codec, transportFactory, eventHandler, eventHandler2);
    }

    @Inject
    public NetworkService(@Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory identifierFactory, @Parameter(NetworkServiceParameters.NetworkServicePort.class) int i, @Parameter(NameServerParameters.NameServerAddr.class) String str, @Parameter(NameServerParameters.NameServerPort.class) int i2, @Parameter(NameLookupClient.RetryCount.class) int i3, @Parameter(NameLookupClient.RetryTimeout.class) int i4, @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec, @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory transportFactory, @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> eventHandler, @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> eventHandler2) {
        this.idToConnMap = new ConcurrentHashMap();
        this.factory = identifierFactory;
        this.codec = codec;
        this.transport = transportFactory.create(i, new LoggingEventHandler(), new MessageHandler(eventHandler, codec, identifierFactory), eventHandler2);
        this.nameClient = new NameClient(str, i2, identifierFactory, i3, i4, new NameCache(30000L));
        this.nameServiceRegisteringStage = new SingleThreadStage("NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { // from class: com.microsoft.reef.io.network.impl.NetworkService.1
            public void onNext(Tuple<Identifier, InetSocketAddress> tuple) {
                try {
                    NetworkService.this.nameClient.register((Identifier) tuple.getKey(), (InetSocketAddress) tuple.getValue());
                    NetworkService.LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
                } catch (Exception e) {
                    String str2 = "Unable to register " + tuple.getKey() + "with name service";
                    NetworkService.LOG.log(Level.WARNING, str2, (Throwable) e);
                    throw new RuntimeException(str2, e);
                }
            }
        }, 5);
        this.nameServiceUnregisteringStage = new SingleThreadStage("NameServiceRegisterer", new EventHandler<Identifier>() { // from class: com.microsoft.reef.io.network.impl.NetworkService.2
            public void onNext(Identifier identifier) {
                try {
                    NetworkService.this.nameClient.unregister(identifier);
                    NetworkService.LOG.log(Level.FINEST, "Unregistered {0} with nameservice", identifier);
                } catch (Exception e) {
                    String str2 = "Unable to unregister " + identifier + " with name service";
                    NetworkService.LOG.log(Level.WARNING, str2, (Throwable) e);
                    throw new RuntimeException(str2, e);
                }
            }
        }, 5);
    }

    public void registerId(Identifier identifier) {
        this.myId = identifier;
        Tuple tuple = new Tuple(identifier, (InetSocketAddress) this.transport.getLocalAddress());
        LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})", new Object[]{tuple.getKey(), tuple.getValue()});
        this.nameServiceRegisteringStage.onNext(tuple);
    }

    public void unregisterId(Identifier identifier) {
        this.myId = null;
        LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})", new Object[]{identifier, this.transport.getLocalAddress()});
        this.nameServiceUnregisteringStage.onNext(identifier);
    }

    public Identifier getMyId() {
        return this.myId;
    }

    public Transport getTransport() {
        return this.transport;
    }

    public Codec<T> getCodec() {
        return this.codec;
    }

    public Naming getNameClient() {
        return this.nameClient;
    }

    public IdentifierFactory getIdentifierFactory() {
        return this.factory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void remove(Identifier identifier) {
        this.idToConnMap.remove(identifier);
    }

    public void close() throws Exception {
        LOG.log(Level.INFO, "Shutting down");
        this.transport.close();
        this.nameClient.close();
    }

    @Override // com.microsoft.reef.io.network.ConnectionFactory
    public Connection<T> newConnection(Identifier identifier) {
        if (this.myId == null) {
            throw new RuntimeException("Trying to establish a connection from a Network Service that is not bound to any task");
        }
        Connection<T> connection = this.idToConnMap.get(identifier);
        if (connection != null) {
            return connection;
        }
        NSConnection nSConnection = new NSConnection(this.myId, identifier, new LinkListener<T>() { // from class: com.microsoft.reef.io.network.impl.NetworkService.3
            public void messageReceived(Object obj) {
            }
        }, this);
        Connection<T> putIfAbsent = this.idToConnMap.putIfAbsent(identifier, nSConnection);
        return putIfAbsent == null ? nSConnection : putIfAbsent;
    }

    static {
        try {
            Injector newInjector = Tang.Factory.getTang().newInjector();
            retryCount = ((Integer) newInjector.getNamedInstance(NameLookupClient.RetryCount.class)).intValue();
            retryTimeout = ((Integer) newInjector.getNamedInstance(NameLookupClient.RetryTimeout.class)).intValue();
        } catch (InjectionException e) {
            LOG.log(Level.SEVERE, "Exception while trying to find default values for retryCount & Timeout", e);
            throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", e);
        }
    }
}
