beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reuven...@apache.org
Subject [1/3] beam git commit: Add unit-test coverage.
Date Fri, 22 Sep 2017 03:13:59 GMT
Repository: beam
Updated Branches:
  refs/heads/master 0d5d00d70 -> 0a073af40


Add unit-test coverage.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63b60b1c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63b60b1c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63b60b1c

Branch: refs/heads/master
Commit: 63b60b1c7438d78702332b4cbc41cf717f2089b7
Parents: cc24c86
Author: Reuven Lax <relax@relax-macbookpro2.roam.corp.google.com>
Authored: Tue Sep 19 22:11:29 2017 -0700
Committer: Reuven Lax <relax@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Sep 21 20:09:21 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 51 ++++++++++++++++++--
 1 file changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/63b60b1c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index c4403b0..9120507 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -134,7 +134,6 @@ import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
@@ -741,6 +740,53 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testFailuresNoRetryPolicy() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService())
+        .withDatasetService(datasetService);
+
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    TableRow row1 = new TableRow().set("name", "a").set("number", "1");
+    TableRow row2 = new TableRow().set("name", "b").set("number", "2");
+    TableRow row3 = new TableRow().set("name", "c").set("number", "3");
+
+    TableDataInsertAllResponse.InsertErrors ephemeralError =
+        new TableDataInsertAllResponse.InsertErrors().setErrors(
+            ImmutableList.of(new ErrorProto().setReason("timeout")));
+
+    datasetService.failOnInsert(
+        ImmutableMap.<TableRow, List<TableDataInsertAllResponse.InsertErrors>>of(
+            row1, ImmutableList.of(ephemeralError, ephemeralError),
+            row2, ImmutableList.of(ephemeralError, ephemeralError)));
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(Create.of(row1, row2, row3))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to("project-id:dataset-id.table-id")
+                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                .withMethod(Method.STREAMING_INSERTS)
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("name").setType("STRING"),
+                                new TableFieldSchema().setName("number").setType("INTEGER"))))
+                .withTestServices(fakeBqServices)
+                .withoutValidation());
+    p.run();
+
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(row1, row2, row3));
+  }
+
+  @Test
   public void testRetryPolicy() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("project-id");
@@ -772,9 +818,9 @@ public class BigQueryIOTest implements Serializable {
     Pipeline p = TestPipeline.create(bqOptions);
     PCollection<TableRow> failedRows =
         p.apply(Create.of(row1, row2, row3))
-            .setIsBoundedInternal(IsBounded.UNBOUNDED)
             .apply(BigQueryIO.writeTableRows().to("project-id:dataset-id.table-id")
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withMethod(Method.STREAMING_INSERTS)
             .withSchema(new TableSchema().setFields(
                 ImmutableList.of(
                     new TableFieldSchema().setName("name").setType("STRING"),
@@ -790,7 +836,6 @@ public class BigQueryIOTest implements Serializable {
     // Only row1 and row3 were successfully inserted.
     assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(row1, row3));
-
   }
 
   @Test


Mime
View raw message