beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-50] BigQueryIO: fix autocompleting project and test it
Date Tue, 19 Apr 2016 01:50:59 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 5f20e5361 -> bf78e9667


[BEAM-50] BigQueryIO: fix autocompleting project and test it


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c876c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c876c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c876c2

Branch: refs/heads/master
Commit: e3c876c2616fe8fc5c549973e8444a6c55f12e77
Parents: 5f20e53
Author: Dan Halperin <dhalperi@google.com>
Authored: Mon Apr 18 16:10:54 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Apr 18 17:44:04 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 91 +++++++++++---------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 40 ++++++++-
 2 files changed, 89 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c876c2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index aff1e4c..d9debbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -526,7 +527,7 @@ public class BigQueryIO {
       }
 
       /**
-       * Returns the table to write, or {@code null} if reading from a query instead.
+       * Returns the table to read, or {@code null} if reading from a query instead.
        */
       public TableReference getTable() {
         return table;
@@ -931,37 +932,34 @@ public class BigQueryIO {
       public void validate(PCollection<TableRow> input) {
         BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
-        TableReference table = getTable();
-        if (table == null && tableRefFunction == null) {
-          throw new IllegalStateException(
-              "must set the table reference of a BigQueryIO.Write transform");
-        }
-        if (table != null && tableRefFunction != null) {
-          throw new IllegalStateException(
-              "Cannot set both a table reference and a table function for a BigQueryIO.Write "
-                + "transform");
-        }
+        // Exactly one of the table and table reference can be configured.
+        checkState(
+            jsonTableRef != null || tableRefFunction != null,
+            "must set the table reference of a BigQueryIO.Write transform");
+        checkState(
+            jsonTableRef == null || tableRefFunction == null,
+            "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+                + " transform");
 
-        if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) {
-          throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, "
-              + "however no schema was provided.");
-        }
+        // Require a schema if creating one or more tables.
+        checkArgument(
+            createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+        // The user specified a table.
+        if (jsonTableRef != null && validate) {
+          TableReference table = getTable();
 
-        if (table != null && table.getProjectId() == null) {
           // If user does not specify a project we assume the table to be located in the project
-          // that owns the Dataflow job.
-          String projectIdFromOptions = options.getProject();
-          LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
-              table.getTableId(), projectIdFromOptions));
-          table.setProjectId(projectIdFromOptions);
-        }
+          // configured in BigQueryOptions.
+          if (Strings.isNullOrEmpty(table.getProjectId())) {
+            table.setProjectId(options.getProject());
+          }
 
-        // Check for destination table presence and emptiness for early failure notification.
-        // Note that a presence check can fail if the table or dataset are created by earlier stages
-        // of the pipeline. For these cases the withoutValidation method can be used to disable
-        // the check.
-        // Unfortunately we can't validate anything early if tableRefFunction is specified.
-        if (table != null && validate) {
+          // Check for destination table presence and emptiness for early failure notification.
+          // Note that a presence check can fail when the table or dataset is created by an earlier
+          // stage of the pipeline. For these cases the #withoutValidation method can be used to
+          // disable the check.
           verifyDatasetPresence(options, table);
           if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
             verifyTablePresence(options, table);
@@ -972,16 +970,16 @@ public class BigQueryIO {
         }
 
         if (options.isStreaming() || tableRefFunction != null) {
-          // We will use BigQuery's streaming write API -- validate support dispositions.
-          if (createDisposition == CreateDisposition.CREATE_NEVER) {
-            throw new IllegalArgumentException("CreateDispostion.CREATE_NEVER is not "
-                + "supported for unbounded PCollections or when using tablespec functions.");
-          }
+          // We will use BigQuery's streaming write API -- validate supported dispositions.
+          checkArgument(
+              createDisposition != CreateDisposition.CREATE_NEVER,
+              "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
+                  + " using a tablespec function.");
 
-          if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
-            throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
-                + "supported for unbounded PCollections or when using tablespec functions.");
-          }
+          checkArgument(
+              writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+              "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+                  + " when using a tablespec function.");
         } else {
           // We will use a BigQuery load job -- validate the temp location.
           String tempLocation = options.getTempLocation();
@@ -1012,13 +1010,17 @@ public class BigQueryIO {
           return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
         }
 
+        TableReference table = fromJsonString(jsonTableRef, TableReference.class);
+        if (Strings.isNullOrEmpty(table.getProjectId())) {
+          table.setProjectId(options.getProject());
+        }
         String jobIdToken = UUID.randomUUID().toString();
         String tempFilePrefix = options.getTempLocation() + "/BigQuerySinkTemp/" + jobIdToken;
         BigQueryServices bqServices = getBigQueryServices();
         return input.apply("Write", org.apache.beam.sdk.io.Write.to(
             new BigQuerySink(
                 jobIdToken,
-                jsonTableRef,
+                table,
                 jsonSchema,
                 getWriteDisposition(),
                 getCreateDisposition(),
@@ -1047,7 +1049,8 @@ public class BigQueryIO {
         return fromJsonString(jsonSchema, TableSchema.class);
       }
 
-      /** Returns the table reference, or {@code null} if a . */
+      /** Returns the table reference, or {@code null}. */
+      @Nullable
       public TableReference getTable() {
         return fromJsonString(jsonTableRef, TableReference.class);
       }
@@ -1086,7 +1089,7 @@ public class BigQueryIO {
 
     public BigQuerySink(
         String jobIdToken,
-        @Nullable String jsonTable,
+        @Nullable TableReference table,
         @Nullable String jsonSchema,
         WriteDisposition writeDisposition,
         CreateDisposition createDisposition,
@@ -1095,7 +1098,13 @@ public class BigQueryIO {
         BigQueryServices bqServices) {
       super(tempFile, ".json");
       this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
-      this.jsonTable = jsonTable;
+      if (table == null) {
+        this.jsonTable = null;
+      } else {
+        checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
+            "Table %s should have a project specified", table);
+        this.jsonTable = toJsonString(table);
+      }
       this.jsonSchema = jsonSchema;
       this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
       this.createDisposition = checkNotNull(createDisposition, "createDisposition");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c876c2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index b9af1e2..e1f8e4d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -31,6 +31,8 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
 import org.apache.beam.sdk.util.BigQueryServices.Status;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -339,7 +341,7 @@ public class BigQueryIOTest {
         new TableRow().set("name", "b").set("number", 2),
         new TableRow().set("name", "c").set("number", 3)))
     .setCoder(TableRowJsonCoder.of())
-    .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+    .apply(BigQueryIO.Write.to("dataset-id.table-id")
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
         .withSchema(new TableSchema().setFields(
             ImmutableList.of(
@@ -604,4 +606,40 @@ public class BigQueryIOTest {
     assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName());
     assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName());
   }
+
+  @Test
+  public void testWriteValidateFailsCreateNoSchema() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("no schema was provided");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write
+            .to("dataset.table")
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
+  }
+
+  @Test
+  public void testWriteValidateFailsTableAndTableSpec() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Cannot set both a table reference and a table function");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write
+            .to("dataset.table")
+            .to(new SerializableFunction<BoundedWindow, String>() {
+              @Override
+              public String apply(BoundedWindow input) {
+                return null;
+              }
+            }));
+  }
+
+  @Test
+  public void testWriteValidateFailsNoTableAndNoTableSpec() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write.named("name"));
+  }
 }


Mime
View raw message