beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: BigQueryIO: Remove tempLocation usage at pipeline construction time
Date Thu, 04 May 2017 03:51:25 GMT
Repository: beam
Updated Branches:
  refs/heads/master ad12f6316 -> f3f881084


BigQueryIO: Remove tempLocation usage at pipeline construction time


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0a2249f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0a2249f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0a2249f

Branch: refs/heads/master
Commit: d0a2249f17446156d9ce35b7d0b559b51e62b0b8
Parents: 1bc50d6
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Wed May 3 15:42:50 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed May 3 20:50:53 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 42 +++++++--------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 15 ++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 18 +++----
 .../io/gcp/bigquery/BigQueryQuerySource.java    |  5 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++--
 .../io/gcp/bigquery/BigQueryTableSource.java    |  6 +--
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 19 +++----
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   | 19 +++----
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 57 ++++----------------
 9 files changed, 75 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 4e14696..78d39b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -19,11 +19,11 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -44,8 +45,6 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
@@ -106,26 +105,23 @@ class BatchLoads<DestinationT>
   @Override
   public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
     Pipeline p = input.getPipeline();
-    BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-
-    validate(p.getOptions());
-
     final String stepUuid = BigQueryHelpers.randomUUIDString();
 
-    String tempLocation = options.getTempLocation();
-    String tempFilePrefix;
-    try {
-      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-      tempFilePrefix =
-          factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
-    }
-
     // Create a singleton job ID token at execution time. This will be used as the base for all
-    // load jobs issued from this instance of the transfomr.
-    PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+    // load jobs issued from this instance of the transform.
+    PCollection<String> singleton = p
+        .apply("Create", Create.of((Void) null))
+        .apply("GetTempFilePrefix", ParDo.of(new DoFn<Void, String>() {
+          @ProcessElement
+          public void getTempFilePrefix(ProcessContext c) {
+            c.output(
+                resolveTempLocation(
+                    c.getPipelineOptions().getTempLocation(),
+                    "BigQueryWriteTemp",
+                    stepUuid));
+          }
+        }));
+
     PCollectionView<String> jobIdTokenView =
         p.apply("TriggerIdCreation", Create.of("ignored"))
             .apply(
@@ -152,7 +148,7 @@ class BatchLoads<DestinationT>
     PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
         inputInGlobalWindow
             .apply("WriteBundlesToFiles", ParDo.of(
-                new WriteBundlesToFiles<DestinationT>(tempFilePrefix)))
+                new WriteBundlesToFiles<DestinationT>(stepUuid)))
             .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
 
     TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
@@ -209,7 +205,7 @@ class BatchLoads<DestinationT>
                             bigQueryServices,
                             jobIdTokenView,
                             schemasView,
-                            tempFilePrefix,
+                            stepUuid,
                             WriteDisposition.WRITE_EMPTY,
                             CreateDisposition.CREATE_IF_NEEDED,
                             dynamicDestinations))
@@ -247,7 +243,7 @@ class BatchLoads<DestinationT>
                         bigQueryServices,
                         jobIdTokenView,
                         schemasView,
