flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/63] [abbrv] Refactor job graph construction to incremental attachment based
Date Sun, 21 Sep 2014 02:12:25 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 439ca7ffe -> 91cfbc5aa


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index 8246d22..65c9857 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -131,7 +131,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase
{
 		// --------------------------------------------------------------------------------------------------------------
 
 		// - input -----------------------------------------------------------------------------------------------------
-		InputFormatInputVertex input = JobGraphUtils.createInput(
+		InputFormatVertex input = JobGraphUtils.createInput(
 			new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks);
 		TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
 		{
@@ -214,7 +214,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase
{
 		}
 
 		// - output ----------------------------------------------------------------------------------------------------
-		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		{
 			outputConfig.addInputToGroup(0);
@@ -225,10 +225,10 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase
{
 		}
 
 		// - fake tail -------------------------------------------------------------------------------------------------
-		SimpleOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
+		OutputFormatVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
 
 		// - sync ------------------------------------------------------------------------------------------------------
-		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
+		OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(maxIterations);
 		syncConfig.setIterationId(ITERATION_ID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 052c7ea..82bd046 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.nephele;
 
 import java.io.IOException;
@@ -33,12 +32,10 @@ import org.apache.flink.runtime.iterative.io.FakeOutputTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
-import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.operators.DataSinkTask;
 import org.apache.flink.runtime.operators.DataSourceTask;
 import org.apache.flink.runtime.operators.RegularPactTask;
@@ -56,21 +53,21 @@ public class JobGraphUtils {
 		client.submitJobAndWait();
 	}
 	
-	public static <T extends FileInputFormat<?>> InputFormatInputVertex createInput(T
stub, String path, String name, JobGraph graph,
+	public static <T extends FileInputFormat<?>> InputFormatVertex createInput(T
stub, String path, String name, JobGraph graph,
 			int degreeOfParallelism)
 	{
 		stub.setFilePath(path);
 		return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism);
 	}
 
-	private static <T extends InputFormat<?,?>> InputFormatInputVertex createInput(UserCodeWrapper<T>
stub, String name, JobGraph graph,
+	private static <T extends InputFormat<?,?>> InputFormatVertex createInput(UserCodeWrapper<T>
stub, String name, JobGraph graph,
 			int degreeOfParallelism)
 	{
-		InputFormatInputVertex inputVertex = new InputFormatInputVertex(name, graph);
+		InputFormatVertex inputVertex = new InputFormatVertex(graph, name);
 		
 		inputVertex.setInvokableClass(DataSourceTask.class);
 		
-		inputVertex.setNumberOfSubtasks(degreeOfParallelism);
+		inputVertex.setParallelism(degreeOfParallelism);
 
 		TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
 		inputConfig.setStubWrapper(stub);
@@ -100,28 +97,28 @@ public class JobGraphUtils {
 		return taskVertex;
 	}
 
-	public static SimpleOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism)
{
-		SimpleOutputVertex sync = new SimpleOutputVertex("BulkIterationSync", jobGraph);
+	public static OutputFormatVertex createSync(JobGraph jobGraph, int degreeOfParallelism)
{
+		OutputFormatVertex sync = new OutputFormatVertex(jobGraph, "BulkIterationSync");
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
-		sync.setNumberOfSubtasks(1);
+		sync.setParallelism(1);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);
 		return sync;
 	}
 
-	public static SimpleOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
+	public static OutputFormatVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
-		SimpleOutputVertex outputVertex = new SimpleOutputVertex(name, jobGraph);
+		OutputFormatVertex outputVertex = new OutputFormatVertex(jobGraph, name);
 		outputVertex.setInvokableClass(FakeOutputTask.class);
-		outputVertex.setNumberOfSubtasks(degreeOfParallelism);
+		outputVertex.setParallelism(degreeOfParallelism);
 		return outputVertex;
 	}
 
-	public static OutputFormatOutputVertex createFileOutput(JobGraph jobGraph, String name,
int degreeOfParallelism)
+	public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
-		OutputFormatOutputVertex sinkVertex = new OutputFormatOutputVertex(name, jobGraph);
+		OutputFormatVertex sinkVertex = new OutputFormatVertex(jobGraph, name);
 		sinkVertex.setInvokableClass(DataSinkTask.class);
-		sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
+		sinkVertex.setParallelism(degreeOfParallelism);
 		return sinkVertex;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index aea2c2c..a6771ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -267,7 +267,7 @@ public class CustomCompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput",
degreeOfParallelism);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -276,10 +276,10 @@ public class CustomCompensatableDanglingPageRank {
 		
 		// --------------- the auxiliaries ---------------------
 		
-		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
+		OutputFormatVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
 			degreeOfParallelism);
 
-		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME,
new PageRankStatsAggregator());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index a740cf3..7eacf1b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		// --------------- the inputs ---------------------
 
 		// page rank input
-		InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
+		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
 			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
 
 		// edges as adjacency list
-		InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
+		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
 			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -279,7 +279,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 		// --------------- the output ---------------------
 
-		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput",
degreeOfParallelism);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -288,9 +288,10 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 		// --------------- the auxiliaries ---------------------
 		
-		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
degreeOfParallelism);
+		OutputFormatVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
+			degreeOfParallelism);
 
-		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME,
new PageRankStatsAggregator());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index d1f4ae0..317963b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -119,7 +119,7 @@ public class CompensatableDanglingPageRank {
 		// --------------- the inputs ---------------------
 
 		// page rank input
-		InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
+		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
 			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -128,7 +128,7 @@ public class CompensatableDanglingPageRank {
 		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
 
 		// edges as adjacency list
-		InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
+		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
 			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -248,7 +248,7 @@ public class CompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput",
degreeOfParallelism);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(recSerializer, 0);
@@ -257,10 +257,10 @@ public class CompensatableDanglingPageRank {
 		
 		// --------------- the auxiliaries ---------------------
 		
-		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
+		OutputFormatVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
 			degreeOfParallelism);
 
-		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
deleted file mode 100644
index fdc3375..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-/**
- * An intermediate data set is the data set produced by an operator - either a
- * source or any intermediate operation.
- * 
- * Intermediate data sets may be read by other operators, materialized, or
- * discarded.
- */
-public class IntermediateDataSet {
-	
-	private final IntermediateDataSetID id; 		// the identifier
-	
-	private final AbstractJobVertex producer;		// the operation that produced this data set
-
-	
-	public IntermediateDataSet(AbstractJobVertex producer) {
-		this(new IntermediateDataSetID(), producer);
-	}
-	
-	public IntermediateDataSet(IntermediateDataSetID id, AbstractJobVertex producer) {
-		this.id = id;
-		this.producer = producer;
-	}
-	
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
deleted file mode 100644
index ac12be9..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.util.UUID;
-
-import eu.stratosphere.nephele.AbstractID;
-
-public class IntermediateDataSetID extends AbstractID {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Creates an new random intermediate data set ID.
-	 */
-	public IntermediateDataSetID() {
-		super();
-	}
-	
-	/**
-	 * Creates a new intermediate data set ID with the bytes of the given ID.
-	 * 
-	 * @param from The ID to create this ID from.
-	 */
-	public IntermediateDataSetID(AbstractID from) {
-		super(from);
-	}
-	
-	/**
-	 * Creates a new intermediate data set ID with the bytes of the given UUID.
-	 * 
-	 * @param from The UUID to create this ID from.
-	 */
-	public IntermediateDataSetID(UUID from) {
-		super(from.getLeastSignificantBits(), from.getMostSignificantBits());
-	}
-}


Mime
View raw message