flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] incubator-flink git commit: [FLINK-820] [compiler] Support for disconnected data flows
Date Thu, 20 Nov 2014 19:52:52 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master b3e5ed0ba -> 98ff76b0e


[FLINK-820] [compiler] Support for disconnected data flows


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/98ff76b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/98ff76b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/98ff76b0

Branch: refs/heads/master
Commit: 98ff76b0ea61a342250e15411edd9f7974cbe96d
Parents: d0f2db0
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Nov 20 16:43:18 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Nov 20 19:14:47 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/compiler/dag/SinkJoiner.java   | 36 +++++++-------
 .../compiler/BranchingPlansCompilerTest.java    | 47 +-----------------
 .../flink/compiler/DisjointDataFlowsTest.java   | 51 ++++++++++++++++++++
 .../test/misc/DisjointDataflowsITCase.java      | 37 ++++++++++++++
 4 files changed, 108 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
index b37c6ac..c153078 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
@@ -24,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.operators.OperatorDescriptorDual;
 import org.apache.flink.compiler.operators.UtilSinkJoinOpDescriptor;
@@ -74,23 +72,25 @@ public class SinkJoiner extends TwoInputNode {
 		
 		// if the predecessors do not have branches, then we have multiple sinks that do not originate from
 		// a common data flow.
-		if (pred1branches == null || pred1branches.isEmpty() || pred2branches == null || pred2branches.isEmpty()) {
-			throw new CompilerException("The given program contains multiple disconnected data flows.");
+		if (pred1branches == null || pred1branches.isEmpty()) {
+			
+			this.openBranches = (pred2branches == null || pred2branches.isEmpty()) ?
+					Collections.<UnclosedBranchDescriptor>emptyList() : // both empty - disconnected flow
+					pred2branches;
+		}
+		else if (pred2branches == null || pred2branches.isEmpty()) {
+			this.openBranches = pred1branches;
+		}
+		else {
+			// copy the lists and merge
+			List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
+			List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
+			
+			ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
+			mergeLists(result1, result2, result);
+			
+			this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
 		}
-		
-		// copy the lists and merge
-		List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
-		List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
-		
-		ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
-		mergeLists(result1, result2, result);
-		
-//		if (!didCloseSomeBranch) {
-//			// if the sink joiners do not close branches, then we have disjoint data flows.
-//			throw new CompilerException("The given program contains multiple disconnected data flows.");
-//		}
-		
-		this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 196f602..2b4ff02 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -566,42 +566,6 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	/**
 	 * 
 	 * <pre>
-	 *           (SINK A)    (SINK B)
-	 *             /           /
-	 *         (SRC A)     (SRC B)
-	 * </pre>
-	 */
-	@Test
-	public void testSimpleDisjointPlan() {
-		// construct the plan
-		final String out1Path = "file:///test/1";
-		final String out2Path = "file:///test/2";
-
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		
-		FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
-		FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB);
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sinkA);
-		sinks.add(sinkB);
-		
-		// return the PACT plan
-		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks");
-		
-		try {
-			compileNoStats(plan);
-			Assert.fail("Plan must not be compilable, it contains disjoint sub-plans.");
-		}
-		catch (Exception ex) {
-			// as expected
-		}
-	}
-	
-	/**
-	 * 
-	 * <pre>
 	 *     (SINK 3) (SINK 1)   (SINK 2) (SINK 4)
 	 *         \     /             \     /
 	 *         (SRC A)             (SRC B)
@@ -609,7 +573,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 * 
 	 * NOTE: this case is currently not caught by the compiler. we should enable the test once it is caught.
 	 */
-//	@Test (Deactivated for now because of unsupported feature)
+	@Test
 	public void testBranchingDisjointPlan() {
 		// construct the plan
 		final String out1Path = "file:///test/1";
@@ -634,14 +598,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		
 		// return the PACT plan
 		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
-		
-		try {
-			compileNoStats(plan);
-			Assert.fail("Plan must not be compilable, it contains disjoint sub-plans.");
-		}
-		catch (Exception ex) {
-			// as expected
-		}
+		compileNoStats(plan);
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
new file mode 100644
index 0000000..0c7bbef
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DisjointDataFlowsTest extends CompilerTestBase {
+
+	@Test
+	public void testDisjointFlows() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			// generate two different flows
+			env.generateSequence(1, 10).print();
+			env.generateSequence(1, 10).print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
new file mode 100644
index 0000000..f4d3d0b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class DisjointDataflowsITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// generate two different flows
+		env.generateSequence(1, 10).output(new DiscardingOuputFormat<Long>());
+		env.generateSequence(1, 10).output(new DiscardingOuputFormat<Long>());
+		
+		env.execute();
+	}
+}


Mime
View raw message