beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/4] beam git commit: End-to-end test for large entity writes.
Date Fri, 19 May 2017 20:22:30 GMT
Repository: beam
Updated Branches:
  refs/heads/master ce00d2466 -> 9a6baefcd


End-to-end test for large entity writes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dcf40564
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dcf40564
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dcf40564

Branch: refs/heads/master
Commit: dcf405644e2b51303e1d2c12592fe82ee01eb32f
Parents: de95c7f
Author: Colin Phipps <fipsy@google.com>
Authored: Tue May 9 09:40:50 2017 +0000
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Fri May 19 13:11:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/V1ReadIT.java     |  2 +-
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   | 15 ++++++--
 .../beam/sdk/io/gcp/datastore/V1WriteIT.java    | 36 ++++++++++++++++++--
 3 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
index ec7fa8f..22945f5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -148,7 +148,7 @@ public class V1ReadIT {
     Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
 
     for (long i = 0; i < numEntities; i++) {
-      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
+      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0);
       writer.write(entity);
     }
     writer.close();

http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index dc91638..5e618df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -92,8 +92,10 @@ class V1TestUtil {
 
   /**
    * Build an entity for the given ancestorKey, kind, namespace and value.
+   * @param largePropertySize if greater than 0, add an unindexed property of the given size.
    */
-  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace,
+      int largePropertySize) {
     Entity.Builder entityBuilder = Entity.newBuilder();
     Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
     // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
@@ -105,6 +107,10 @@ class V1TestUtil {
 
     entityBuilder.setKey(keyBuilder.build());
     entityBuilder.putProperties("value", makeValue(value).build());
+    if (largePropertySize > 0) {
+      entityBuilder.putProperties("unindexed_value", makeValue(new String(
+          new char[largePropertySize]).replace("\0", "A")).setExcludeFromIndexes(true).build());
+    }
     return entityBuilder.build();
   }
 
@@ -115,18 +121,21 @@ class V1TestUtil {
     private final String kind;
     @Nullable
     private final String namespace;
+    private final int largePropertySize;
     private Key ancestorKey;
 
-    CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+    CreateEntityFn(String kind, @Nullable String namespace, String ancestor,
+        int largePropertySize) {
       this.kind = kind;
       this.namespace = namespace;
+      this.largePropertySize = largePropertySize;
       // Build the ancestor key for all created entities once, including the namespace.
       ancestorKey = makeAncestorKey(namespace, kind, ancestor);
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+      c.output(makeEntity(c.element(), ancestorKey, kind, namespace, largePropertySize));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
index 82e4d64..4a874fd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -67,8 +67,7 @@ public class V1WriteIT {
 
     // Write to datastore
     p.apply(GenerateSequence.from(0).to(numEntities))
-        .apply(ParDo.of(new CreateEntityFn(
-            options.getKind(), options.getNamespace(), ancestor)))
+        .apply(ParDo.of(new CreateEntityFn(options.getKind(), options.getNamespace(), ancestor, 0)))
         .apply(DatastoreIO.v1().write().withProjectId(project));
 
     p.run();
@@ -79,6 +78,39 @@ public class V1WriteIT {
     assertEquals(numEntitiesWritten, numEntities);
   }
 
+  /**
+   * An end-to-end test for {@link DatastoreV1.Write}.
+   *
+   * <p>Write some large test entities to Cloud Datastore, to test that a batch is flushed when
+   * the byte size limit is reached. Read and count all the entities. Verify that the count matches
+   * the number of entities written.
+   */
+  @Test
+  public void testE2EV1WriteWithLargeEntities() throws Exception {
+    Pipeline p = Pipeline.create(options);
+
+    /*
+     * Datastore has a limit of 1MB per entity, and 10MB per write RPC. If each entity is around
+     * 1MB in size, then we hit the limit on the size of the write long before we hit the limit on
+     * the number of entities per writes.
+     */
+    final int rawPropertySize = 900_000;
+    final int numLargeEntities = 100;
+
+    // Write to datastore
+    p.apply(GenerateSequence.from(0).to(numLargeEntities))
+        .apply(ParDo.of(new CreateEntityFn(
+                options.getKind(), options.getNamespace(), ancestor, rawPropertySize)))
+        .apply(DatastoreIO.v1().write().withProjectId(project));
+
+    p.run();
+
+    // Count number of entities written to datastore.
+    long numEntitiesWritten = countEntities(options, project, ancestor);
+
+    assertEquals(numEntitiesWritten, numLargeEntities);
+  }
+
   @After
   public void tearDown() throws Exception {
     deleteAllEntities(options, project, ancestor);


Mime
View raw message