Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3E5CA200C52 for ; Sun, 26 Mar 2017 10:24:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3D2D9160B87; Sun, 26 Mar 2017 08:24:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5AD74160B6B for ; Sun, 26 Mar 2017 10:24:12 +0200 (CEST) Received: (qmail 91934 invoked by uid 500); 26 Mar 2017 08:24:11 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 91921 invoked by uid 99); 26 Mar 2017 08:24:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 26 Mar 2017 08:24:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55A09DFFD8; Sun, 26 Mar 2017 08:24:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aviemzur@apache.org To: commits@beam.apache.org Date: Sun, 26 Mar 2017 08:24:11 -0000 Message-Id: <82c47856eabc40208595b13987b9a02c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs archived-at: Sun, 26 Mar 2017 08:24:13 -0000 Repository: beam Updated Branches: refs/heads/master 348d33588 -> c9e55a436 [BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32f0482 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32f0482 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32f0482 Branch: refs/heads/master Commit: b32f0482784b9df7ce67226b32febe6e664a45b6 Parents: 348d335 Author: Aviem Zur Authored: Sat Mar 25 21:49:06 2017 +0300 Committer: Aviem Zur Committed: Sun Mar 26 10:31:40 2017 +0300 ---------------------------------------------------------------------- .../translation/GroupCombineFunctions.java | 15 ++++++----- .../spark/translation/TransformTranslator.java | 26 ++++++++++++-------- 2 files changed, 25 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index b2a589d..917a9ee 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -18,8 +18,7 @@ package org.apache.beam.runners.spark.translation; -import static com.google.common.base.Preconditions.checkArgument; - +import com.google.common.base.Optional; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.sdk.coders.Coder; @@ -67,14 +66,12 @@ public class GroupCombineFunctions { /** * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} transformation. */ - public static Iterable> combineGlobally( + public static Optional>> combineGlobally( JavaRDD> rdd, final SparkGlobalCombineFn sparkCombineFn, final Coder iCoder, final Coder aCoder, final WindowingStrategy windowingStrategy) { - checkArgument(!rdd.isEmpty(), "CombineGlobally computation should be skipped for empty RDDs."); - // coders. final WindowedValue.FullWindowedValueCoder wviCoder = WindowedValue.FullWindowedValueCoder.of(iCoder, @@ -93,6 +90,11 @@ public class GroupCombineFunctions { //---- AccumT: A //---- InputT: I JavaRDD inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder)); + + if (inputRDDBytes.isEmpty()) { + return Optional.absent(); + } + /*Itr>*/ byte[] accumulatedBytes = inputRDDBytes.aggregate( CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder), new Function2() { @@ -115,7 +117,8 @@ public class GroupCombineFunctions { } } ); - return CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder); + + return Optional.of(CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b4362b0..ffb207a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -27,6 +27,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable; import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -259,9 +260,20 @@ public final class TransformTranslator { ((BoundedDataset) context.borrowDataset(transform)).getRDD(); JavaRDD> outRdd; - // handle empty input RDD, which will naturally skip the entire execution - // as Spark will not run on empty RDDs. - if (inRdd.isEmpty()) { + + Optional>> maybeAccumulated = + GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, iCoder, aCoder, + windowingStrategy); + + if (maybeAccumulated.isPresent()) { + Iterable> output = + sparkCombineFn.extractOutput(maybeAccumulated.get()); + outRdd = context.getSparkContext() + .parallelize(CoderHelpers.toByteArrays(output, wvoCoder)) + .map(CoderHelpers.fromByteFunction(wvoCoder)); + } else { + // handle empty input RDD, which will naturally skip the entire execution + // as Spark will not run on empty RDDs. JavaSparkContext jsc = new JavaSparkContext(inRdd.context()); if (hasDefault) { OutputT defaultValue = combineFn.defaultValue(); @@ -272,14 +284,8 @@ public final class TransformTranslator { } else { outRdd = jsc.emptyRDD(); } - } else { - Iterable> accumulated = GroupCombineFunctions.combineGlobally( - inRdd, sparkCombineFn, iCoder, aCoder, windowingStrategy); - Iterable> output = sparkCombineFn.extractOutput(accumulated); - outRdd = context.getSparkContext() - .parallelize(CoderHelpers.toByteArrays(output, wvoCoder)) - .map(CoderHelpers.fromByteFunction(wvoCoder)); } + context.putDataset(transform, new BoundedDataset<>(outRdd)); }