flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [34/53] [abbrv] git commit: [FLINK-979] Fix NetworkThroughput test input and output task config
Date Thu, 26 Jun 2014 09:46:59 GMT
[FLINK-979] Fix NetworkThroughput test input and output task config

- Set DummyInputFormat and DummyOutputFormat via TaskConfig to respect task
  hierarchy refactoring.
- Run test via main method instead of JUnit test runner (this was originally
  a test in order to use RecordAPITestBase for JobGraph submission).

This closes #41.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2f0bd8fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2f0bd8fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2f0bd8fa

Branch: refs/heads/travis_test
Commit: 2f0bd8fa26f5ed13d5f116b296557df8207a8260
Parents: f3c5428
Author: uce <u.celebi@fu-berlin.de>
Authored: Tue Jun 24 23:24:17 2014 +0200
Committer: uce <u.celebi@fu-berlin.de>
Committed: Wed Jun 25 10:25:54 2014 +0200

----------------------------------------------------------------------
 .../test/runtime/NetworkStackThroughput.java    | 280 +++++++++++--------
 1 file changed, 168 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f0bd8fa/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
index ed6f608..74d52b9 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -13,22 +13,12 @@
 
 package eu.stratosphere.test.runtime;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
+import eu.stratosphere.api.common.io.GenericInputFormat;
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
@@ -36,14 +26,22 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
 import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.pact.runtime.task.util.TaskConfig;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.types.Record;
 import eu.stratosphere.util.LogUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
-@RunWith(Parameterized.class)
-public class NetworkStackThroughput extends RecordAPITestBase {
+public class NetworkStackThroughput {
 
 	private static final Log LOG = LogFactory.getLog(NetworkStackThroughput.class);
 
@@ -62,125 +60,102 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 	private static final int IS_SLOW_SLEEP_MS = 10;
 
 	private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
-	
-	// ------------------------------------------------------------------------
-	
-	private int dataVolumeGb;
-	private boolean useForwarder;
-	private boolean isSlowSender;
-	private boolean isSlowReceiver;
-	private int parallelism;
 
 	// ------------------------------------------------------------------------
 
-	public NetworkStackThroughput(Configuration config) {
-		super(config);
-		
-		dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-		useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
-		isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-		isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-		parallelism = config.getInteger(PARALLELISM_CONFIG_KEY, 1);
-		
-		int numSlots = config.getInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, 1);
-		
-		if (parallelism % numSlots != 0) {
-			throw new RuntimeException("The test case defines a parallelism that is not a multiple
of the slots per task manager.");
-		}
-		
-		setNumTaskTracker(parallelism / numSlots);
-		setTaskManagerNumSlots(numSlots);
-		
-		LogUtils.initializeDefaultConsoleLogger();
-	}
+	// wrapper to reuse RecordAPITestBase code in runs via main()
+	private static class TestBaseWrapper extends RecordAPITestBase {
 
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Object[][] configParams = new Object[][]{
-				new Object[]{1, false, false, false, 4, 2},
-				new Object[]{1, true, false, false, 4, 2},
-				new Object[]{1, true, true, false, 4, 2},
-				new Object[]{1, true, false, true, 4, 2},
-				new Object[]{2, true, false, false, 4, 2},
-				new Object[]{4, true, false, false, 4, 2},
-				new Object[]{4, true, false, false, 8, 4},
-				new Object[]{4, true, false, false, 16, 8},
-		};
+		private int dataVolumeGb;
+		private boolean useForwarder;
+		private boolean isSlowSender;
+		private boolean isSlowReceiver;
+		private int parallelism;
 
-		List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
-		for (Object[] p : configParams) {
-			Configuration config = new Configuration();
-			config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
-			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
-			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
-			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
-			config.setInteger(PARALLELISM_CONFIG_KEY, (Integer) p[4]);
-			config.setInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, (Integer) p[5]);
+		public TestBaseWrapper(Configuration config) {
+			super(config);
 
-			configs.add(config);
+			dataVolumeGb = config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+			useForwarder = config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
+			isSlowSender = config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+			isSlowReceiver = config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+			parallelism = config.getInteger(PARALLELISM_CONFIG_KEY, 1);
+
+			int numSlots = config.getInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, 1);
+
+			if (parallelism % numSlots != 0) {
+				throw new RuntimeException("The test case defines a parallelism that is not a multiple
of the slots per task manager.");
+			}
+
+			setNumTaskTracker(parallelism / numSlots);
+			setTaskManagerNumSlots(numSlots);
+
+			LogUtils.initializeDefaultConsoleLogger();
 		}
 
-		return toParameterList(configs);
-	}
+		@Override
+		protected JobGraph getJobGraph() throws Exception {
+			return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
+		}
 
-	// ------------------------------------------------------------------------
+		private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
+				boolean isSlowReceiver, int numSubtasks) throws JobGraphDefinitionException {
 
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
-	}
+			JobGraph jobGraph = new JobGraph("Speed Test");
 
-	@After
-	public void calculateThroughput() {
-		if (getJobExecutionResult() != null) {
-			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+			JobInputVertex producer = new JobInputVertex("Speed Test Producer", jobGraph);
+			producer.setInvokableClass(SpeedTestProducer.class);
+			producer.setNumberOfSubtasks(numSubtasks);
+			producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+			producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
 
-			double dataVolumeMbit = dataVolumeGb * 8192.0;
-			double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
+			TaskConfig inputConfig = new TaskConfig(producer.getConfiguration());
+			inputConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(new DummyInputFormat()));
 
-			int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
+			JobTaskVertex forwarder = null;
+			if (useForwarder) {
+				forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
+				forwarder.setInvokableClass(SpeedTestForwarder.class);
+				forwarder.setNumberOfSubtasks(numSubtasks);
+			}
 
-			LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
-					"runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
-		}
-	}
+			JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
+			consumer.setInvokableClass(SpeedTestConsumer.class);
+			consumer.setNumberOfSubtasks(numSubtasks);
+			consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
 
