beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: Option to disable validation of BigtableIO.write target table
Date Thu, 28 Sep 2017 21:38:37 GMT
Repository: beam
Updated Branches:
  refs/heads/master 34360537f -> 0724f1c5f


Option to disable validation of BigtableIO.write target table


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9cea17eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9cea17eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9cea17eb

Branch: refs/heads/master
Commit: 9cea17ebd00feb5280efc6480643f7b55a4ec273
Parents: 3436053
Author: steve <sniemitz@twitter.com>
Authored: Mon Sep 25 14:21:54 2017 -0400
Committer: chamikara@google.com <chamikara@google.com>
Committed: Thu Sep 28 14:37:52 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 60 ++++++++++++++------
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 26 +++++++++
 2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9cea17eb/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index fd15240..252f6c5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -190,7 +190,10 @@ public class BigtableIO {
    */
   @Experimental
   public static Read read() {
-    return new AutoValue_BigtableIO_Read.Builder().setKeyRange(ByteKeyRange.ALL_KEYS).setTableId("")
+    return new AutoValue_BigtableIO_Read.Builder()
+        .setKeyRange(ByteKeyRange.ALL_KEYS)
+        .setTableId("")
+        .setValidate(true)
         .build();
   }
 
@@ -203,7 +206,10 @@ public class BigtableIO {
    */
   @Experimental
   public static Write write() {
-    return new AutoValue_BigtableIO_Write.Builder().setTableId("").build();
+    return new AutoValue_BigtableIO_Write.Builder()
+        .setTableId("")
+        .setValidate(true)
+        .build();
   }
 
   /**
@@ -234,6 +240,8 @@ public class BigtableIO {
     @Nullable
     public abstract BigtableOptions getBigtableOptions();
 
+    public abstract boolean getValidate();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -249,6 +257,8 @@ public class BigtableIO {
 
       abstract Builder setBigtableService(BigtableService bigtableService);
 
+      abstract Builder setValidate(boolean validate);
+
       abstract Read build();
     }
 
@@ -316,6 +326,11 @@ public class BigtableIO {
       return toBuilder().setTableId(tableId).build();
     }
 
+    /** Disables validation that the table being read from exists. */
+    public Read withoutValidation() {
+      return toBuilder().setValidate(false).build();
+    }
+
     @Override
     public PCollection<Row> expand(PBegin input) {
       checkArgument(getBigtableOptions() != null, "withBigtableOptions() is required");
@@ -332,13 +347,15 @@ public class BigtableIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      try {
-        checkArgument(
-            getBigtableService(options).tableExists(getTableId()),
-            "Table %s does not exist",
-            getTableId());
-      } catch (IOException e) {
-        LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e);
+      if (getValidate()) {
+        try {
+          checkArgument(
+              getBigtableService(options).tableExists(getTableId()),
+              "Table %s does not exist",
+              getTableId());
+        } catch (IOException e) {
+          LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e);
+        }
       }
     }
 
@@ -432,6 +449,8 @@ public class BigtableIO {
     @Nullable
     public abstract BigtableOptions getBigtableOptions();
 
+    abstract boolean getValidate();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -443,6 +462,8 @@ public class BigtableIO {
 
       abstract Builder setBigtableService(BigtableService bigtableService);
 
+      abstract Builder setValidate(boolean validate);
+
       abstract Write build();
     }
 
@@ -482,6 +503,11 @@ public class BigtableIO {
       return toBuilder().setBigtableOptions(optionsWithAgent).build();
     }
 
+    /** Disables validation that the table being written to exists. */
+    public Write withoutValidation() {
+      return toBuilder().setValidate(false).build();
+    }
+
     /**
      * Returns a new {@link BigtableIO.Write} that will write to the specified table.
      *
@@ -509,13 +535,15 @@ public class BigtableIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      try {
-        checkArgument(
-            getBigtableService(options).tableExists(getTableId()),
-            "Table %s does not exist",
-            getTableId());
-      } catch (IOException e) {
-        LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e);
+      if (getValidate()) {
+        try {
+          checkArgument(
+              getBigtableService(options).tableExists(getTableId()),
+              "Table %s does not exist",
+              getTableId());
+        } catch (IOException e) {
+          LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9cea17eb/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 58370f7..af3354b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -589,6 +589,32 @@ public class BigtableIOTest {
         + "display data", displayData, Matchers.hasItem(hasDisplayItem("rowFilter")));
   }
 
+  @Test
+  public void testReadWithoutValidate() {
+    final String table = "fooTable";
+    BigtableIO.Read read = BigtableIO.read()
+        .withBigtableOptions(BIGTABLE_OPTIONS)
+        .withTableId(table)
+        .withBigtableService(service)
+        .withoutValidation();
+
+    // validate() will throw if withoutValidation() isn't working
+    read.validate(TestPipeline.testingPipelineOptions());
+  }
+
+  @Test
+  public void testWriteWithoutValidate() {
+    final String table = "fooTable";
+    BigtableIO.Write write = BigtableIO.write()
+        .withBigtableOptions(BIGTABLE_OPTIONS)
+        .withTableId(table)
+        .withBigtableService(service)
+        .withoutValidation();
+
+    // validate() will throw if withoutValidation() isn't working
+    write.validate(TestPipeline.testingPipelineOptions());
+  }
+
   /** Tests that a record gets written to the service and messages are logged. */
   @Test
   public void testWriting() throws Exception {


Mime
View raw message