Return-Path: X-Original-To: apmail-spark-reviews-archive@minotaur.apache.org Delivered-To: apmail-spark-reviews-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0160F174D7 for ; Thu, 6 Nov 2014 07:36:30 +0000 (UTC) Received: (qmail 97025 invoked by uid 500); 6 Nov 2014 07:36:29 -0000 Delivered-To: apmail-spark-reviews-archive@spark.apache.org Received: (qmail 97003 invoked by uid 500); 6 Nov 2014 07:36:29 -0000 Mailing-List: contact reviews-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list reviews@spark.apache.org Received: (qmail 96992 invoked by uid 99); 6 Nov 2014 07:36:29 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Nov 2014 07:36:29 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 19DA1930B76; Thu, 6 Nov 2014 07:36:29 +0000 (UTC) From: kayousterhout To: reviews@spark.apache.org Reply-To: reviews@spark.apache.org References: In-Reply-To: Subject: [GitHub] spark pull request: SPARK-3179. Add task OutputMetrics. Content-Type: text/plain Message-Id: <20141106073629.19DA1930B76@tyr.zones.apache.org> Date: Thu, 6 Nov 2014 07:36:29 +0000 (UTC) Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/2968#discussion_r19930092 --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala --- @@ -961,30 +962,52 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => { + val config = wrappedConf.value // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt /* "reduce task" */ val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, attemptNumber) - val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) + val hadoopContext = newTaskAttemptContext(config, attemptId) val format = outfmt.newInstance format match { - case c: Configurable => c.setConf(wrappedConf.value) + case c: Configurable => c.setConf(config) case _ => () } val committer = format.getOutputCommitter(hadoopContext) committer.setupTask(hadoopContext) + + val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir")) + .map(new Path(_)) + .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config)) + val outputMetrics = new OutputMetrics() + if (bytesWrittenCallback.isDefined) { + context.taskMetrics.outputMetrics = Some(outputMetrics) + } + --- End diff -- Can you move this code and (separately) the code in lines 995 through 1004 into methods that can be called here and also by saveAsHadoopDataset to avoid the code duplication? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org