-	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
boolean isSlowReceiver,
-									int numSubtasks) throws JobGraphDefinitionException {
+			TaskConfig outputConfig = new TaskConfig(consumer.getConfiguration());
+			outputConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(new DummyOutputFormat()));
 
-		JobGraph jobGraph = new JobGraph("Speed Test");
+			if (useForwarder) {
+				producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+				forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 
-		JobInputVertex producer = new JobInputVertex("Speed Test Producer", jobGraph);
-		producer.setInvokableClass(SpeedTestProducer.class);
-		producer.setNumberOfSubtasks(numSubtasks);
-		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
-		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
+				forwarder.setVertexToShareInstancesWith(producer);
+				consumer.setVertexToShareInstancesWith(producer);
+			}
+			else {
+				producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+				producer.setVertexToShareInstancesWith(consumer);
+			}
 
-		JobTaskVertex forwarder = null;
-		if (useForwarder) {
-			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
-			forwarder.setInvokableClass(SpeedTestForwarder.class);
-			forwarder.setNumberOfSubtasks(numSubtasks);
+			return jobGraph;
 		}
 
-		JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
-		consumer.setInvokableClass(SpeedTestConsumer.class);
-		consumer.setNumberOfSubtasks(numSubtasks);
-		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+		@After
+		public void calculateThroughput() {
+			if (getJobExecutionResult() != null) {
+				int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
 
-		if (useForwarder) {
-			producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+				double dataVolumeMbit = dataVolumeGb * 8192.0;
+				double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
 
-			forwarder.setVertexToShareInstancesWith(producer);
-			consumer.setVertexToShareInstancesWith(producer);
-		}
-		else {
-			producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			producer.setVertexToShareInstancesWith(consumer);
-		}
+				int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
 
-		return jobGraph;
+				LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %.2f,
" +
+								"data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -293,4 +268,85 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 			in.readFully(this.buf);
 		}
 	}
+
+	public static final class DummyInputFormat extends GenericInputFormat {
+
+		private static final long serialVersionUID = 6891640958330871924L;
+
+		@Override
+		public void open(InputSplit split) throws IOException {
+
+		}
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return false;
+		}
+
+		@Override
+		public Object nextRecord(Object reuse) throws IOException {
+			return null;
+		}
+	}
+
+	public static final class DummyOutputFormat implements OutputFormat<Record> {
+
+		@Override
+		public void configure(Configuration parameters) {
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) {
+		}
+
+		@Override
+		public void writeRecord(Record record) {
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	public void testThroughput() throws Exception {
+		Object[][] configParams = new Object[][]{
+				new Object[]{1, false, false, false, 4, 2},
+				new Object[]{1, true, false, false, 4, 2},
+				new Object[]{1, true, true, false, 4, 2},
+				new Object[]{1, true, false, true, 4, 2},
+				new Object[]{2, true, false, false, 4, 2},
+				new Object[]{4, true, false, false, 4, 2},
+				new Object[]{4, true, false, false, 8, 4},
+				new Object[]{4, true, false, false, 16, 8},
+		};
+
+		for (Object[] p : configParams) {
+			Configuration config = new Configuration();
+			config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
+			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
+			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
+			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
+			config.setInteger(PARALLELISM_CONFIG_KEY, (Integer) p[4]);
+			config.setInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, (Integer) p[5]);
+
+			TestBaseWrapper test = new TestBaseWrapper(config);
+
+			test.startCluster();
+			test.testJob();
+			test.calculateThroughput();
+			test.stopCluster();
+		}
+	}
+
+	private void runAllTests() throws Exception {
+		testThroughput();
+
+		System.out.println("Done.");
+	}
+
+	public static void main(String[] args) throws Exception {
+		new NetworkStackThroughput().runAllTests();
+	}
 }


Mime
View raw message