package eu.stratosphere.nephele.rpc;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.minlog.Log;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:eu/stratosphere/nephele/rpc/NetworkThread.class */
public final class NetworkThread extends Thread {
    private static final int MAXIMUM_NUMBER_OF_RETRANSMISSIONS = 20;
    private static final int RETRANSMISSION_TIMEOUT = 100;
    private static final int MAXIMUM_NUMBER_OF_OUTSTANDING_PACKETS = 100;
    private final RPCService rpcService;
    private final DatagramSocket socket;
    private final ConcurrentHashMap<Integer, OutstandingTransmission> outstandingTransmissions;
    private final ConcurrentHashMap<Integer, MultiPacketInputStream> incompleteInputStreams;
    private volatile boolean shutdownRequested;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/nephele/rpc/NetworkThread$OutstandingTransmission.class */
    public static final class OutstandingTransmission {
        private int lastAckedPacket;

        private OutstandingTransmission() {
            this.lastAckedPacket = -1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NetworkThread(RPCService rPCService, int i) throws IOException {
        super("RPC Network Thread");
        this.outstandingTransmissions = new ConcurrentHashMap<>();
        this.incompleteInputStreams = new ConcurrentHashMap<>();
        this.shutdownRequested = false;
        this.rpcService = rPCService;
        if (i == -1) {
            this.socket = new DatagramSocket();
        } else {
            this.socket = new DatagramSocket(i);
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        byte[] bArr = new byte[1024];
        DatagramPacket datagramPacket = new DatagramPacket(bArr, bArr.length);
        byte[] bArr2 = new byte[6];
        DatagramPacket datagramPacket2 = new DatagramPacket(bArr2, bArr2.length);
        while (!this.shutdownRequested) {
            try {
                this.socket.receive(datagramPacket);
                byte[] data = datagramPacket.getData();
                int length = datagramPacket.getLength();
                if (length < 8) {
                    int byteArrayToInteger = NumberUtils.byteArrayToInteger(data, 0);
                    int decodeInteger = RPCService.decodeInteger(NumberUtils.byteArrayToShort(data, 4));
                    OutstandingTransmission outstandingTransmission = this.outstandingTransmissions.get(Integer.valueOf(byteArrayToInteger));
                    if (outstandingTransmission != null) {
                        synchronized (outstandingTransmission) {
                            if (outstandingTransmission.lastAckedPacket < decodeInteger) {
                                outstandingTransmission.lastAckedPacket = decodeInteger;
                                outstandingTransmission.notify();
                            }
                        }
                    } else {
                        continue;
                    }
                } else {
                    int i = length - 8;
                    int decodeInteger2 = RPCService.decodeInteger(NumberUtils.byteArrayToShort(data, i + 2));
                    int byteArrayToInteger2 = NumberUtils.byteArrayToInteger(data, i + 4);
                    if (decodeInteger2 == 1) {
                        InetSocketAddress inetSocketAddress = (InetSocketAddress) datagramPacket.getSocketAddress();
                        NumberUtils.integerToByteArray(byteArrayToInteger2, bArr2, 0);
                        NumberUtils.shortToByteArray(RPCService.encodeInteger(0), bArr2, 4);
                        datagramPacket2.setSocketAddress(inetSocketAddress);
                        try {
                            this.socket.send(datagramPacket2);
                            byte[] bArr3 = new byte[1024];
                            datagramPacket = new DatagramPacket(bArr3, bArr3.length);
                            this.rpcService.processIncomingRPCMessage(inetSocketAddress, new Input(new SinglePacketInputStream(data, i)));
                        } catch (IOException e) {
                            if (this.shutdownRequested) {
                                return;
                            }
                            Log.error("Shutting down receiver thread due to error: ", e);
                            return;
                        }
                    } else {
                        Integer valueOf = Integer.valueOf(byteArrayToInteger2);
                        MultiPacketInputStream multiPacketInputStream = this.incompleteInputStreams.get(valueOf);
                        if (multiPacketInputStream == null) {
                            multiPacketInputStream = new MultiPacketInputStream(decodeInteger2);
                            MultiPacketInputStream putIfAbsent = this.incompleteInputStreams.putIfAbsent(valueOf, multiPacketInputStream);
                            if (putIfAbsent != null) {
                                multiPacketInputStream = putIfAbsent;
                            }
                        }
                        int decodeInteger3 = RPCService.decodeInteger(NumberUtils.byteArrayToShort(data, i));
                        int addPacket = multiPacketInputStream.addPacket(decodeInteger3, datagramPacket);
                        if (decodeInteger3 != addPacket) {
                            InetSocketAddress inetSocketAddress2 = (InetSocketAddress) datagramPacket.getSocketAddress();
                            NumberUtils.integerToByteArray(byteArrayToInteger2, bArr2, 0);
                            NumberUtils.shortToByteArray(RPCService.encodeInteger(addPacket - 1), bArr2, 4);
                            datagramPacket2.setSocketAddress(inetSocketAddress2);
                            try {
                                this.socket.send(datagramPacket2);
                            } catch (IOException e2) {
                                if (this.shutdownRequested) {
                                    return;
                                }
                                Log.error("Shutting down receiver thread due to error: ", e2);
                                return;
                            }
                        } else {
                            if ((decodeInteger3 - 1) % 10 == 0 || decodeInteger3 == decodeInteger2 - 1) {
                                InetSocketAddress inetSocketAddress3 = (InetSocketAddress) datagramPacket.getSocketAddress();
                                NumberUtils.integerToByteArray(byteArrayToInteger2, bArr2, 0);
                                NumberUtils.shortToByteArray(RPCService.encodeInteger(decodeInteger3), bArr2, 4);
                                datagramPacket2.setSocketAddress(inetSocketAddress3);
                                try {
                                    this.socket.send(datagramPacket2);
                                } catch (IOException e3) {
                                    if (this.shutdownRequested) {
                                        return;
                                    }
                                    Log.error("Shutting down receiver thread due to error: ", e3);
                                    return;
                                }
                            }
                            InetSocketAddress inetSocketAddress4 = (InetSocketAddress) datagramPacket.getSocketAddress();
                            byte[] bArr4 = new byte[1024];
                            datagramPacket = new DatagramPacket(bArr4, bArr4.length);
                            if (multiPacketInputStream.isComplete()) {
                                this.incompleteInputStreams.remove(valueOf);
                                this.rpcService.processIncomingRPCMessage(inetSocketAddress4, new Input(multiPacketInputStream));
                            }
                        }
                    }
                }
            } catch (SocketException e4) {
                if (this.shutdownRequested) {
                    return;
                }
                Log.error("Shutting down receiver thread due to error: ", e4);
                return;
            } catch (IOException e5) {
                Log.error("Shutting down receiver thread due to error: ", e5);
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cleanUpStaleState() {
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<MultiPacketInputStream> it = this.incompleteInputStreams.values().iterator();
        while (it.hasNext()) {
            if (it.next().getCreationTime() + 10000 < currentTimeMillis) {
                it.remove();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int send(DatagramPacket[] datagramPacketArr) throws IOException, InterruptedException {
        return send(datagramPacketArr, true);
    }

    int send(DatagramPacket[] datagramPacketArr, boolean z) throws IOException, InterruptedException {
        if (datagramPacketArr.length == 0) {
            return 0;
        }
        OutstandingTransmission outstandingTransmission = new OutstandingTransmission();
        Integer valueOf = Integer.valueOf(NumberUtils.byteArrayToInteger(datagramPacketArr[0].getData(), (datagramPacketArr[0].getLength() - 8) + 4));
        if (!z) {
            for (DatagramPacket datagramPacket : datagramPacketArr) {
                this.socket.send(datagramPacket);
            }
            return 0;
        }
        this.outstandingTransmissions.put(valueOf, outstandingTransmission);
        int i = -1;
        int i2 = 0;
        int i3 = 0;
        boolean z2 = true;
        while (true) {
            if (z2) {
                try {
                    int min = Math.min(datagramPacketArr.length, i + 100 + 1);
                    for (int i4 = i + 1; i4 < min; i4++) {
                        this.socket.send(datagramPacketArr[i4]);
                    }
                    i3 = min;
                } finally {
                    this.outstandingTransmissions.remove(valueOf);
                }
            } else {
                int min2 = Math.min(datagramPacketArr.length, (i3 + 100) - ((i3 - i) - 1));
                for (int i5 = i3; i5 < min2; i5++) {
                    this.socket.send(datagramPacketArr[i5]);
                }
                i3 = min2;
            }
            synchronized (outstandingTransmission) {
                i = outstandingTransmission.lastAckedPacket;
                if (i != i3 - 1) {
                    outstandingTransmission.wait(100L);
                    int i6 = outstandingTransmission.lastAckedPacket;
                    if (i == i6) {
                        z2 = true;
                        i2++;
                        if (i2 == MAXIMUM_NUMBER_OF_RETRANSMISSIONS) {
                        }
                    } else {
                        z2 = false;
                        i = i6;
                        if (i == i3 - 1) {
                            if (i3 == datagramPacketArr.length) {
                            }
                        }
                    }
                } else if (i3 != datagramPacketArr.length) {
                    z2 = false;
                }
            }
            break;
        }
        if (i != datagramPacketArr.length - 1) {
            throw new IOException("Unable to send RPC request to " + datagramPacketArr[0].getSocketAddress());
        }
        return i2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() throws InterruptedException {
        this.shutdownRequested = true;
        this.socket.close();
        interrupted();
        join();
    }
}
