package eu.stratosphere.nephele.rpc;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.minlog.Log;
import eu.stratosphere.util.KryoUtil;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:eu/stratosphere/nephele/rpc/RPCService.class */
public final class RPCService {
    /**
     * NOTE(review): presumably the handler-thread count used by the no-argument constructor,
     * which passes the same value as a literal - confirm against the original sources.
     */
    private static final int DEFAULT_NUM_RPC_HANDLERS = 1;
    /**
     * NOTE(review): matches the 10000 ms delay/period with which the cleanup task is scheduled and
     * the age after which cached responses are evicted; the decompiled code uses the literal there.
     */
    static final int CLEANUP_INTERVAL = 10000;
    /**
     * NOTE(review): matches the 60000 ms budget used while waiting for an RPC response; the
     * decompiled code uses the literal there.
     */
    private static final int RPC_TIMEOUT = 60000;
    /** Executor on which incoming messages are deserialized and dispatched. */
    private final ExecutorService rpcHandlers;
    /** Port handed to the network thread, or -1 when the service was created without a port. */
    private final int rpcPort;
    /** Thread used to send packets; it is started in the constructor and stopped in shutDown(). */
    private final NetworkThread networkThread;
    /** Set to true by the first call to shutDown(); new outgoing requests are rejected afterwards. */
    private final AtomicBoolean shutdownRequested;
    /** Receives transmission and round-trip-time reports for completed calls. */
    private final RPCStatistics statistics;
    /** Timer that periodically runs the cleanup task. */
    private final Timer cleanupTimer;
    /** Maps a protocol interface name to the object that serves incoming calls for it. */
    private final ConcurrentHashMap<String, RPCProtocol> callbackHandlers;
    /** Maps the message ID of an outgoing request to the monitor its caller is waiting on. */
    private final ConcurrentHashMap<Integer, RPCRequestMonitor> pendingRequests;
    /** Maps the message ID of an incoming request to the request while it is being handled; used to drop duplicates. */
    private final ConcurrentHashMap<Integer, RPCRequest> requestsBeingProcessed;
    /** Maps a message ID to the already serialized response so duplicates of the request can be answered without re-invoking the handler. */
    private final ConcurrentHashMap<Integer, CachedResponse> cachedResponses;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/nephele/rpc/RPCService$CachedResponse.class */
    /**
     * Immutable pairing of the packets of a serialized RPC response with the time at which the
     * response was cached.
     */
    public static final class CachedResponse {
        /** Packets that make up the serialized response. */
        private final DatagramPacket[] packets;
        /** Timestamp supplied at construction; it is compared against the current time to expire the entry. */
        private final long creationTime;

        /**
         * @param createdAtMillis timestamp to record for this entry
         * @param responsePackets packets to resend when the request is received again
         */
        private CachedResponse(long createdAtMillis, DatagramPacket[] responsePackets) {
            packets = responsePackets;
            creationTime = createdAtMillis;
        }
    }

    /* loaded from: input_file:eu/stratosphere/nephele/rpc/RPCService$CleanupTask.class */
    /**
     * Timer task that performs the periodic housekeeping of the enclosing service: it lets the
     * statistics object process its collected data, evicts expired cached responses and asks the
     * network thread to clean up stale state.
     */
    private final class CleanupTask extends TimerTask {
        private CleanupTask() {
        }

        @Override
        public void run() {
            statistics.processCollectedData();

            final long now = System.currentTimeMillis();
            final Iterator<CachedResponse> responses = cachedResponses.values().iterator();
            while (responses.hasNext()) {
                // A cached response is dropped once it is more than 10000 ms old.
                final long expiresAt = responses.next().creationTime + 10000;
                if (expiresAt < now) {
                    responses.remove();
                }
            }

            networkThread.cleanUpStaleState();
        }
    }

    /* loaded from: input_file:eu/stratosphere/nephele/rpc/RPCService$RPCInvocationHandler.class */
    /**
     * Invocation handler behind the proxies created by this service. Each intercepted method call
     * is turned into an {@link RPCRequest} with a randomly chosen message ID and sent to the
     * remote address the handler was created for.
     *
     * NOTE(review): there is no special handling for the methods of {@code Object}
     * ({@code toString}, {@code hashCode}, {@code equals}), which {@code java.lang.reflect.Proxy}
     * also routes to {@code invoke} - confirm this is intended.
     */
    private final class RPCInvocationHandler implements InvocationHandler {
        private final String interfaceName;
        private final InetSocketAddress remoteSocketAddress;

