flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [48/53] [abbrv] git commit: InputVertices do not require an input format.
Date Thu, 26 Jun 2014 09:47:13 GMT
InputVertices do not require an input format.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/35438ec2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/35438ec2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/35438ec2

Branch: refs/heads/travis_test
Commit: 35438ec2f3b0d3babb4f9348a32df89a7376ae25
Parents: ef623e9
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jun 25 16:52:55 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jun 25 16:52:55 2014 +0200

----------------------------------------------------------------------
 .../executiongraph/ExecutionGroupVertex.java    |  6 +-
 .../nephele/jobgraph/JobInputVertex.java        |  6 +-
 .../test/runtime/NetworkStackThroughput.java    | 66 +++-----------------
 3 files changed, 14 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35438ec2/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index dceeb90..599c682 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -429,8 +429,9 @@ public final class ExecutionGroupVertex {
 	 *
 	 * @param inputSplitType Input split type class
 	 */
-	public void setInputSplitType(final Class<? extends InputSplit> inputSplitType) { this.inputSplitType =
-			inputSplitType; }
+	public void setInputSplitType(final Class<? extends InputSplit> inputSplitType) {
+		this.inputSplitType = inputSplitType;
+	}
 
 	/**
 	 * Returns the input splits assigned to this group vertex.
@@ -438,7 +439,6 @@ public final class ExecutionGroupVertex {
 	 * @return the input splits, possibly <code>null</code> if the group vertex does not represent an input vertex
 	 */
 	public InputSplit[] getInputSplits() {
-
 		return this.inputSplits;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35438ec2/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
index bf8f544..2c8ba86 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
@@ -72,8 +72,8 @@ public class JobInputVertex extends AbstractJobInputVertex {
 	 */
 	@Override
 	public Class<? extends InputSplit> getInputSplitType() {
-		if(inputFormat == null){
-			throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
+		if (inputFormat == null){
+			return InputSplit.class;
 		}
 
 		return inputFormat.getInputSplitType();
@@ -89,7 +89,7 @@ public class JobInputVertex extends AbstractJobInputVertex {
 	@Override
 	public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
 		if (inputFormat == null){
-			throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
+			return null;
 		}
 
 		return inputFormat.createInputSplits(minNumSplits);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35438ec2/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
index 74d52b9..9b202c6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -13,12 +13,16 @@
 
 package eu.stratosphere.test.runtime;
 
-import eu.stratosphere.api.common.io.GenericInputFormat;
-import eu.stratosphere.api.common.io.OutputFormat;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
@@ -26,20 +30,11 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
 import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.pact.runtime.task.util.TaskConfig;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.test.util.RecordAPITestBase;
-import eu.stratosphere.types.Record;
 import eu.stratosphere.util.LogUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 
 public class NetworkStackThroughput {
 
@@ -109,9 +104,6 @@ public class NetworkStackThroughput {
 			producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
 			producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
 
-			TaskConfig inputConfig = new TaskConfig(producer.getConfiguration());
-			inputConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(new DummyInputFormat()));
-
 			JobTaskVertex forwarder = null;
 			if (useForwarder) {
 				forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
@@ -124,9 +116,6 @@ public class NetworkStackThroughput {
 			consumer.setNumberOfSubtasks(numSubtasks);
 			consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
 
-			TaskConfig outputConfig = new TaskConfig(consumer.getConfiguration());
-			outputConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(new DummyOutputFormat()));
-
 			if (useForwarder) {
 				producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 				forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -269,45 +258,6 @@ public class NetworkStackThroughput {
 		}
 	}
 
-	public static final class DummyInputFormat extends GenericInputFormat {
-
-		private static final long serialVersionUID = 6891640958330871924L;
-
-		@Override
-		public void open(InputSplit split) throws IOException {
-
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return false;
-		}
-
-		@Override
-		public Object nextRecord(Object reuse) throws IOException {
-			return null;
-		}
-	}
-
-	public static final class DummyOutputFormat implements OutputFormat<Record> {
-
-		@Override
-		public void configure(Configuration parameters) {
-		}
-
-		@Override
-		public void open(int taskNumber, int numTasks) {
-		}
-
-		@Override
-		public void writeRecord(Record record) {
-		}
-
-		@Override
-		public void close() {
-		}
-	}
-
 	// ------------------------------------------------------------------------
 
 	public void testThroughput() throws Exception {


Mime
View raw message