flink-commits mailing list archives

From trohrm...@apache.org
Subject [81/82] [abbrv] incubator-flink git commit: Fix race condition in ExecutionGraph which made the job finish before all vertices had called the finalizeOnMaster method.
Date Thu, 18 Dec 2014 18:46:17 GMT
Fix race condition in ExecutionGraph which made the job finish before all vertices had called
the finalizeOnMaster method.
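
For context: the finalizeOnMaster hook referred to above is the master-side clean-up step that output formats can request through Flink's FinalizeOnMaster interface; the HadoopOutputFormat touched below exposes it as finalizeGlobal(int parallelism). The minimal sketch that follows is illustrative only and is not the ExecutionGraph fix itself: the class name CommittingTextOutputFormat is hypothetical and the import locations are assumptions, but it shows the kind of hook that can be skipped if the job is marked finished too early.

import java.io.IOException;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;

// Sketch: an output format whose finalizeGlobal method must run on the master
// after all parallel subtasks have finished. If the ExecutionGraph marks the
// job FINISHED before every vertex has gone through finalizeOnMaster, this
// hook is silently skipped.
public class CommittingTextOutputFormat extends TextOutputFormat<String> implements FinalizeOnMaster {

	public CommittingTextOutputFormat(Path outputPath) {
		super(outputPath);
	}

	@Override
	public void finalizeGlobal(int parallelism) throws IOException {
		// Runs once, centrally, after all 'parallelism' subtasks are done,
		// e.g. to rename temporary output files to their final names, similar
		// to what HadoopOutputFormat.finalizeGlobal does in the diff below.
	}
}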


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/04b97c94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/04b97c94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/04b97c94

Branch: refs/heads/master
Commit: 04b97c942fcf81e397ebb43cb344ee4cbb657bf5
Parents: ae8fb94
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Dec 18 00:46:39 2014 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Dec 18 18:58:33 2014 +0100

----------------------------------------------------------------------
 .../example/HadoopMapredCompatWordCount.java    |  4 +-
 .../mapreduce/HadoopOutputFormat.java           | 42 +-------------------
 .../mapreduce/example/WordCount.java            |  2 +-
 3 files changed, 4 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/04b97c94/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index 81b1f67..de20fab 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -92,7 +92,7 @@ public class HadoopMapredCompatWordCount {
 			// normalize and split the line
 			String line = v.toString();
 			String[] tokens = line.toLowerCase().split("\\W+");
-
+			
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
@@ -119,8 +119,8 @@ public class HadoopMapredCompatWordCount {
 			while(vs.hasNext()) {
 				cnt += vs.next().get();
 			}
-
 			out.collect(k, new LongWritable(cnt));
+			
 		}
 		
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/04b97c94/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index 326f8ef..402372c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.hadoopcompatibility.mapreduce;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -117,9 +116,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-
-		System.out.println("HadoopOutputFormat: Write to " + this.configuration.get("mapred" +
-				".output.dir"));
+		
 		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
 		
 		try {
@@ -136,21 +133,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		} catch (InterruptedException e) {
 			throw new IOException("Could not create RecordWriter.", e);
 		}
-
-		File dir = new File(this.configuration.get("mapred.output.dir"));
-
-		if(dir.isDirectory()){
-			File[] files = dir.listFiles();
-			System.out.println(configuration.get("mapred.output.dir") + " contains the " +
-					"following files.");
-			for(File file: files){
-				System.out.println(file.toURI());
-			}
-		}else if(dir.exists()){
-			System.out.println(configuration.get("mapred.output.dir") + " is not a directory.");
-		}else{
-			System.out.println(configuration.get("mapred.output.dir") + " does not yet exists.");
-		}
 	}
 	
 	
@@ -169,7 +151,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 	 */
 	@Override
 	public void close() throws IOException {
-		System.out.println("HadoopOutputFormat: Close");
 		try {
 			this.recordWriter.close(this.context);
 		} catch (InterruptedException e) {
@@ -181,22 +162,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		}
 		
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
-		File dir = new File(this.configuration.get("mapred.output.dir"));
-
-		if(dir.isDirectory()){
-			File[] files = dir.listFiles();
-			System.out.println(configuration.get("mapred.output.dir") + " contains the " +
-					"following files.");
-			for(File file: files){
-				System.out.println(file.toURI());
-			}
-		}else if(dir.exists()){
-			System.out.println(configuration.get("mapred.output.dir") + " is not a directory.");
-		}else{
-			System.out.println(configuration.get("mapred.output.dir") + " does not yet exists.");
-		}
-
 		
 		// rename tmp-file to final name
 		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
@@ -206,18 +171,13 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
 		
 		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
-			System.out.println("Rename file " +  new Path(outputPath.toString()+"/"+tmpFile) + " " +
-					"to " + new Path(outputPath.toString()+"/"+taskNumberStr));
 			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
-		}else{
-			System.out.println("File does not exist?");
 		}
 	}
 	
 	@Override
 	public void finalizeGlobal(int parallelism) throws IOException {
 
-		System.out.println("Finalize HadoopOutputFormat.");
 		JobContext jobContext;
 		TaskAttemptContext taskContext;
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/04b97c94/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 271ee6c..2b99fd2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -95,7 +95,7 @@ public class WordCount {
 			// normalize and split the line
 			String line = value.f1.toString();
 			String[] tokens = line.toLowerCase().split("\\W+");
-
+			
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {

