beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Adds large key tests to GroupByKeyTest
Date Wed, 24 May 2017 19:47:04 GMT
Repository: beam
Updated Branches:
  refs/heads/master 6dd5585b3 -> 6418bcfcb


Adds large key tests to GroupByKeyTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a792f32
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a792f32
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a792f32

Branch: refs/heads/master
Commit: 6a792f3251904f191ff27e58bb9bb78b9c30fdd9
Parents: 6dd5585
Author: Daniel Mills <millsd@google.com>
Authored: Mon May 8 16:45:44 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed May 24 12:46:07 2017 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |  4 +-
 runners/flink/pom.xml                           |  2 +
 runners/google-cloud-dataflow-java/pom.xml      |  1 +
 .../org/apache/beam/sdk/testing/LargeKeys.java  | 48 +++++++++++++
 .../beam/sdk/transforms/GroupByKeyTest.java     | 74 ++++++++++++++++++++
 5 files changed, 128 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6a792f32/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 857dcf4..bec2113 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -144,11 +144,13 @@
             </goals>
             <configuration>
               <groups>org.apache.beam.sdk.testing.NeedsRunner</groups>
+              <!-- 100MB keys work on the direct runner, but make the test too slow. -->
+              <excludedGroups>org.apache.beam.sdk.testing.LargeKeys$Above100MB</excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>
               <dependenciesToScan>
                 <dependency>org.apache.beam:beam-sdks-java-core</dependency>
-                <dependency>org.apache.beam:beam-runners-java-core</dependency>
+                <dependency>org.apache.beam:beam-runners-core-java</dependency>
               </dependenciesToScan>
               <systemPropertyVariables>
                 <beamTestPipelineOptions>

http://git-wip-us.apache.org/repos/asf/beam/blob/6a792f32/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index ff73ec1..fb0a67c 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -57,6 +57,7 @@
                   <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
+                    org.apache.beam.sdk.testing.LargeKeys$Above100MB,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
                     org.apache.beam.sdk.testing.UsesTestStream
@@ -89,6 +90,7 @@
                   <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
+                    org.apache.beam.sdk.testing.LargeKeys$Above100MB,
                     org.apache.beam.sdk.testing.UsesSetState,
                     org.apache.beam.sdk.testing.UsesMapState,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,

http://git-wip-us.apache.org/repos/asf/beam/blob/6a792f32/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 895a8e6..16e1266 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -217,6 +217,7 @@
             <id>validates-runner-tests</id>
             <configuration>
               <excludedGroups>
+                org.apache.beam.sdk.testing.LargeKeys$Above10MB,
                 org.apache.beam.sdk.testing.UsesDistributionMetrics,
                 org.apache.beam.sdk.testing.UsesGaugeMetrics,
                 org.apache.beam.sdk.testing.UsesSetState,

http://git-wip-us.apache.org/repos/asf/beam/blob/6a792f32/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/LargeKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/LargeKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/LargeKeys.java
new file mode 100644
index 0000000..384b298
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/LargeKeys.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tags for tests which validate that a Beam runner can handle keys up to a given size.
+ */
+public interface LargeKeys {
+  /**
+   * Tests if a runner supports 10KB keys.
+   */
+  public interface Above10KB {}
+
+  /**
+   * Tests if a runner supports 100KB keys.
+   */
+  public interface Above100KB extends Above10KB {}
+
+  /**
+   * Tests if a runner supports 1MB keys.
+   */
+  public interface Above1MB extends Above100KB {}
+
+  /**
+   * Tests if a runner supports 10MB keys.
+   */
+  public interface Above10MB extends Above1MB {}
+
+  /**
+   * Tests if a runner supports 100MB keys.
+   */
+  public interface Above100MB extends Above10MB {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6a792f32/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 0cd885c..171171f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.LargeKeys;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -427,6 +428,79 @@ public class GroupByKeyTest {
     p.run();
   }
 
+  private static String bigString(char c, int size) {
+    char[] buf = new char[size];
+    for (int i = 0; i < size; i++) {
+      buf[i] = c;
+    }
+    return new String(buf);
+  }
+
+  private static void runLargeKeysTest(TestPipeline p, final int keySize) throws Exception {
+    PCollection<KV<String, Integer>> result = p
+        .apply(Create.of("a", "a", "b"))
+        .apply("Expand", ParDo.of(new DoFn<String, KV<String, String>>() {
+              @ProcessElement
+              public void process(ProcessContext c) {
+                c.output(KV.of(bigString(c.element().charAt(0), keySize), c.element()));
+              }
+          }))
+        .apply(GroupByKey.<String, String>create())
+        .apply("Count", ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, Integer>>() {
+              @ProcessElement
+              public void process(ProcessContext c) {
+                int size = 0;
+                for (String value : c.element().getValue()) {
+                  size++;
+                }
+                c.output(KV.of(c.element().getKey(), size));
+              }
+          }));
+
+    PAssert.that(result).satisfies(
+        new SerializableFunction<Iterable<KV<String, Integer>>, Void>() {
+          @Override
+          public Void apply(Iterable<KV<String, Integer>> values) {
+            assertThat(values,
+                containsInAnyOrder(
+                    KV.of(bigString('a', keySize), 2), KV.of(bigString('b', keySize), 1)));
+            return null;
+          }
+        });
+
+    p.run();
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, LargeKeys.Above10KB.class})
+  public void testLargeKeys10KB() throws Exception {
+    runLargeKeysTest(p, 10 << 10);
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, LargeKeys.Above100KB.class})
+  public void testLargeKeys100KB() throws Exception {
+    runLargeKeysTest(p, 100 << 10);
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, LargeKeys.Above1MB.class})
+  public void testLargeKeys1MB() throws Exception {
+    runLargeKeysTest(p, 1 << 20);
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, LargeKeys.Above10MB.class})
+  public void testLargeKeys10MB() throws Exception {
+    runLargeKeysTest(p, 10 << 20);
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, LargeKeys.Above100MB.class})
+  public void testLargeKeys100MB() throws Exception {
+    runLargeKeysTest(p, 100 << 20);
+  }
+
   /**
    * This is a bogus key class that returns random hash values from {@link #hashCode()} and always
    * returns {@code false} for {@link #equals(Object)}. The results of the test are correct if


Mime
View raw message