crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-601: Handle empty PCollections correctly in Crunch-on-Spark. Created by Micah Whitacre, Mikael Goldmann, and Josh Wills.
Date Fri, 26 Aug 2016 05:48:03 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 3fff74e2e -> f1d074c2a


CRUNCH-601: Handle empty PCollections correctly in Crunch-on-Spark. Created by Micah Whitacre,
Mikael Goldmann, and Josh Wills.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f1d074c2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f1d074c2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f1d074c2

Branch: refs/heads/master
Commit: f1d074c2a7dcaf44b03dab5b84e9d323f586fdac
Parents: 3fff74e
Author: Josh Wills <jwills@apache.org>
Authored: Wed Aug 24 10:59:14 2016 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Aug 25 22:47:02 2016 -0700

----------------------------------------------------------------------
 .../impl/dist/collect/BaseDoCollection.java     |  6 +-
 .../crunch/impl/dist/collect/BaseDoTable.java   |  6 +-
 .../java/org/apache/crunch/lib/Aggregate.java   |  2 +-
 .../pobject/FirstElementPObject.java            | 15 +++-
 .../crunch/SmallCollectionLengthTest.java       | 80 ++++++++++++++++++++
 5 files changed, 105 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java
index bb1d054..a43967e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java
@@ -46,7 +46,11 @@ public class BaseDoCollection<S> extends PCollectionImpl<S> {
 
   @Override
   protected long getSizeInternal() {
-    return (long) (fn.scaleFactor() * parent.getSize());
+    long parentSize = parent.getSize();
+    if (parentSize == 0L) {
+      return parentSize;
+    }
+    return Math.max(1L, (long) (fn.scaleFactor() * parentSize));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java
index 4c5569e..4b64ae8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java
@@ -74,7 +74,11 @@ public class BaseDoTable<K, V> extends PTableBase<K, V> implements PTable<K, V>
 
   @Override
   protected long getSizeInternal() {
-    return (long) (fn.scaleFactor() * parent.getSize());
+    long parentSize = parent.getSize();
+    if (parentSize == 0L) {
+      return parentSize;
+    }
+    return Math.max(1L, (long) (fn.scaleFactor() * parentSize));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index dd4e1db..9f71458 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -92,7 +92,7 @@ public class Aggregate {
         .groupByKey(GroupingOptions.builder().numReducers(1).build())
         .combineValues(Aggregators.SUM_LONGS());
     PCollection<Long> count = countTable.values();
-    return new FirstElementPObject<Long>(count);
+    return new FirstElementPObject<Long>(count, 0L);
   }
 
   public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> {

http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
index aa5fd9e..7f25720 100644
--- a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
@@ -29,13 +29,26 @@ import org.apache.crunch.PCollection;
  */
 public class FirstElementPObject<T> extends PObjectImpl<T, T> {
 
+  private T defaultValue;
+
   /**
    * Constructs a new instance of this {@code PObject} implementation.
    *
    * @param collect The backing {@code PCollection} for this {@code PObject}.
    */
   public FirstElementPObject(PCollection<T> collect) {
+    this(collect, null);
+  }
+
+  /**
+   * Constructs a new instance of this {@code PObject} implementation.
+   *
+   * @param collect The backing {@code PCollection} for this {@code PObject}.
+   * @param defaultValue The value to return if the backing PCollection is empty.
+   */
+  public FirstElementPObject(PCollection<T> collect, T defaultValue) {
     super(collect);
+    this.defaultValue = defaultValue;
   }
 
   /** {@inheritDoc} */
@@ -45,6 +58,6 @@ public class FirstElementPObject<T> extends PObjectImpl<T, T> {
     if (itr.hasNext()) {
       return itr.next();
     }
-    return null;
+    return defaultValue;
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f1d074c2/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java b/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java
new file mode 100644
index 0000000..2cc800b
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SmallCollectionLengthTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.impl.spark;
+
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import javax.annotation.Nullable;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+
+public class SmallCollectionLengthTest implements Serializable {
+
+
+  @Test
+  public void smallCollectionsShouldNotHaveNullLength() throws Exception {
+    Pipeline p = new SparkPipeline("local", "foobar");
+    final ImmutableList<String>
+        allFruits =
+        ImmutableList.of("apelsin", "banan", "citron", "daddel");
+    final ArrayList<ImmutableList<String>> fruitLists = new ArrayList<>();
+    for (int i = 0; i <= allFruits.size(); ++i) {
+      fruitLists.add(ImmutableList.copyOf(allFruits.subList(0, i)));
+    }
+
+    final ArrayList<PObject<Long>> results = new ArrayList<>();
+    for (ImmutableList<String> fruit : fruitLists) {
+      final PCollection<String> collection = p.create(fruit, Avros.strings());
+      results.add(collection.length());
+    }
+
+    p.run();
+
+    final Iterable<Long>
+        lengths =
+        Iterables.transform(results, new Function<PObject<Long>, Long>() {
+          @Nullable
+          @Override
+          public Long apply(@Nullable PObject<Long> input) {
+            return input.getValue();
+          }
+        });
+
+    for (Long length : lengths) {
+      assertThat(length, not(nullValue()));
+    }
+
+    p.done();
+  }
+
+}


Mime
View raw message