flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/6] git commit: Transitive closure example in Java API
Date Wed, 18 Jun 2014 18:52:30 GMT
Transitive closure example in Java API

This closes #24


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f3930e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f3930e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f3930e3d

Branch: refs/heads/master
Commit: f3930e3dba63d98f2b0fc7986fb507f9afa8283a
Parents: b2bd469
Author: Kostas Tzoumas <Kostas.tzoumas@gmail.com>
Authored: Fri Jun 6 17:22:30 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jun 18 18:57:22 2014 +0200

----------------------------------------------------------------------
 .../stratosphere-java-examples/pom.xml          |  26 +++-
 .../java/graph/TransitiveClosureNaive.java      | 132 +++++++++++++++++++
 .../test/testdata/TransitiveClosureData.java    |  44 +++++++
 .../TransitiveClosureITCase.java                |  54 ++++++++
 4 files changed, 255 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-examples/stratosphere-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/pom.xml b/stratosphere-examples/stratosphere-java-examples/pom.xml
index 3fd4c9b..705c46c 100644
--- a/stratosphere-examples/stratosphere-java-examples/pom.xml
+++ b/stratosphere-examples/stratosphere-java-examples/pom.xml
@@ -57,7 +57,31 @@
 							</includes>
 						</configuration>
 					</execution>
-					
+
+                    <!-- Transitive Closure -->
+                    <execution>
+                        <id>TransitiveClosure</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <configuration>
+                            <classifier>TransitiveClosure</classifier>
+
+                            <archive>
+                                <manifestEntries>
+                                    <program-class>eu.stratosphere.example.java.graph.TransitiveClosureNaive</program-class>
+                                </manifestEntries>
+                            </archive>
+
+                            <includes>
+                                <include>**/java/graph/TransitiveClosureNaive.class</include>
+                                <include>**/java/graph/TransitiveClosureNaive$*.class</include>
+                                <include>**/java/graph/util/ConnectedComponentsData.class</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+
 					<!-- Connected Components -->
 					<execution>
 						<id>ConnectedComponents</id>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
