beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] incubator-beam git commit: Forward port DataflowJavaSDK-337 to Beam
Date Tue, 13 Sep 2016 18:05:58 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master b304d037f -> 84e8bfb13


Forward port DataflowJavaSDK-337 to Beam


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31bd5ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31bd5ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31bd5ba4

Branch: refs/heads/master
Commit: 31bd5ba4bc44862c8042cdf6a79be8a6ebda49a4
Parents: b304d03
Author: Pei He <peihe@google.com>
Authored: Mon Sep 12 20:38:28 2016 -0700
Committer: Pei He <peihe@google.com>
Committed: Mon Sep 12 20:49:50 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 16 ++++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 57 ++++++++++++++------
 2 files changed, 58 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31bd5ba4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 6dde581..1306e59 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -481,6 +481,22 @@ public class BigQueryIO {
         // read is properly specified.
         BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
+        String tempLocation = bqOptions.getTempLocation();
+        checkArgument(
+            !Strings.isNullOrEmpty(tempLocation),
+            "BigQueryIO.Read needs a GCS temp location to store temp files.");
+        if (bigQueryServices == null) {
+          try {
+            GcsPath.fromUri(tempLocation);
+          } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "BigQuery temp location expected a valid 'gs://' path, but was given
'%s'",
+                    tempLocation),
+                e);
+          }
+        }
+
         TableReference table = getTableWithDefaultProject(bqOptions);
         if (table == null && query == null) {
           throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31bd5ba4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 57eb4ff..19eeca5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -360,8 +360,6 @@ public class BigQueryIOTest implements Serializable {
   @Mock private transient IOChannelFactory mockIOChannelFactory;
   @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
 
-  private transient BigQueryOptions bqOptions;
-
   private void checkReadTableObject(
       BigQueryIO.Read.Bound bound, String project, String dataset, String table) {
     checkReadTableObjectWithValidate(bound, project, dataset, table, true);
@@ -410,10 +408,6 @@ public class BigQueryIOTest implements Serializable {
 
   @Before
   public void setUp() throws IOException {
-    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
-    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
-
     MockitoAnnotations.initMocks(this);
   }
 
@@ -468,8 +462,9 @@ public class BigQueryIOTest implements Serializable {
   public void testValidateReadSetsDefaultProject() throws Exception {
     String projectId = "someproject";
     String datasetId = "somedataset";
-    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    options.setProject(projectId);
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject(projectId);
+    bqOptions.setTempLocation("gs://testbucket/testdir");
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(mockJobService)
@@ -477,7 +472,7 @@ public class BigQueryIOTest implements Serializable {
     when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
         new RuntimeException("Unable to confirm BigQuery dataset presence"));
 
-    Pipeline p = TestPipeline.create(options);
+    Pipeline p = TestPipeline.create(bqOptions);
 
     TableReference tableRef = new TableReference();
     tableRef.setDatasetId(datasetId);
@@ -495,7 +490,11 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testBuildSourceWithoutTableQueryOrValidation() {
-    Pipeline p = TestPipeline.create();
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+
+    Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
         "Invalid BigQuery read operation, either table reference or query has to be set");
@@ -506,7 +505,11 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testBuildSourceWithTableAndQuery() {
-    Pipeline p = TestPipeline.create();
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+
+    Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
         "Invalid BigQuery read operation. Specifies both a query and a table, only one of
these"
@@ -521,7 +524,11 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testBuildSourceWithTableAndFlatten() {
-    Pipeline p = TestPipeline.create();
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+
+    Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
         "Invalid BigQuery read operation. Specifies a"
@@ -536,7 +543,11 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testBuildSourceWithTableAndFlattenWithoutValidation() {
-    Pipeline p = TestPipeline.create();
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+
+    Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
         "Invalid BigQuery read operation. Specifies a"
@@ -551,7 +562,11 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testReadFromTable() {
+  public void testReadFromTable() throws IOException {
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done")
@@ -583,6 +598,10 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWrite() throws Exception {
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done", "done")
@@ -617,6 +636,10 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWriteUnknown() throws Exception {
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done")
@@ -1347,7 +1370,7 @@ public class BigQueryIOTest implements Serializable {
 
     DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition);
     tester.setSideInput(filesView, GlobalWindow.INSTANCE, files);
-    tester.processElement(bqOptions.getTempLocation());
+    tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     List<KV<Long, List<String>>> partitions;
     if (expectedNumPartitions > 1) {
@@ -1428,6 +1451,10 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testRemoveTemporaryFiles() throws Exception {
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
     int numFiles = 10;
     List<String> fileNames = Lists.newArrayList();
     String tempFilePrefix = bqOptions.getTempLocation() + "/";


Mime
View raw message