Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 565EB10BE9 for ; Thu, 18 Dec 2014 18:45:50 +0000 (UTC) Received: (qmail 25741 invoked by uid 500); 18 Dec 2014 18:45:49 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 25554 invoked by uid 500); 18 Dec 2014 18:45:49 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 25338 invoked by uid 99); 18 Dec 2014 18:45:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Dec 2014 18:45:49 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 18 Dec 2014 18:45:25 +0000 Received: (qmail 20432 invoked by uid 99); 18 Dec 2014 18:45:00 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Dec 2014 18:45:00 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 839AF957A38; Thu, 18 Dec 2014 18:45:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.incubator.apache.org Date: Thu, 18 Dec 2014 18:46:15 -0000 Message-Id: <18d9ecb7b32841c3985994a09ac470dc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [79/82] [abbrv] incubator-flink git commit: Exclude netty dependency from hadoop-mapreduce-client-core to resolve dependency conflict X-Virus-Checked: Checked by ClamAV on apache.org Exclude netty dependency from hadoop-mapreduce-client-core to resolve dependency conflict Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b962243b Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b962243b Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b962243b Branch: refs/heads/master Commit: b962243b4c190ad266951855ab1439e2a1b096ad Parents: f5618fa Author: Till Rohrmann Authored: Wed Dec 17 15:22:22 2014 +0100 Committer: Till Rohrmann Committed: Thu Dec 18 18:58:33 2014 +0100 ---------------------------------------------------------------------- .../example/HadoopMapredCompatWordCount.java | 4 +- .../mapred/wrapper/HadoopInputSplit.java | 6 +++ .../mapreduce/HadoopOutputFormat.java | 45 +++++++++++++++++++- .../mapreduce/example/WordCount.java | 2 +- .../src/test/resources/log4j-test.properties | 10 ++++- .../org/apache/flink/yarn/YarnJobManager.scala | 2 +- .../scala/org/apache/flink/yarn/YarnUtils.scala | 5 +-- pom.xml | 11 +++++ 8 files changed, 76 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java index de20fab..81b1f67 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java @@ -92,7 +92,7 @@ public class HadoopMapredCompatWordCount { // normalize and split the line String line = v.toString(); String[] tokens = line.toLowerCase().split("\\W+"); - + // emit the pairs for (String token : tokens) { if (token.length() > 0) { @@ -119,8 +119,8 @@ public class HadoopMapredCompatWordCount { while(vs.hasNext()) { cnt += vs.next().get(); } + out.collect(k, new LongWritable(cnt)); - } @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java index 3fb66c2..77c40f5 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java @@ -92,6 +92,7 @@ public class HadoopInputSplit implements InputSplit { private void writeObject(ObjectOutputStream out) throws IOException { out.writeInt(splitNumber); out.writeUTF(hadoopInputSplitTypeName); + jobConf.write(out); hadoopInputSplit.write(out); } @@ -110,6 +111,11 @@ public class HadoopInputSplit implements InputSplit { throw new RuntimeException("Unable to create InputSplit", e); } } + jobConf = new JobConf(); + jobConf.readFields(in); + if (this.hadoopInputSplit instanceof Configurable) { + ((Configurable) this.hadoopInputSplit).setConf(this.jobConf); + } this.hadoopInputSplit.readFields(in); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java index 402372c..cce7695 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.hadoopcompatibility.mapreduce; +import java.io.File; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -116,7 +117,9 @@ public class HadoopOutputFormat implement } catch (Exception e) { throw new RuntimeException(e); } - + + System.out.println("HadoopOutputFormat: Write to " + this.configuration.get("mapred" + + ".output.dir")); this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); try { @@ -133,6 +136,22 @@ public class HadoopOutputFormat implement } catch (InterruptedException e) { throw new IOException("Could not create RecordWriter.", e); } + + File dir = new File(this.configuration.get("mapred.output.dir")); + if(dir.exists()){ + if(dir.isDirectory()){ + File[] files = dir.listFiles(); + System.out.println(configuration.get("mapred.output.dir") + " contains the " + + "following files."); + for(File file: files){ + System.out.println(file.toPath()); + } + }else{ + System.out.println(configuration.get("mapred.output.dir") + " is not a directory."); + } + }else{ + System.out.println(configuration.get("mapred.output.dir") + " does not exist yet."); + } } @@ -151,6 +170,7 @@ public class HadoopOutputFormat implement */ @Override public void close() throws IOException { + System.out.println("HadoopOutputFormat: Close"); try { this.recordWriter.close(this.context); } catch (InterruptedException e) { @@ -162,6 +182,25 @@ public class HadoopOutputFormat implement } Path outputPath = new Path(this.configuration.get("mapred.output.dir")); + + File dir = new File(this.configuration.get("mapred.output.dir")); + if(dir.exists()){ + if(dir.isDirectory()){ + File[] files = dir.listFiles(); + System.out.println("Close: " +configuration.get("mapred.output.dir") + " contains" + + " the " + + "following files."); + for(File file: files){ + System.out.println(file.toPath()); + } + }else{ + System.out.println("Close: " +configuration.get("mapred.output.dir") + " is not a" + + " directory."); + } + }else{ + System.out.println("Close: " +configuration.get("mapred.output.dir") + " does not " + + "exist yet))."); + } // rename tmp-file to final name FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); @@ -171,7 +210,11 @@ public class HadoopOutputFormat implement String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr; if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) { + System.out.println("Rename file " + new Path(outputPath.toString()+"/"+tmpFile) + " " + + "to " + new Path(outputPath.toString()+"/"+taskNumberStr)); fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr)); + }else{ + System.out.println("File does not exist?"); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java index 2b99fd2..271ee6c 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java @@ -95,7 +95,7 @@ public class WordCount { // normalize and split the line String line = value.f1.toString(); String[] tokens = line.toLowerCase().split("\\W+"); - + // emit the pairs for (String token : tokens) { if (token.length() > 0) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties index 2fb9345..0b686e5 100644 --- a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties +++ b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties @@ -16,4 +16,12 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF \ No newline at end of file +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index af08f7b..aa5eb13 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -22,7 +22,7 @@ import java.io.{IOException, File} import java.nio.ByteBuffer import java.util.{ Collections} -import akka.actor.{PoisonPill, ActorRef} +import akka.actor.{ActorRef} import org.apache.flink.configuration.ConfigConstants import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.jobmanager.JobManager http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala index 86b06e1..245651d 100644 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala +++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala @@ -42,8 +42,8 @@ object YarnUtils { def getConfigString: String = { """ |akka{ - | loglevel = "INFO" - | stdout-loglevel = "INFO" + | loglevel = "DEBUG" + | stdout-loglevel = "DEBUG" | log-dead-letters-during-shutdown = off | log-dead-letters = off | @@ -56,7 +56,6 @@ object YarnUtils { | | netty{ | tcp{ - | port = 0 | transport-class = "akka.remote.transport.netty.NettyTransport" | tcp-nodelay = on | maximum-frame-size = 1MB http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2a252f8..a166609 100644 --- a/pom.xml +++ b/pom.xml @@ -394,6 +394,17 @@ under the License. + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + org.jboss.netty + netty + + +