        private RPCInvocationHandler(InetSocketAddress remoteAddress, String protocolName) {
            interfaceName = protocolName;
            remoteSocketAddress = remoteAddress;
        }

        @Override
        public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
            // Random offset added to Integer.MIN_VALUE to obtain a message ID from the int range.
            final double offset = Math.random() * Integer.MAX_VALUE * 2.0d;
            final int messageID = (int) (Integer.MIN_VALUE + offset);
            final RPCRequest request = new RPCRequest(messageID, interfaceName, method, objArr);
            return sendRPCRequest(remoteSocketAddress, request);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/nephele/rpc/RPCService$RPCRequestMonitor.class */
    /**
     * Object a caller waits on while its request is outstanding. The response is stored in
     * {@code rpcResponse} by the thread that receives it; both sides synchronize on the monitor
     * instance itself.
     */
    public static final class RPCRequestMonitor {
        /** The received response, or {@code null} while none has arrived. */
        private RPCResponse rpcResponse = null;

        private RPCRequestMonitor() {
        }
    }

    public RPCService() throws IOException {
        this(1);
    }

    public RPCService(int i) throws IOException {
        this.shutdownRequested = new AtomicBoolean(false);
        this.statistics = new RPCStatistics();
        this.cleanupTimer = new Timer();
        this.callbackHandlers = new ConcurrentHashMap<>();
        this.pendingRequests = new ConcurrentHashMap<>();
        this.requestsBeingProcessed = new ConcurrentHashMap<>();
        this.cachedResponses = new ConcurrentHashMap<>();
        this.rpcHandlers = Executors.newFixedThreadPool(i);
        this.rpcPort = -1;
        this.networkThread = new NetworkThread(this, -1);
        this.networkThread.start();
        this.cleanupTimer.schedule(new CleanupTask(), 10000L, 10000L);
    }

    public RPCService(int i, int i2) throws IOException {
        this.shutdownRequested = new AtomicBoolean(false);
        this.statistics = new RPCStatistics();
        this.cleanupTimer = new Timer();
        this.callbackHandlers = new ConcurrentHashMap<>();
        this.pendingRequests = new ConcurrentHashMap<>();
        this.requestsBeingProcessed = new ConcurrentHashMap<>();
        this.cachedResponses = new ConcurrentHashMap<>();
        this.rpcHandlers = Executors.newFixedThreadPool(i2);
        this.rpcPort = i;
        this.networkThread = new NetworkThread(this, i);
        this.networkThread.start();
        this.cleanupTimer.schedule(new CleanupTask(), 10000L, 10000L);
    }

    /**
     * Creates a dynamic proxy for the given protocol interface. Calls made on the proxy are sent
     * as RPC requests to the given remote address.
     *
     * @param inetSocketAddress address of the remote RPC service
     * @param cls               protocol interface the proxy implements
     * @return a proxy instance implementing {@code cls}
     */
    public <T extends RPCProtocol> T getProxy(InetSocketAddress inetSocketAddress, Class<T> cls) {
        final ClassLoader loader = RPCService.class.getClassLoader();
        final Class<?>[] proxiedInterfaces = {cls};
        final InvocationHandler handler = new RPCInvocationHandler(inetSocketAddress, cls.getName());
        return (T) Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
    }

    /**
     * Returns the port this service was constructed with.
     *
     * @return the configured port, or -1 if the service was created without one
     */
    public int getRPCPort() {
        return rpcPort;
    }

    /**
     * Registers the object that serves incoming calls for the given protocol. The protocol is
     * validated first; if a handler is already registered for it, the existing handler is kept
     * and an error is logged.
     *
     * @param cls         protocol interface the handler serves
     * @param rPCProtocol handler object to invoke for incoming requests of that protocol
     * @throws IllegalArgumentException if {@code cls} does not pass the protocol validation
     */
    public void setProtocolCallbackHandler(Class<? extends RPCProtocol> cls, RPCProtocol rPCProtocol) {
        checkRPCProtocol(cls);

        final String protocolName = cls.getName();
        final RPCProtocol alreadyRegistered = this.callbackHandlers.putIfAbsent(protocolName, rPCProtocol);
        if (alreadyRegistered == null) {
            return;
        }
        Log.error("There is already a protocol call back handler set for protocol " + protocolName);
    }

