beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/2] incubator-beam git commit: Fix CreateSource#getBytesPerOffset
Date Fri, 22 Apr 2016 18:39:11 GMT
Fix CreateSource#getBytesPerOffset

Return 1L when there are no elements, as the default value. Return 1L
when the total encoded size of all the elements is 0, for example when
using VoidCoder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6bd4f4e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6bd4f4e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6bd4f4e5

Branch: refs/heads/master
Commit: 6bd4f4e5eb7006d1016e1cb4bc8b018d6e2ae820
Parents: 1d337b7
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Apr 22 10:46:50 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Apr 22 11:38:56 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Create.java  |  4 ++--
 .../apache/beam/sdk/transforms/CreateTest.java  | 20 ++++++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bd4f4e5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 1bd4fb3..89e9985 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -400,9 +400,9 @@ public class Create<T> {
       @Override
       public long getBytesPerOffset() {
         if (allElementsBytes.size() == 0) {
-          return 0L;
+          return 1L;
         }
-        return totalSize / allElementsBytes.size();
+        return Math.max(1, totalSize / allElementsBytes.size());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bd4f4e5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 2998489..e491fea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
@@ -346,6 +347,25 @@ public class CreateTest {
   }
 
   @Test
+  public void testSourceSplitIntoBundlesVoid() throws Exception {
+    CreateSource<Void> source =
+        CreateSource.fromIterable(
+            Lists.<Void>newArrayList(null, null, null, null, null), VoidCoder.of());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<? extends BoundedSource<Void>> splitSources = source.splitIntoBundles(3,
options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
+  }
+
+  @Test
+  public void testSourceSplitIntoBundlesEmpty() throws Exception {
+    CreateSource<Integer> source =
+        CreateSource.fromIterable(ImmutableList.<Integer>of(), BigEndianIntegerCoder.of());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12,
options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
+  }
+
+  @Test
   public void testSourceDoesNotProduceSortedKeys() throws Exception {
     CreateSource<String> source =
         CreateSource.fromIterable(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());


Mime
View raw message