Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BD83D118D8 for ; Fri, 15 Aug 2014 11:09:08 +0000 (UTC) Received: (qmail 29956 invoked by uid 500); 15 Aug 2014 11:09:08 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 29937 invoked by uid 500); 15 Aug 2014 11:09:08 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 29926 invoked by uid 99); 15 Aug 2014 11:09:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Aug 2014 11:09:08 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 15 Aug 2014 11:08:45 +0000 Received: (qmail 29809 invoked by uid 99); 15 Aug 2014 11:08:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Aug 2014 11:08:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 56D7F9C4D94; Fri, 15 Aug 2014 11:08:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Fri, 15 Aug 2014 11:08:43 -0000 Message-Id: <462cdc96ce0c4537b90d9e51267bf2ad@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: Small bug fixes for running hadoop output formats X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-flink Updated Branches: refs/heads/master a87559aee -> 653216ba8 Small bug fixes for running hadoop output formats This closes #75 Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/653216ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/653216ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/653216ba Branch: refs/heads/master Commit: 653216ba83910ead811cfddf525b0448325882a3 Parents: d0cead7 Author: twalthr Authored: Mon Jul 21 12:11:16 2014 +0200 Committer: Stephan Ewen Committed: Thu Aug 14 22:43:26 2014 +0200 ---------------------------------------------------------------------- .../mapred/HadoopOutputFormat.java | 10 ++++++---- .../mapreduce/HadoopOutputFormat.java | 16 ++++++++++------ 2 files changed, 16 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/653216ba/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java index deae026..69a0b50 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java @@ -100,16 +100,18 @@ public class HadoopOutputFormat implement + Integer.toString(taskNumber + 1) + "_0"); + this.jobConf.set("mapred.task.id", taskAttemptID.toString()); + this.jobConf.setInt("mapred.task.partition", taskNumber + 1); + // for hadoop 2.2 + this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString()); + this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1); + try { this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID); } catch (Exception e) { throw new RuntimeException(e); } - this.jobConf.set("mapred.task.id", taskAttemptID.toString()); - // for hadoop 2.2 - this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString()); - this.fileOutputCommitter = new FileOutputCommitter(); try { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/653216ba/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java index 9eabc03..f071eda 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java @@ -103,14 +103,17 @@ public class HadoopOutputFormat implement + Integer.toString(taskNumber + 1) + "_0"); + this.configuration.set("mapred.task.id", taskAttemptID.toString()); + this.configuration.setInt("mapred.task.partition", taskNumber + 1); + // for hadoop 2.2 + this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString()); + this.configuration.setInt("mapreduce.task.partition", taskNumber + 1); + try { this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); } catch (Exception e) { throw new RuntimeException(e); } - this.configuration.set("mapred.task.id", taskAttemptID.toString()); - // for hadoop 2.2 - this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString()); this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); @@ -157,11 +160,12 @@ public class HadoopOutputFormat implement } this.fileOutputCommitter.commitJob(this.context); - // rename tmp-* files to final name - FileSystem fs = FileSystem.get(this.configuration); Path outputPath = new Path(this.configuration.get("mapred.output.dir")); - + + // rename tmp-* files to final name + FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); + final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)"); // isDirectory does not work in hadoop 1