-                        tempFilePrefix,
+                        stepUuid,
                         writeDisposition,
                         createDisposition,
                         dynamicDestinations))

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 70e7a5f..6b4e518 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
 
 /** A set of helper functions and classes used by {@link BigQueryIO}. */
 public class BigQueryHelpers {
@@ -304,4 +306,17 @@ public class BigQueryHelpers {
         .setTableId(queryTempTableId);
     return queryTempTableRef;
   }
+
+  static String resolveTempLocation(
+      String tempLocationDir, String bigQueryOperationName, String stepUuid) {
+    try {
+      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir);
+      return factory.resolve(
+          factory.resolve(tempLocationDir, bigQueryOperationName),  stepUuid);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to resolve temp destination directory in %s",
+              tempLocationDir), e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 29491d8..c76ee86 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 
 import com.google.api.client.json.JsonFactory;
 import com.google.api.services.bigquery.model.Job;
@@ -482,17 +483,7 @@ public class BigQueryIO {
     @Override
     public PCollection<TableRow> expand(PBegin input) {
       final String stepUuid = BigQueryHelpers.randomUUIDString();
-      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
       BoundedSource<TableRow> source;
-      final String extractDestinationDir;
-      String tempLocation = bqOptions.getTempLocation();
-      try {
-        IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-        extractDestinationDir = factory.resolve(tempLocation, stepUuid);
-      } catch (IOException e) {
-        throw new RuntimeException(
-            String.format("Failed to resolve extract destination directory in %s", tempLocation));
-      }
 
       if (getQuery() != null
           && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get())))
{
@@ -502,14 +493,12 @@ public class BigQueryIO {
                 getQuery(),
                 getFlattenResults(),
                 getUseLegacySql(),
-                extractDestinationDir,
                 getBigQueryServices());
       } else {
         source =
             BigQueryTableSource.create(
                 stepUuid,
                 getTableProvider(),
-                extractDestinationDir,
                 getBigQueryServices());
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -517,6 +506,11 @@ public class BigQueryIO {
             @Override
             void cleanup(PipelineOptions options) throws Exception {
               BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+              final String extractDestinationDir =
+                  resolveTempLocation(
+                      bqOptions.getTempLocation(),
+                      "BigQueryExtractTemp",
+                      stepUuid);
 
               JobReference jobRef =
                   new JobReference()

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 205f9cc..710c934 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -53,14 +53,12 @@ class BigQueryQuerySource extends BigQuerySourceBase {
       ValueProvider<String> query,
       Boolean flattenResults,
       Boolean useLegacySql,
-      String extractDestinationDir,
       BigQueryServices bqServices) {
     return new BigQueryQuerySource(
         stepUuid,
         query,
         flattenResults,
         useLegacySql,
-        extractDestinationDir,
         bqServices);
   }
 
@@ -74,9 +72,8 @@ class BigQueryQuerySource extends BigQuerySourceBase {
       ValueProvider<String> query,
       Boolean flattenResults,
       Boolean useLegacySql,
-      String extractDestinationDir,
       BigQueryServices bqServices) {
-    super(stepUuid, extractDestinationDir, bqServices);
+    super(stepUuid, bqServices);
     this.query = checkNotNull(query, "query");
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
     this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 0171046..41e298c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -64,14 +65,12 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
   protected final String stepUuid;
-  protected final String extractDestinationDir;
   protected final BigQueryServices bqServices;
 
   private transient List<BoundedSource<TableRow>> cachedSplitResult;
 
-  BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) {
+  BigQuerySourceBase(String stepUuid, BigQueryServices bqServices) {
     this.stepUuid = checkNotNull(stepUuid, "stepUuid");
-    this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
     this.bqServices = checkNotNull(bqServices, "bqServices");
   }
 
@@ -86,9 +85,13 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       TableReference tableToExtract = getTableToExtract(bqOptions);
       JobService jobService = bqServices.getJobService(bqOptions);
+
+      final String extractDestinationDir =
+          resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+
       String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
       List<String> tempFiles = executeExtract(
-          extractJobId, tableToExtract, jobService, bqOptions.getProject());
+          extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);
 
       TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
           .getTable(tableToExtract).getSchema();
@@ -114,7 +117,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   }
 
   private List<String> executeExtract(
-      String jobId, TableReference table, JobService jobService, String executingProject)
+      String jobId, TableReference table, JobService jobService, String executingProject,
+      String extractDestinationDir)
           throws InterruptedException, IOException {
     JobReference jobRef = new JobReference()
         .setProjectId(executingProject)

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index e754bd2..1d45641 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -45,9 +45,8 @@ class BigQueryTableSource extends BigQuerySourceBase {
   static BigQueryTableSource create(
       String stepUuid,
       ValueProvider<TableReference> table,
-      String extractDestinationDir,
       BigQueryServices bqServices) {
-    return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices);
+    return new BigQueryTableSource(stepUuid, table, bqServices);
   }
 
   private final ValueProvider<String> jsonTable;
@@ -56,9 +55,8 @@ class BigQueryTableSource extends BigQuerySourceBase {
   private BigQueryTableSource(
       String stepUuid,
       ValueProvider<TableReference> table,
-      String extractDestinationDir,
       BigQueryServices bqServices) {
-    super(stepUuid, extractDestinationDir, bqServices);
+    super(stepUuid, bqServices);
     this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
     this.tableSizeBytes = new AtomicReference<>();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 4f609b2..e90b974 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
+
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Maps;
 import java.io.IOException;
@@ -32,7 +34,6 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ class WriteBundlesToFiles<DestinationT>
 
   // Map from tablespec to a writer for that table.
   private transient Map<DestinationT, TableRowWriter> writers;
-  private final String tempFilePrefix;
+  private final String stepUuid;
 
   /**
    * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
@@ -104,8 +105,8 @@ class WriteBundlesToFiles<DestinationT>
     public void verifyDeterministic() {}
   }
 
-  WriteBundlesToFiles(String tempFilePrefix) {
-    this.tempFilePrefix = tempFilePrefix;
+  WriteBundlesToFiles(String stepUuid) {
+    this.stepUuid = stepUuid;
   }
 
   @StartBundle
@@ -117,6 +118,8 @@ class WriteBundlesToFiles<DestinationT>
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
+    String tempFilePrefix = resolveTempLocation(
+        c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
     TableRowWriter writer = writers.get(c.element().getKey());
     if (writer == null) {
       writer = new TableRowWriter(tempFilePrefix);
@@ -147,12 +150,4 @@ class WriteBundlesToFiles<DestinationT>
     }
     writers.clear();
   }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-
-    builder.addIfNotNull(
-        DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix"));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index b299244..c480b42 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
+
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
@@ -41,7 +43,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.FileIOChannelFactory;
 import org.apache.beam.sdk.util.GcsIOChannelFactory;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -73,7 +74,7 @@ class WriteTables<DestinationT>
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
   private final PCollectionView<Map<DestinationT, String>> schemasView;
-  private final String tempFilePrefix;
+  private final String stepUuid;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
@@ -83,7 +84,7 @@ class WriteTables<DestinationT>
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
       PCollectionView<Map<DestinationT, String>> schemasView,
-      String tempFilePrefix,
+      String stepUuid,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
@@ -91,7 +92,7 @@ class WriteTables<DestinationT>
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.schemasView = schemasView;
-    this.tempFilePrefix = tempFilePrefix;
+    this.stepUuid = stepUuid;
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
@@ -113,6 +114,8 @@ class WriteTables<DestinationT>
           tableReference, tableDestination.getTableDescription());
     }
 
+    String tempFilePrefix = resolveTempLocation(
+        c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
     Integer partition = c.element().getKey().getShardNumber();
     List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
     String jobIdPrefix =
@@ -213,12 +216,4 @@ class WriteTables<DestinationT>
       throw new IOException("Unrecognized file system.");
     }
   }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-
-    builder.addIfNotNull(
-        DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix"));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index e267dab..026afce 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -91,7 +91,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperati
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -136,7 +135,6 @@ import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -946,7 +944,6 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
   public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     BigQueryIO.Read read = BigQueryIO.read()
@@ -962,7 +959,6 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
   public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     BigQueryIO.Read read = BigQueryIO.read()
@@ -988,40 +984,6 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
-  public void testBatchWritePrimitiveDisplayData() throws IOException, InterruptedException {
-    testWritePrimitiveDisplayData(/* streaming: */ false);
-  }
-
-  @Test
-  @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
-  public void testStreamingWritePrimitiveDisplayData() throws IOException, InterruptedException {
-    testWritePrimitiveDisplayData(/* streaming: */ true);
-  }
-
-  private void testWritePrimitiveDisplayData(boolean streaming) throws IOException,
-      InterruptedException {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(streaming);
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
-
-    BigQueryIO.Write write = BigQueryIO.writeTableRows()
-        .to("project:dataset.table")
-        .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
-        .withTestServices(new FakeBigQueryServices()
-          .withDatasetService(new FakeDatasetService())
-          .withJobService(new FakeJobService()))
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
-        displayData, hasItem(hasDisplayItem("tableSpec")));
-
-    assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
-        displayData, hasItem(hasDisplayItem("schema")));
-  }
-
-  @Test
   public void testBuildWriteWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
@@ -1360,9 +1322,10 @@ public class BigQueryIOTest implements Serializable {
     Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI");
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        stepUuid, StaticValueProvider.of(table), baseDir.toString(), fakeBqServices);
+        stepUuid, StaticValueProvider.of(table), fakeBqServices);
 
     PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation(baseDir.toString());
     Assert.assertThat(
         SourceTestUtils.readFromSource(bqSource, options),
         CoreMatchers.is(expected));
