beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [46/50] [abbrv] beam git commit: Cache result of BigQuerySourceBase.split
Date Wed, 19 Apr 2017 19:15:20 GMT
Cache result of BigQuerySourceBase.split


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1533e2b9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1533e2b9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1533e2b9

Branch: refs/heads/DSL_SQL
Commit: 1533e2b9bc49971929277b804587d93d8d2cae4c
Parents: 29e054a
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Wed Apr 19 10:09:42 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed Apr 19 11:39:21 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 31 +++++++++++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 18 +++++-------
 .../sdk/io/gcp/bigquery/FakeJobService.java     |  9 ++++++
 3 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 1b90dc3..4142da9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -69,6 +69,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   protected final BigQueryServices bqServices;
   protected final ValueProvider<String> executingProject;
 
+  private List<BoundedSource<TableRow>> cachedSplitResult;
+
   BigQuerySourceBase(
       ValueProvider<String> jobIdToken,
       String extractDestinationDir,
@@ -83,17 +85,24 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   @Override
   public List<BoundedSource<TableRow>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    TableReference tableToExtract = getTableToExtract(bqOptions);
-    JobService jobService = bqServices.getJobService(bqOptions);
-    String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
-    List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
-
-    TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
-        .getTable(tableToExtract).getSchema();
-
-    cleanupTempResource(bqOptions);
-    return createSources(tempFiles, tableSchema);
+    // split() can be called multiple times, e.g. the Dataflow runner may call it again
+    // with a different desiredBundleSizeBytes when the first split() produces too many sources.
+    // We ignore desiredBundleSizeBytes anyway, but in any case we must not initiate
+    // another BigQuery extract job for repeated split() calls.
+    if (cachedSplitResult == null) {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      TableReference tableToExtract = getTableToExtract(bqOptions);
+      JobService jobService = bqServices.getJobService(bqOptions);
+      String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
+      List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+      TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
+          .getTable(tableToExtract).getSchema();
+
+      cleanupTempResource(bqOptions);
+      cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema));
+    }
+    return cachedSplitResult;
   }
 
   protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d0004e4..62c5b5f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-
 import com.google.api.client.util.Data;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatistics;
@@ -1230,17 +1229,10 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBigQueryTableSourceInitSplit() throws Exception {
-    Job extractJob = new Job();
-    JobStatistics jobStats = new JobStatistics();
-    JobStatistics4 extractStats = new JobStatistics4();
-    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
-    jobStats.setExtract(extractStats);
-    extractJob.setStatus(new JobStatus())
-        .setStatistics(jobStats);
-
     FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    FakeJobService fakeJobService = new FakeJobService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService())
+        .withJobService(fakeJobService)
         .withDatasetService(fakeDatasetService);
 
     List<TableRow> expected = ImmutableList.of(
@@ -1280,8 +1272,14 @@ public class BigQueryIOTest implements Serializable {
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
+    // Simulate a repeated call to split(), like a Dataflow worker will sometimes do.
+    sources = bqSource.split(200, options);
+    assertEquals(2, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+    // A repeated call to split() should not have caused a duplicate extract job.
+    assertEquals(1, fakeJobService.getNumExtractJobCalls());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index a2454fb..cffd873 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -95,6 +95,7 @@ class FakeJobService implements JobService, Serializable {
 
   private static final com.google.common.collect.Table<String, String, JobInfo> allJobs =
       HashBasedTable.create();
+  private static int numExtractJobCalls = 0;
 
   private static final com.google.common.collect.Table<String, String, List<String>>
       filesForLoadJobs = HashBasedTable.create();
@@ -136,6 +137,8 @@ class FakeJobService implements JobService, Serializable {
     checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
         "Only extract to AVRO is supported");
     synchronized (allJobs) {
+      ++numExtractJobCalls;
+
       Job job = new Job();
       job.setJobReference(jobRef);
       job.setConfiguration(new JobConfiguration().setExtract(extractConfig));
@@ -145,6 +148,12 @@ class FakeJobService implements JobService, Serializable {
     }
   }
 
+  public int getNumExtractJobCalls() {
+    synchronized (allJobs) {
+      return numExtractJobCalls;
+    }
+  }
+
   @Override
   public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
       throws IOException, InterruptedException {


Mime
View raw message