flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [7/9] git commit: Renamed Network Speed test to exclude the long benachmark from regular test cycles
Date Thu, 12 Jun 2014 21:03:58 GMT
Renamed Network Speed test to exclude the long benachmark from regular test cycles


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/54f30591
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/54f30591
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/54f30591

Branch: refs/heads/release-0.5.1
Commit: 54f305919e0d1da147d2759ee32ea8d189ca3f3d
Parents: 2a165ee
Author: StephanEwen <stephan.ewen@tu-berlin.de>
Authored: Thu Jun 12 17:50:30 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu Jun 12 20:48:10 2014 +0200

----------------------------------------------------------------------
 .../test/runtime/NetworkStackNepheleITCase.java | 286 -------------------
 .../test/runtime/NetworkStackThroughput.java    | 286 +++++++++++++++++++
 2 files changed, 286 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/54f30591/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
deleted file mode 100644
index 64026a2..0000000
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.test.runtime;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.runtime.io.channels.ChannelType;
-import eu.stratosphere.test.util.RecordAPITestBase;
-import eu.stratosphere.util.LogUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class NetworkStackNepheleITCase extends RecordAPITestBase {
-
-	private static final Log LOG = LogFactory.getLog(NetworkStackNepheleITCase.class);
-
-	private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
-
-	private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
-
-	private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
-
-	private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
-
-	private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
-
-	private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
-
-	private static final int IS_SLOW_SLEEP_MS = 10;
-
-	private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
-
-	// ------------------------------------------------------------------------
-
-	public NetworkStackNepheleITCase(Configuration config) {
-		super(config);
-
-		setNumTaskManager(2);
-		LogUtils.initializeDefaultConsoleLogger();
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Object[][] configParams = new Object[][]{
-				new Object[]{1, false, false, false, 4, 2},
-				new Object[]{1, true, false, false, 4, 2},
-				new Object[]{1, true, true, false, 4, 2},
-				new Object[]{1, true, false, true, 4, 2},
-				new Object[]{2, true, false, false, 4, 2},
-				new Object[]{4, true, false, false, 4, 2},
-				new Object[]{4, true, false, false, 8, 4},
-				new Object[]{4, true, false, false, 16, 8},
-		};
-
-		List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
-		for (Object[] p : configParams) {
-			Configuration config = new Configuration();
-			config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
-			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
-			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
-			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
-			config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
-			config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
-
-			configs.add(config);
-		}
-
-		return toParameterList(configs);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-		boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
-		boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-		boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-		int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
-		int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY,
1);
-
-		return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks,
numSubtasksPerInstance);
-	}
-
-	@After
-	public void calculateThroughput() {
-		if (getJobExecutionResult() != null) {
-			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-
-			double dataVolumeMbit = dataVolumeGb * 8192.0;
-			double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
-
-			int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
-
-			LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
-					"runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
-		}
-	}
-
-	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
boolean isSlowReceiver,
-									int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException
{
-
-		JobGraph jobGraph = new JobGraph("Speed Test");
-
-		JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
-		producer.setInputClass(SpeedTestProducer.class);
-		producer.setNumberOfSubtasks(numSubtasks);
-		producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
-		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
-		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
-
-		JobTaskVertex forwarder = null;
-		if (useForwarder) {
-			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
-			forwarder.setTaskClass(SpeedTestForwarder.class);
-			forwarder.setNumberOfSubtasks(numSubtasks);
-			forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
-		}
-
-		JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
-		consumer.setOutputClass(SpeedTestConsumer.class);
-		consumer.setNumberOfSubtasks(numSubtasks);
-		consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
-		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
-
-		if (useForwarder) {
-			producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
-			forwarder.setVertexToShareInstancesWith(producer);
-			consumer.setVertexToShareInstancesWith(producer);
-		}
-		else {
-			producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			producer.setVertexToShareInstancesWith(consumer);
-		}
-
-		return jobGraph;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static class SpeedTestProducer extends AbstractGenericInputTask {
-
-		private RecordWriter<SpeedTestRecord> writer;
-
-		@Override
-		public void registerInputOutput() {
-			this.writer = new RecordWriter<SpeedTestRecord>(this);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			this.writer.initializeSerializers();
-
-			// Determine the amount of data to send per subtask
-			int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackNepheleITCase.DATA_VOLUME_GB_CONFIG_KEY,
1);
-
-			long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
-			long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
-
-			LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f
GB)",
-					getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
-					SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
-
-			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-
-			int numRecords = 0;
-			SpeedTestRecord record = new SpeedTestRecord();
-			for (long i = 0; i < numRecordsToEmit; i++) {
-				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
-					Thread.sleep(IS_SLOW_SLEEP_MS);
-				}
-
-				this.writer.emit(record);
-			}
-
-			this.writer.flush();
-		}
-	}
-
-	public static class SpeedTestForwarder extends AbstractTask {
-
-		private RecordReader<SpeedTestRecord> reader;
-
-		private RecordWriter<SpeedTestRecord> writer;
-
-		@Override
-		public void registerInputOutput() {
-			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
-			this.writer = new RecordWriter<SpeedTestRecord>(this);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			this.writer.initializeSerializers();
-
-			SpeedTestRecord record;
-			while ((record = this.reader.next()) != null) {
-				this.writer.emit(record);
-			}
-
-			this.writer.flush();
-		}
-	}
-
-	public static class SpeedTestConsumer extends AbstractOutputTask {
-
-		private RecordReader<SpeedTestRecord> reader;
-
-		@Override
-		public void registerInputOutput() {
-			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-
-			int numRecords = 0;
-			while (this.reader.next() != null) {
-				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
-					Thread.sleep(IS_SLOW_SLEEP_MS);
-				}
-			}
-		}
-	}
-
-	public static class SpeedTestRecord implements IOReadableWritable {
-
-		private static final int RECORD_SIZE = 128;
-
-		private final byte[] buf = new byte[RECORD_SIZE];
-
-		public SpeedTestRecord() {
-			for (int i = 0; i < RECORD_SIZE; ++i) {
-				this.buf[i] = (byte) (i % 128);
-			}
-		}
-
-		@Override
-		public void write(DataOutput out) throws IOException {
-			out.write(this.buf);
-		}
-
-		@Override
-		public void read(DataInput in) throws IOException {
-			in.readFully(this.buf);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/54f30591/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
new file mode 100644
index 0000000..fae3f99
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -0,0 +1,286 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.runtime;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
+import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
+import eu.stratosphere.nephele.template.AbstractGenericInputTask;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.util.LogUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class NetworkStackThroughput extends RecordAPITestBase {
+
+	private static final Log LOG = LogFactory.getLog(NetworkStackThroughput.class);
+
+	private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
+
+	private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
+
+	private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
+
+	private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
+
+	private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
+
+	private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
+
+	private static final int IS_SLOW_SLEEP_MS = 10;
+
+	private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+	// ------------------------------------------------------------------------
+
+	public NetworkStackThroughput(Configuration config) {
+		super(config);
+
+		setNumTaskManager(2);
+		LogUtils.initializeDefaultConsoleLogger();
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() {
+		Object[][] configParams = new Object[][]{
+				new Object[]{1, false, false, false, 4, 2},
+				new Object[]{1, true, false, false, 4, 2},
+				new Object[]{1, true, true, false, 4, 2},
+				new Object[]{1, true, false, true, 4, 2},
+				new Object[]{2, true, false, false, 4, 2},
+				new Object[]{4, true, false, false, 4, 2},
+				new Object[]{4, true, false, false, 8, 4},
+				new Object[]{4, true, false, false, 16, 8},
+		};
+
+		List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
+		for (Object[] p : configParams) {
+			Configuration config = new Configuration();
+			config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
+			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
+			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
+			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
+			config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
+			config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
+
+			configs.add(config);
+		}
+
+		return toParameterList(configs);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected JobGraph getJobGraph() throws Exception {
+		int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+		boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
+		boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+		boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+		int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
+		int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY,
1);
+
+		return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks,
numSubtasksPerInstance);
+	}
+
+	@After
+	public void calculateThroughput() {
+		if (getJobExecutionResult() != null) {
+			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+			double dataVolumeMbit = dataVolumeGb * 8192.0;
+			double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
+
+			int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
+
+			LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
+					"runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
+		}
+	}
+
+	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
boolean isSlowReceiver,
+									int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException
{
+
+		JobGraph jobGraph = new JobGraph("Speed Test");
+
+		JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
+		producer.setInputClass(SpeedTestProducer.class);
+		producer.setNumberOfSubtasks(numSubtasks);
+		producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
+
+		JobTaskVertex forwarder = null;
+		if (useForwarder) {
+			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
+			forwarder.setTaskClass(SpeedTestForwarder.class);
+			forwarder.setNumberOfSubtasks(numSubtasks);
+			forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		}
+
+		JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
+		consumer.setOutputClass(SpeedTestConsumer.class);
+		consumer.setNumberOfSubtasks(numSubtasks);
+		consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+
+		if (useForwarder) {
+			producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+			forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+
+			forwarder.setVertexToShareInstancesWith(producer);
+			consumer.setVertexToShareInstancesWith(producer);
+		}
+		else {
+			producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+			producer.setVertexToShareInstancesWith(consumer);
+		}
+
+		return jobGraph;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static class SpeedTestProducer extends AbstractGenericInputTask {
+
+		private RecordWriter<SpeedTestRecord> writer;
+
+		@Override
+		public void registerInputOutput() {
+			this.writer = new RecordWriter<SpeedTestRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			this.writer.initializeSerializers();
+
+			// Determine the amount of data to send per subtask
+			int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackThroughput.DATA_VOLUME_GB_CONFIG_KEY,
1);
+
+			long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
+			long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+			LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f
GB)",
+					getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
+					SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
+
+			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+
+			int numRecords = 0;
+			SpeedTestRecord record = new SpeedTestRecord();
+			for (long i = 0; i < numRecordsToEmit; i++) {
+				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+					Thread.sleep(IS_SLOW_SLEEP_MS);
+				}
+
+				this.writer.emit(record);
+			}
+
+			this.writer.flush();
+		}
+	}
+
+	public static class SpeedTestForwarder extends AbstractTask {
+
+		private RecordReader<SpeedTestRecord> reader;
+
+		private RecordWriter<SpeedTestRecord> writer;
+
+		@Override
+		public void registerInputOutput() {
+			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+			this.writer = new RecordWriter<SpeedTestRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			this.writer.initializeSerializers();
+
+			SpeedTestRecord record;
+			while ((record = this.reader.next()) != null) {
+				this.writer.emit(record);
+			}
+
+			this.writer.flush();
+		}
+	}
+
+	public static class SpeedTestConsumer extends AbstractOutputTask {
+
+		private RecordReader<SpeedTestRecord> reader;
+
+		@Override
+		public void registerInputOutput() {
+			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+
+			int numRecords = 0;
+			while (this.reader.next() != null) {
+				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+					Thread.sleep(IS_SLOW_SLEEP_MS);
+				}
+			}
+		}
+	}
+
+	public static class SpeedTestRecord implements IOReadableWritable {
+
+		private static final int RECORD_SIZE = 128;
+
+		private final byte[] buf = new byte[RECORD_SIZE];
+
+		public SpeedTestRecord() {
+			for (int i = 0; i < RECORD_SIZE; ++i) {
+				this.buf[i] = (byte) (i % 128);
+			}
+		}
+
+		@Override
+		public void write(DataOutput out) throws IOException {
+			out.write(this.buf);
+		}
+
+		@Override
+		public void read(DataInput in) throws IOException {
+			in.readFully(this.buf);
+		}
+	}
+}


Mime
View raw message