package eu.stratosphere.test.runtime;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.nephele.template.AbstractGenericInputTask;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.runtime.io.api.RecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.util.LogUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:eu/stratosphere/test/runtime/NetworkStackThroughput.class */
public class NetworkStackThroughput extends RecordAPITestBase {
    private static final Log LOG = LogFactory.getLog(NetworkStackThroughput.class);
    private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
    private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
    private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
    private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
    private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
    private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
    private static final int IS_SLOW_SLEEP_MS = 10;
    private static final int IS_SLOW_EVERY_NUM_RECORDS = 512;

    /* loaded from: input_file:eu/stratosphere/test/runtime/NetworkStackThroughput$SpeedTestConsumer.class */
    public static class SpeedTestConsumer extends AbstractOutputTask {
        private RecordReader<SpeedTestRecord> reader;

        public void registerInputOutput() {
            this.reader = new RecordReader<>(this, SpeedTestRecord.class);
        }

        public void invoke() throws Exception {
            boolean z = getTaskConfiguration().getBoolean(NetworkStackThroughput.IS_SLOW_RECEIVER_CONFIG_KEY, false);
            int i = 0;
            while (this.reader.next() != null) {
                if (z) {
                    int i2 = i;
                    i++;
                    if (i2 % NetworkStackThroughput.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                        Thread.sleep(10L);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/runtime/NetworkStackThroughput$SpeedTestForwarder.class */
    public static class SpeedTestForwarder extends AbstractTask {
        private RecordReader<SpeedTestRecord> reader;
        private RecordWriter<SpeedTestRecord> writer;

        public void registerInputOutput() {
            this.reader = new RecordReader<>(this, SpeedTestRecord.class);
            this.writer = new RecordWriter<>(this);
        }

        public void invoke() throws Exception {
            this.writer.initializeSerializers();
            while (true) {
                SpeedTestRecord speedTestRecord = (SpeedTestRecord) this.reader.next();
                if (speedTestRecord == null) {
                    this.writer.flush();
                    return;
                }
                this.writer.emit(speedTestRecord);
            }
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/runtime/NetworkStackThroughput$SpeedTestProducer.class */
    public static class SpeedTestProducer extends AbstractGenericInputTask {
        private RecordWriter<SpeedTestRecord> writer;

        public void registerInputOutput() {
            this.writer = new RecordWriter<>(this);
        }

        public void invoke() throws Exception {
            this.writer.initializeSerializers();
            long integer = (getTaskConfiguration().getInteger(NetworkStackThroughput.DATA_VOLUME_GB_CONFIG_KEY, 1) * 1024) / getCurrentNumberOfSubtasks();
            long j = ((integer * 1024) * 1024) / 128;
            NetworkStackThroughput.LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)", Integer.valueOf(getIndexInSubtaskGroup() + 1), Integer.valueOf(getCurrentNumberOfSubtasks()), Long.valueOf(j), 128, Double.valueOf(integer / 1024.0d)));
            boolean z = getTaskConfiguration().getBoolean(NetworkStackThroughput.IS_SLOW_SENDER_CONFIG_KEY, false);
            int i = 0;
            SpeedTestRecord speedTestRecord = new SpeedTestRecord();
            long j2 = 0;
            while (true) {
                long j3 = j2;
                if (j3 >= j) {
                    this.writer.flush();
                    return;
                }
                if (z) {
                    int i2 = i;
                    i++;
                    if (i2 % NetworkStackThroughput.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                        Thread.sleep(10L);
                    }
                }
                this.writer.emit(speedTestRecord);
                j2 = j3 + 1;
            }
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/runtime/NetworkStackThroughput$SpeedTestRecord.class */
    public static class SpeedTestRecord implements IOReadableWritable {
        private static final int RECORD_SIZE = 128;
        private final byte[] buf = new byte[RECORD_SIZE];

        public SpeedTestRecord() {
            for (int i = 0; i < RECORD_SIZE; i++) {
                this.buf[i] = (byte) (i % RECORD_SIZE);
            }
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.write(this.buf);
        }

        public void read(DataInput dataInput) throws IOException {
            dataInput.readFully(this.buf);
        }
    }

    public NetworkStackThroughput(Configuration configuration) {
        super(configuration);
        setNumTaskManager(2);
        LogUtils.initializeDefaultConsoleLogger();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Object[] objArr = {new Object[]{1, false, false, false, 4, 2}, new Object[]{1, true, false, false, 4, 2}, new Object[]{1, true, true, false, 4, 2}, new Object[]{1, true, false, true, 4, 2}, new Object[]{2, true, false, false, 4, 2}, new Object[]{4, true, false, false, 4, 2}, new Object[]{4, true, false, false, 8, 4}, new Object[]{4, true, false, false, 16, 8}};
        ArrayList arrayList = new ArrayList(objArr.length);
        for (Object[] objArr2 : objArr) {
            Configuration configuration = new Configuration();
            configuration.setInteger(DATA_VOLUME_GB_CONFIG_KEY, ((Integer) objArr2[0]).intValue());
            configuration.setBoolean(USE_FORWARDER_CONFIG_KEY, ((Boolean) objArr2[1]).booleanValue());
            configuration.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, ((Boolean) objArr2[2]).booleanValue());
            configuration.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, ((Boolean) objArr2[3]).booleanValue());
            configuration.setInteger(NUM_SUBTASKS_CONFIG_KEY, ((Integer) objArr2[4]).intValue());
            configuration.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, ((Integer) objArr2[5]).intValue());
            arrayList.add(configuration);
        }
        return toParameterList(arrayList);
    }

    protected JobGraph getJobGraph() throws Exception {
        return createJobGraph(this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1), this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true), this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false), this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false), this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1), this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1));
    }

    @After
    public void calculateThroughput() {
        if (getJobExecutionResult() != null) {
            double integer = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1) * 8192.0d;
            double netRuntime = getJobExecutionResult().getNetRuntime() / 1000.0d;
            LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %.2f, data volume [mbits]: %.2f)", Integer.valueOf((int) Math.round(integer / netRuntime)), Double.valueOf(netRuntime), Double.valueOf(integer)));
        }
    }

    private JobGraph createJobGraph(int i, boolean z, boolean z2, boolean z3, int i2, int i3) throws JobGraphDefinitionException {
        JobGraph jobGraph = new JobGraph("Speed Test");
        JobGenericInputVertex jobGenericInputVertex = new JobGenericInputVertex("Speed Test Producer", jobGraph);
        jobGenericInputVertex.setInputClass(SpeedTestProducer.class);
        jobGenericInputVertex.setNumberOfSubtasks(i2);
        jobGenericInputVertex.setNumberOfSubtasksPerInstance(i3);
        jobGenericInputVertex.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, i);
        jobGenericInputVertex.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, z2);
        JobTaskVertex jobTaskVertex = null;
        if (z) {
            jobTaskVertex = new JobTaskVertex("Speed Test Forwarder", jobGraph);
            jobTaskVertex.setTaskClass(SpeedTestForwarder.class);
            jobTaskVertex.setNumberOfSubtasks(i2);
            jobTaskVertex.setNumberOfSubtasksPerInstance(i3);
        }
        JobOutputVertex jobOutputVertex = new JobOutputVertex("Speed Test Consumer", jobGraph);
        jobOutputVertex.setOutputClass(SpeedTestConsumer.class);
        jobOutputVertex.setNumberOfSubtasks(i2);
        jobOutputVertex.setNumberOfSubtasksPerInstance(i3);
        jobOutputVertex.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, z3);
        if (z) {
            jobGenericInputVertex.connectTo(jobTaskVertex, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
            jobTaskVertex.connectTo(jobOutputVertex, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
            jobTaskVertex.setVertexToShareInstancesWith(jobGenericInputVertex);
            jobOutputVertex.setVertexToShareInstancesWith(jobGenericInputVertex);
        } else {
            jobGenericInputVertex.connectTo(jobOutputVertex, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
            jobGenericInputVertex.setVertexToShareInstancesWith(jobOutputVertex);
        }
        return jobGraph;
    }
}