    /**
     * Shuts the service down: stops the network thread, shuts down the RPC handler pool (waiting
     * up to 5000 ms for it to terminate), cancels the cleanup timer and lets the statistics object
     * process its collected data one last time.
     *
     * <p>Only the first call has an effect; later calls return immediately. If the calling thread
     * is interrupted while waiting, the remaining shutdown steps are still carried out and the
     * thread's interrupt status is restored before this method returns.
     */
    public void shutDown() {
        if (!this.shutdownRequested.compareAndSet(false, true)) {
            return;
        }

        // Remember an interruption instead of re-interrupting right away: setting the flag early
        // would make the subsequent awaitTermination call fail immediately.
        boolean interrupted = false;
        try {
            try {
                this.networkThread.shutdown();
            } catch (InterruptedException e) {
                interrupted = true;
                Log.debug("Caught exception while waiting for network thread to shut down: ", e);
            }

            this.rpcHandlers.shutdown();
            try {
                this.rpcHandlers.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e2) {
                interrupted = true;
                Log.debug("Caught exception while waiting for RPC handlers to finish: ", e2);
            }

            this.cleanupTimer.cancel();
            this.statistics.processCollectedData();
        } finally {
            if (interrupted) {
                // Hand the interruption back to the caller rather than swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Handles a cleanup message by discarding the cached response stored under the message's ID,
     * if there is one.
     *
     * @param rPCCleanup the received cleanup message
     */
    void processIncomingRPCCleanup(RPCCleanup rPCCleanup) {
        final Integer messageID = Integer.valueOf(rPCCleanup.getMessageID());
        this.cachedResponses.remove(messageID);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands an incoming message over to the RPC handler pool. The pool thread deserializes the
     * envelope from {@code input} and dispatches the contained message by type: requests and
     * responses go to their dedicated methods, anything else is treated as a cleanup message.
     *
     * @param inetSocketAddress address passed along to the request handling as the reply target
     * @param input             Kryo input positioned at the serialized envelope
     */
    public void processIncomingRPCMessage(final InetSocketAddress inetSocketAddress, final Input input) {
        final Runnable dispatchTask = new Runnable() {
            @Override
            public void run() {
                final Kryo deserializer = KryoUtil.getKryo();
                deserializer.reset();

                final RPCEnvelope envelope = (RPCEnvelope) deserializer.readObject(input, RPCEnvelope.class);
                final RPCMessage message = envelope.getRPCMessage();

                if (message instanceof RPCRequest) {
                    RPCService.this.processIncomingRPCRequest(inetSocketAddress, (RPCRequest) message);
                    return;
                }
                if (message instanceof RPCResponse) {
                    RPCService.this.processIncomingRPCResponse((RPCResponse) message);
                    return;
                }
                // Neither request nor response: handled as a cleanup message.
                RPCService.this.processIncomingRPCCleanup((RPCCleanup) message);
            }
        };
        this.rpcHandlers.execute(dispatchTask);
    }

    /**
     * Delivers a received response to the caller waiting for it. Responses whose message ID has
     * no pending request are ignored.
     *
     * @param rPCResponse the received response
     */
    void processIncomingRPCResponse(RPCResponse rPCResponse) {
        final Integer messageID = Integer.valueOf(rPCResponse.getMessageID());
        final RPCRequestMonitor waiter = this.pendingRequests.get(messageID);
        if (waiter != null) {
            // Publish the response and wake the waiting caller under the monitor's lock.
            synchronized (waiter) {
                waiter.rpcResponse = rPCResponse;
                waiter.notify();
            }
        }
    }

    /**
     * Sends a request to the remote service and blocks until the matching response arrives or
     * 60000 ms have passed since the call started.
     *
     * @param inetSocketAddress address of the remote RPC service
     * @param rPCRequest        the request to send
     * @return the return value carried by the response
     * @throws IOException if shutdown has already been requested or no response arrived in time
     * @throws Throwable   the throwable carried by the response if the remote call failed
     */
    Object sendRPCRequest(InetSocketAddress inetSocketAddress, RPCRequest rPCRequest) throws Throwable {
        if (this.shutdownRequested.get()) {
            throw new IOException("Shutdown of RPC service has already been requested");
        }

        final long startedAt = System.currentTimeMillis();
        final DatagramPacket[] requestPackets = messageToPackets(inetSocketAddress, rPCRequest);
        final Integer messageID = Integer.valueOf(rPCRequest.getMessageID());

        // Register the monitor before sending so that a fast response finds it.
        final RPCRequestMonitor monitor = new RPCRequestMonitor();
        this.pendingRequests.put(messageID, monitor);
        try {
            final int sendResult = this.networkThread.send(requestPackets);

            final RPCResponse response;
            synchronized (monitor) {
                while (monitor.rpcResponse == null) {
                    final long remaining = 60000 - (System.currentTimeMillis() - startedAt);
                    if (remaining <= 0) {
                        break;
                    }
                    monitor.wait(remaining);
                }
                // Still null here if and only if the loop was left because the time ran out.
                response = monitor.rpcResponse;
            }

            if (response == null) {
                throw new IOException("Unable to complete RPC of method " + rPCRequest.getMethodName() + " on " + inetSocketAddress);
            }

            final String methodName = rPCRequest.getMethodName();
            this.statistics.reportSuccessfulTransmission(methodName, requestPackets.length, sendResult);
            this.statistics.reportRTT(methodName, (int) (System.currentTimeMillis() - startedAt));

            if (!(response instanceof RPCReturnValue)) {
                throw ((RPCThrowable) response).getThrowable();
            }
            return ((RPCReturnValue) response).getRetVal();
        } finally {
            this.pendingRequests.remove(messageID);
        }
    }

    /**
     * Checks whether Kryo has a registration for the given throwable class, i.e. whether asking
     * Kryo for the registration succeeds without an {@link IllegalArgumentException}.
     *
     * @param cls throwable class to look up
     * @return {@code true} if the lookup succeeds, {@code false} if Kryo rejects the class
     */
    private boolean isThrowableRegistered(Class<? extends Throwable> cls) {
        boolean registered = true;
        try {
            KryoUtil.getKryo().getRegistration(cls);
        } catch (IllegalArgumentException notRegistered) {
            registered = false;
        }
        return registered;
    }

    /**
     * Serializes a message, wrapped in an {@link RPCEnvelope}, with Kryo and turns the resulting
     * bytes into datagram packets addressed to the given remote address.
     *
     * @param inetSocketAddress destination of the packets
     * @param rPCMessage        message to serialize
     * @return the packets carrying the serialized message
     */
    private DatagramPacket[] messageToPackets(InetSocketAddress inetSocketAddress, RPCMessage rPCMessage) {
        final MultiPacketOutputStream packetStream = new MultiPacketOutputStream(1024);
        final Kryo serializer = KryoUtil.getKryo();
        serializer.reset();

        final Output kryoOutput = new Output(packetStream);
        final RPCEnvelope envelope = new RPCEnvelope(rPCMessage);
        serializer.writeObject(kryoOutput, envelope);

        // Close the Kryo output first, then the packet stream, before the packets are created.
        kryoOutput.close();
        packetStream.close();
        return packetStream.createPackets(inetSocketAddress);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Handles an incoming request: invokes the registered callback handler via reflection,
     * serializes the outcome (return value or thrown exception) into a response, caches the
     * response packets and sends them back to the caller.
     *
     * <p>Duplicates are handled in two ways: a request whose ID is currently being processed is
     * dropped, and a request whose response is already cached is answered from the cache without
     * invoking the handler again. Failures are logged and never propagated to the caller of this
     * method.
     *
     * @param inetSocketAddress address the response is sent to
     * @param rPCRequest        the received request
     */
    public void processIncomingRPCRequest(InetSocketAddress inetSocketAddress, RPCRequest rPCRequest) {
        final Integer messageID = Integer.valueOf(rPCRequest.getMessageID());
        if (this.requestsBeingProcessed.putIfAbsent(messageID, rPCRequest) != null) {
            Log.debug("Request " + rPCRequest.getMessageID() + " is already being processed at the moment");
            return;
        }

        // From here on the in-progress marker must be removed on every exit path, including a
        // failed resend of a cached response; otherwise all later retransmissions of this request
        // would be dropped as "already being processed".
        try {
            final CachedResponse cachedResponse = this.cachedResponses.get(messageID);
            if (cachedResponse != null) {
                try {
                    final String label = rPCRequest.getMethodName() + " (Response)";
                    final int sendResult = this.networkThread.send(cachedResponse.packets);
                    this.statistics.reportSuccessfulTransmission(label, cachedResponse.packets.length, sendResult);
                } catch (Exception e) {
                    Log.error("Caught exception while trying to send RPC response: ", e);
                }
                return;
            }

            final RPCProtocol handler = this.callbackHandlers.get(rPCRequest.getInterfaceName());
            if (handler == null) {
                Log.error("Cannot find callback handler for protocol " + rPCRequest.getInterfaceName());
                return;
            }

            try {
                RPCResponse response;
                try {
                    final Method target = handler.getClass().getMethod(rPCRequest.getMethodName(), rPCRequest.getParameterTypes());
                    final Object returnValue = target.invoke(handler, rPCRequest.getArgs());
                    response = new RPCReturnValue(rPCRequest.getMessageID(), returnValue);
                } catch (InvocationTargetException e) {
                    Throwable thrown = e.getTargetException();
                    // NOTE(review): result intentionally unused; presumably this forces the stack
                    // trace to be materialized before the throwable is serialized - confirm.
                    thrown.getStackTrace();
                    if (!isThrowableRegistered(thrown.getClass())) {
                        thrown = wrapInIOException(rPCRequest, thrown);
                    }
                    response = new RPCThrowable(rPCRequest.getMessageID(), thrown);
                }

                final DatagramPacket[] responsePackets = messageToPackets(inetSocketAddress, response);
                // Cache before sending so that a retransmitted request can be answered even if
                // this send fails.
                this.cachedResponses.put(messageID, new CachedResponse(System.currentTimeMillis(), responsePackets));
                final String label = rPCRequest.getMethodName() + " (Response)";
                final int sendResult = this.networkThread.send(responsePackets);
                this.statistics.reportSuccessfulTransmission(label, responsePackets.length, sendResult);
            } catch (Exception e) {
                Log.error("Caught processing RPC request: ", e);
            }
        } finally {
            this.requestsBeingProcessed.remove(messageID);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decodes a value produced by {@link #encodeInteger(int)}: the short range
     * [-32768, 32767] is mapped back onto the integer range [-1, 65534].
     *
     * @param s the encoded value
     * @return the decoded integer, {@code s + 32767}
     */
    public static int decodeInteger(short s) {
        return s + Short.MAX_VALUE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Encodes an integer from the range [-1, 65534] as a short by shifting it onto the full
     * short range; -1 becomes {@code Short.MIN_VALUE} and 65534 becomes {@code Short.MAX_VALUE}.
     *
     * @param i the value to encode
     * @return the encoded value
     * @throws IllegalArgumentException if {@code i} is outside [-1, 65534]
     */
    public static short encodeInteger(int i) {
        final boolean inRange = i >= -1 && i <= 65534;
        if (inRange) {
            // Shifting down by 32767 lands exactly in the short range, so the cast never wraps.
            return (short) (i - Short.MAX_VALUE);
        }
        throw new IllegalArgumentException("Value must be in the range -1 and 65534 but is " + i);
    }

    /**
     * Validates a protocol type: it must be an interface, and every one of its public methods
     * must declare both {@link IOException} and {@link InterruptedException} in its throws
     * clause. A {@link SecurityException} raised while inspecting the methods is only logged at
     * debug level.
     *
     * @param cls the protocol type to validate
     * @throws IllegalArgumentException if {@code cls} is not an interface or one of its methods
     *                                  lacks one of the required exception declarations
     */
    private static final void checkRPCProtocol(Class<? extends RPCProtocol> cls) {
        if (!cls.isInterface()) {
            throw new IllegalArgumentException("Provided protocol " + cls + " is not an interface");
        }
        try {
            for (Method method : cls.getMethods()) {
                boolean declaresIOException = false;
                boolean declaresInterruptedException = false;
                for (Class<?> declared : method.getExceptionTypes()) {
                    if (IOException.class.equals(declared)) {
                        declaresIOException = true;
                    } else if (InterruptedException.class.equals(declared)) {
                        declaresInterruptedException = true;
                    }
                }
                // The IOException requirement is reported first when both are missing.
                if (!declaresIOException) {
                    throw new IllegalArgumentException("Method " + method.getName() + " of protocol " + cls.getName() + " must be declared to throw an IOException");
                }
                if (!declaresInterruptedException) {
                    throw new IllegalArgumentException("Method " + method.getName() + " of protocol " + cls.getName() + " must be declared to throw an InterruptedException");
                }
            }
        } catch (SecurityException e) {
            if (Log.DEBUG) {
                Log.debug(StringUtils.stringifyException(e));
            }
        }
    }

    /**
     * Builds an {@link IOException} whose message names the called method and contains the
     * stringified form of the given throwable.
     *
     * NOTE(review): the throwable is not attached as the cause; presumably this is deliberate,
     * because this method is used for throwables whose class is not registered with Kryo -
     * confirm before changing.
     *
     * @param rPCRequest request whose invocation raised the throwable
     * @param th         the throwable to describe
     * @return the new exception
     */
    private static IOException wrapInIOException(RPCRequest rPCRequest, Throwable th) {
        final StringBuilder message = new StringBuilder("The remote procedure call of method ");
        message.append(rPCRequest.getInterfaceName()).append('.').append(rPCRequest.getMethodName());
        message.append(" caused an unregistered exception: ").append(StringUtils.stringifyException(th));
        return new IOException(message.toString());
    }
}
