flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [examples] Fix remaining examples to properly handle eager print() execution.
Date Thu, 21 May 2015 14:07:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master ad1d9362c -> b92ff1212


[examples] Fix remaining examples to properly handle eager print() execution.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b92ff121
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b92ff121
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b92ff121

Branch: refs/heads/master
Commit: b92ff121218ba637ee6079f93ceaea21a3958645
Parents: ad1d936
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu May 21 15:38:50 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu May 21 15:38:50 2015 +0200

----------------------------------------------------------------------
 .../flink/examples/java/clustering/KMeans.java    | 12 ++++++------
 .../examples/java/graph/ConnectedComponents.java  |  3 +--
 .../examples/java/graph/EnumTrianglesBasic.java   |  4 +---
 .../java/graph/TransitiveClosureNaive.java        |  7 +++----
 .../java/misc/CollectionExecutionExample.java     | 18 ++++++------------
 .../flink/examples/java/misc/PiEstimation.java    | 15 ++++-----------
 .../flink/examples/java/ml/LinearRegression.java  |  3 ---
 .../relational/EmptyFieldsCountAccumulator.java   |  4 +---
 8 files changed, 22 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
index 7383457..73f90ca 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
@@ -114,15 +114,15 @@ public class KMeans {
 				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
 		
 		// emit result
-		if(fileOutput) {
+		if (fileOutput) {
 			clusteredPoints.writeAsCsv(outputPath, "\n", " ");
-		} else {
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("KMeans Example");
+		}
+		else {
 			clusteredPoints.print();
 		}
-
-		// execute program
-		env.execute("KMeans Example");
-		
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index 827bb25..56f98a7 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -109,9 +109,8 @@ public class ConnectedComponents implements ProgramDescription {
 		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
 		
 		// emit result
-		if(fileOutput) {
+		if (fileOutput) {
 			result.writeAsCsv(outputPath, "\n", " ");
-
 			// execute program
 			env.execute("Connected Components Example");
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
index fdbe197..423edc7 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -104,15 +104,13 @@ public class EnumTrianglesBasic {
 				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
 
 		// emit result
-		if(fileOutput) {
+		if (fileOutput) {
 			triangles.writeAsCsv(outputPath, "\n", ",");
-
 			// execute program
 			env.execute("Basic Triangle Enumeration Example");
 		} else {
 			triangles.print();
 		}
-
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index d0fefeb..5306895 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -98,13 +98,12 @@ public class TransitiveClosureNaive implements ProgramDescription {
 		// emit result
 		if (fileOutput) {
 			transitiveClosure.writeAsCsv(outputPath, "\n", " ");
+
+			// execute program explicitly, because file sinks are lazy
+			env.execute("Transitive Closure Example");
 		} else {
 			transitiveClosure.print();
 		}
-
-		// execute program
-		env.execute("Transitive Closure Example");
-
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
index bf8b942..57ab3f6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.examples.java.misc;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.List;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 /** 
@@ -76,27 +73,24 @@ public class CollectionExecutionExample {
 		
 		// create objects for users and emails
 		User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };
+		
 		EMail[] emailsArray = {new EMail(1, "Re: Meeting", "How about 1pm?"),
 							new EMail(1, "Re: Meeting", "Sorry, I'm not availble"),
 							new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.")};
 		
 		// convert objects into a DataSet
-		DataSet<User> users = env.fromCollection(Arrays.asList(usersArray));
-		DataSet<EMail> emails = env.fromCollection(Arrays.asList(emailsArray));
+		DataSet<User> users = env.fromElements(usersArray);
+		DataSet<EMail> emails = env.fromElements(emailsArray);
 		
 		// join the two DataSets
 		DataSet<Tuple2<User,EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");
 		
 		// retrieve the resulting Tuple2 elements into a ArrayList.
-		Collection<Tuple2<User,EMail>> result = new ArrayList<Tuple2<User,EMail>>(3);
-		joined.output(new LocalCollectionOutputFormat<Tuple2<User,EMail>>(result));
-		
-		// kick off execution.
-		env.execute();
+		List<Tuple2<User,EMail>> result = joined.collect();
 		
 		// Do some work with the resulting ArrayList (=Collection).
 		for(Tuple2<User, EMail> t : result) {
-			System.err.println("Result = "+t);
+			System.err.println("Result = " + t);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
index 2780bb1..fc85110 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -60,16 +60,9 @@ public class PiEstimation implements java.io.Serializable {
 				.map(new Sampler())
 				.reduce(new SumReducer());
 
-		// the ratio of the unit circle surface to 4 times the unit square is pi
-		DataSet<Double> pi = count
-				.map(new MapFunction<Long, Double>() {
-					public Double map(Long value) {
-						return value * 4.0 / numSamples;
-					}
-				});
+		long theCount = count.collect().get(0);
 
-		System.out.println("We estimate Pi to be:");
-		pi.print();
+		System.out.println("We estimate Pi to be: " + (theCount * 4.0 / numSamples));
 	}
 
 	//*************************************************************************
@@ -85,7 +78,7 @@ public class PiEstimation implements java.io.Serializable {
 	public static class Sampler implements MapFunction<Long, Long> {
 
 		@Override
-		public Long map(Long value) throws Exception{
+		public Long map(Long value) {
 			double x = Math.random();
 			double y = Math.random();
 			return (x * x + y * y) < 1 ? 1L : 0L;
@@ -99,7 +92,7 @@ public class PiEstimation implements java.io.Serializable {
 	public static final class SumReducer implements ReduceFunction<Long>{
 
 		@Override
-		public Long reduce(Long value1, Long value2) throws Exception {
+		public Long reduce(Long value1, Long value2) {
 			return value1 + value2;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 46873f6..341daa6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -109,9 +109,6 @@ public class LinearRegression {
 		} else {
 			result.print();
 		}
-
-
-
 	}
 
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/b92ff121/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 9f6f567..558ee23 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -72,7 +72,7 @@ public class EmptyFieldsCountAccumulator {
 		final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());
 
 		// Here, we could do further processing with the filtered lines...
-		JobExecutionResult result = null;
+		JobExecutionResult result;
 		// output the filtered lines
 		if (outputPath == null) {
 			filteredLines.print();
@@ -83,11 +83,9 @@ public class EmptyFieldsCountAccumulator {
 			result = env.execute("Accumulator example");
 		}
 
-
 		// get the accumulator result via its registration key
 		final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
 		System.out.format("Number of detected empty fields per column: %s\n", emptyFields);
-
 	}
 
 	// *************************************************************************


Mime
View raw message