@@ -1399,9 +1362,8 @@ public class BigQueryIOTest implements Serializable {
     Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit");
 
     String stepUuid = "testStepUuid";
-    String extractDestinationDir = baseDir.toString();
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices);
+        stepUuid, StaticValueProvider.of(table), fakeBqServices);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(baseDir.toString());
@@ -1479,12 +1441,10 @@ public class BigQueryIOTest implements Serializable {
 
 
     String query = FakeBigQueryServices.encodeQuery(expected);
-    String extractDestinationDir = baseDir.toString();
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
         stepUuid, StaticValueProvider.of(query),
-        true /* flattenResults */, true /* useLegacySql */,
-        extractDestinationDir, fakeBqServices);
-    options.setTempLocation(extractDestinationDir);
+        true /* flattenResults */, true /* useLegacySql */, fakeBqServices);
+    options.setTempLocation(baseDir.toString());
 
     TableReference queryTable = new TableReference()
         .setProjectId(bqOptions.getProject())
@@ -1571,7 +1531,7 @@ public class BigQueryIOTest implements Serializable {
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
         stepUuid,
         StaticValueProvider.of(query),
-        true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices);
+        true /* flattenResults */, true /* useLegacySql */, fakeBqServices);
 
     options.setTempLocation(baseDir.toString());
 
@@ -1845,7 +1805,7 @@ public class BigQueryIOTest implements Serializable {
     long numPartitions = 3;
     long numFilesPerPartition = 10;
     String jobIdToken = "jobIdToken";
-    String tempFilePrefix = "tempFilePrefix";
+    String stepUuid = "stepUuid";
     Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap();
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
@@ -1898,7 +1858,7 @@ public class BigQueryIOTest implements Serializable {
             fakeBqServices,
             jobIdTokenView,
             schemaMapView,
-            tempFilePrefix,
+            stepUuid,
             WriteDisposition.WRITE_EMPTY,
             CreateDisposition.CREATE_IF_NEEDED,
             new IdentityDynamicTables());
@@ -1907,6 +1867,7 @@ public class BigQueryIOTest implements Serializable {
         KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of());
+    tester.getPipelineOptions().setTempLocation("tempLocation");
     for (KV<ShardedKey<String>, List<String>> partition : partitions) {
       tester.processElement(partition);
     }


Mime
View raw message