new file mode 100644
index 0000000..3fd1627
--- /dev/null
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
@@ -0,0 +1,132 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.example.java.graph;
+
+import eu.stratosphere.api.common.ProgramDescription;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.functions.JoinFunction;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.example.java.graph.util.ConnectedComponentsData;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+
+public class TransitiveClosureNaive implements ProgramDescription {
+
+
+	/**
+	 * Entry point: computes the transitive closure of a directed graph.
+	 * <p>
+	 * The edge set seeds a bulk iteration. In every superstep the current paths (z,x) are
+	 * joined with the edges (x,y) to derive the paths (z,y) that are one edge longer. These are
+	 * unioned with the already known paths and deduplicated. {@code closeWith} is called
+	 * without a termination criterion, so the number of supersteps is bounded only by
+	 * {@code maxIterations}.
+	 *
+	 * @param args either {@code <edges path> <result path> <max number of iterations>}, or no
+	 *             arguments to use the built-in default edges and print the result
+	 * @throws Exception if building or executing the program fails
+	 */
+	public static void main(String... args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
+
+		// initial partial solution: every edge is a path of length one
+		IterativeDataSet<Tuple2<Long, Long>> paths = edges.iterate(maxIterations);
+
+		DataSet<Tuple2<Long, Long>> nextPaths = paths
+				// extend each path (z,x) by every edge (x,y) that starts at its end vertex x
+				.join(edges)
+				.where(1)
+				.equalTo(0)
+				.with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					/**
+					 * Extends a path by one edge.
+					 * left: Path (z,x) - x is reachable from z
+					 * right: Edge (x,y) - edge x-->y exists
+					 * out: Path (z,y) - y is reachable from z
+					 */
+					@Override
+					public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
+						// Long is immutable, so the boxed ids can be shared with the output tuple
+						// instead of being copied with new Long(...).
+						return new Tuple2<Long, Long>(left.f0, right.f1);
+					}
+				})
+				// keep the paths found in earlier supersteps
+				.union(paths)
+				// every record in a (z,y) group is the same path: emit one to deduplicate
+				.groupBy(0, 1)
+				.reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					@Override
+					public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+						out.collect(values.next());
+					}
+				});
+
+		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
+
+
+		// emit result: newline-separated records with space-separated fields, or stdout
+		if (fileOutput) {
+			transitiveClosure.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			transitiveClosure.print();
+		}
+
+		// execute program
+		env.execute("Transitive Closure Example");
+
+	}
+
+	@Override
+	public String getDescription() {
+		return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	// true once command line arguments were given; also selects file input in getEdgeDataSet
+	private static boolean fileOutput = false;
+	private static String edgesPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+
+	/**
+	 * Reads the command line into the static configuration fields.
+	 *
+	 * @param programArguments either empty (use defaults) or exactly
+	 *                         {@code <edges path> <result path> <max number of iterations>}
+	 * @return {@code true} if the program should run, {@code false} if the arguments were
+	 *         invalid (a usage message has then been printed to stderr)
+	 */
+	private static boolean parseParameters(String[] programArguments) {
+
+		if (programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (programArguments.length == 3) {
+				edgesPath = programArguments[0];
+				outputPath = programArguments[1];
+				try {
+					maxIterations = Integer.parseInt(programArguments[2]);
+				} catch (NumberFormatException e) {
+					System.err.println("Max number of iterations must be an integer, but was: " + programArguments[2]);
+					System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+					return false;
+				}
+				// a bulk iteration needs at least one superstep
+				if (maxIterations < 1) {
+					System.err.println("Max number of iterations must be positive, but was: " + maxIterations);
+					System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+					return false;
+				}
+			} else {
+				System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+		}
+		return true;
+	}
+
+
+	/**
+	 * Returns the edge set: read from {@code edgesPath} as space-delimited (Long, Long) pairs
+	 * when arguments were given, otherwise the default edges of ConnectedComponentsData.
+	 */
+	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+		} else {
+			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
new file mode 100644
index 0000000..f39216a
--- /dev/null
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.testdata;
+
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class TransitiveClosureData {
+
+	/** Field separator of a result line: a single space, as split by the original check. */
+	private static final Pattern SPLIT = Pattern.compile(" ");
+
+	/**
+	 * Validates a transitive-closure result for a graph whose edges never connect an odd and
+	 * an even vertex.
+	 * <p>
+	 * Every line must consist of exactly two space-separated vertex ids, and both ids must have
+	 * the same parity. A pair with different parities would be a path to a vertex that should
+	 * not be reachable.
+	 *
+	 * @param result reader over one result file; it is read to the end but not closed
+	 * @throws IOException if reading from {@code result} fails
+	 */
+	public static void checkOddEvenResult(BufferedReader result) throws IOException {
+		String line;
+		while ((line = result.readLine()) != null) {
+			String[] res = SPLIT.split(line);
+			Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
+
+			long from;
+			long to;
+			try {
+				// vertex ids are Long in the example program, so parse them as long, not int
+				from = Long.parseLong(res[0]);
+				to = Long.parseLong(res[1]);
+			} catch (NumberFormatException e) {
+				Assert.fail("Malformed result: " + line);
+				return; // not reached: fail() throws, but the compiler cannot know that
+			}
+
+			// (x & 1) is the parity bit; unlike x % 2 it also agrees for negative ids
+			Assert.assertEquals("Vertex " + to + " should not be reachable from vertex " + from + ".",
+					from & 1L, to & 1L);
+		}
+	}
+
+	private TransitiveClosureData() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
new file mode 100644
index 0000000..96761c8
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
@@ -0,0 +1,54 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.exampleJavaPrograms;
+
+
+import eu.stratosphere.example.java.graph.TransitiveClosureNaive;
+import eu.stratosphere.test.testdata.ConnectedComponentsData;
+import eu.stratosphere.test.testdata.TransitiveClosureData;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+
+import java.io.BufferedReader;
+
+/**
+ * Integration test for {@link TransitiveClosureNaive}: runs the example in file mode on a
+ * randomly generated edge list and validates every result reader with
+ * {@link TransitiveClosureData#checkOddEvenResult(BufferedReader)}.
+ * <p>
+ * NOTE(review): the parity check presumes getRandomOddEvenEdges never links an odd and an
+ * even vertex — confirm against ConnectedComponentsData.
+ */
+public class TransitiveClosureITCase extends JavaProgramTestBase {
+
+	/** Seed handed to the random edge generator. */
+	private static final long RANDOM_SEED = 0xBADC0FFEEBEEFL;
+
+	private static final int VERTEX_COUNT = 1000;
+
+	private static final int EDGE_COUNT = 10000;
+
+	/** Superstep limit passed to the program as its third argument. */
+	private static final String MAX_ITERATIONS = "100";
+
+	private String inputEdgesPath;
+	private String outputPath;
+
+
+	/** Writes the generated edges to a temp file and obtains a temp path for the result. */
+	@Override
+	protected void preSubmit() throws Exception {
+		this.inputEdgesPath = createTempFile("edges.txt",
+				ConnectedComponentsData.getRandomOddEvenEdges(EDGE_COUNT, VERTEX_COUNT, RANDOM_SEED));
+		this.outputPath = getTempFilePath("results");
+	}
+
+	/** Runs the example with explicit input, output and iteration arguments. */
+	@Override
+	protected void testProgram() throws Exception {
+		TransitiveClosureNaive.main(this.inputEdgesPath, this.outputPath, MAX_ITERATIONS);
+	}
+
+	/** Checks each reader returned for the result path. */
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader partReader : getResultReader(this.outputPath)) {
+			TransitiveClosureData.checkOddEvenResult(partReader);
+		}
+	}
+}
+


Mime
View raw message