beam-commits mailing list archives

From j...@apache.org
Subject [2/3] beam git commit: Fully general dynamic tables (including schemas) in BigQueryIO.
Date Wed, 03 May 2017 23:28:09 GMT
Fully general dynamic tables (including schemas) in BigQueryIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/35db7457
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/35db7457
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/35db7457

Branch: refs/heads/master
Commit: 35db7457bf8c95ac693c2c36c5f8909afc3f16ab
Parents: 17f0843
Author: Reuven Lax <relax@google.com>
Authored: Wed Apr 19 07:56:52 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed May 3 16:06:47 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 129 +++---
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |  74 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 422 +++++++++++--------
 .../sdk/io/gcp/bigquery/CalculateSchemas.java   |  79 ++++
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  |  65 ++-
 .../io/gcp/bigquery/DynamicDestinations.java    | 178 ++++++++
 .../bigquery/DynamicDestinationsHelpers.java    | 192 +++++++++
 .../beam/sdk/io/gcp/bigquery/PrepareWrite.java  |  36 +-
 .../sdk/io/gcp/bigquery/StreamingInserts.java   |  59 +--
 .../io/gcp/bigquery/StreamingWriteTables.java   |   8 +
 .../io/gcp/bigquery/TableDestinationCoder.java  |   2 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  56 +--
 .../sdk/io/gcp/bigquery/WritePartition.java     |  69 ++-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   | 112 ++---
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 336 +++++++++++----
 15 files changed, 1241 insertions(+), 576 deletions(-)
----------------------------------------------------------------------
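For orientation, here is a minimal sketch of how the DynamicDestinations-based write added by this
commit might be used. The element type (UserEvent), the per-user table naming, and the schema are
hypothetical, and the abstract method names on DynamicDestinations (getDestination, getTable,
getSchema) are assumed from the class introduced by this commit; they are not shown in the hunks
below.

  import com.google.api.services.bigquery.model.TableFieldSchema;
  import com.google.api.services.bigquery.model.TableRow;
  import com.google.api.services.bigquery.model.TableSchema;
  import com.google.common.collect.ImmutableList;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
  import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.ValueInSingleWindow;

  // Hypothetical element type: one event per user, routed to a per-user table.
  class UserEvent {
    String userId;
    long timestamp;
  }

  class PerUserDestinations extends DynamicDestinations<UserEvent, String> {
    // Route each element to a destination key (here, the user id). The destination
    // coder is inferred from the pipeline's coder registry by default.
    @Override
    public String getDestination(ValueInSingleWindow<UserEvent> element) {
      return element.getValue().userId;
    }

    // Map the destination key to a concrete table and description.
    @Override
    public TableDestination getTable(String userId) {
      return new TableDestination("my-project:events." + userId, "events for user " + userId);
    }

    // Supply the schema used when a destination table has to be created.
    @Override
    public TableSchema getSchema(String userId) {
      return new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName("user").setType("STRING"),
                  new TableFieldSchema().setName("ts").setType("INTEGER")));
    }
  }

  PCollection<UserEvent> events = ...;
  events.apply(
      BigQueryIO.<UserEvent>write()
          .to(new PerUserDestinations())
          .withFormatFunction(
              new SerializableFunction<UserEvent, TableRow>() {
                @Override
                public TableRow apply(UserEvent e) {
                  return new TableRow().set("user", e.userId).set("ts", e.timestamp);
                }
              }));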


http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 593c580..4e14696 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -21,26 +21,24 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
@@ -58,27 +56,31 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
-class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
-  BigQueryIO.Write<?> write;
+class BatchLoads<DestinationT>
+    extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
+  private BigQueryServices bigQueryServices;
+  private final WriteDisposition writeDisposition;
+  private final CreateDisposition createDisposition;
+  // Indicates that we are writing to a constant single table. If this is the case, we will create
+  // the table, even if there is no data in it.
+  private final boolean singletonTable;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final Coder<DestinationT> destinationCoder;
 
-  private static class ConstantSchemaFunction
-      implements SerializableFunction<TableDestination, TableSchema> {
-    private final @Nullable ValueProvider<String> jsonSchema;
-
-    ConstantSchemaFunction(ValueProvider<String> jsonSchema) {
-      this.jsonSchema = jsonSchema;
-    }
-
-    @Override
-    @Nullable
-    public TableSchema apply(TableDestination table) {
-      return BigQueryHelpers.fromJsonString(
-          jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
-    }
+  BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
+             boolean singletonTable,
+             DynamicDestinations<?, DestinationT> dynamicDestinations,
+             Coder<DestinationT> destinationCoder) {
+    bigQueryServices = new BigQueryServicesImpl();
+    this.writeDisposition = writeDisposition;
+    this.createDisposition = createDisposition;
+    this.singletonTable = singletonTable;
+    this.dynamicDestinations = dynamicDestinations;
+    this.destinationCoder = destinationCoder;
   }
 
-  BatchLoads(BigQueryIO.Write<?> write) {
-    this.write = write;
+  void setTestServices(BigQueryServices bigQueryServices) {
+    this.bigQueryServices = bigQueryServices;
   }
 
   @Override
@@ -88,7 +90,7 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
     checkArgument(
         !Strings.isNullOrEmpty(tempLocation),
         "BigQueryIO.Write needs a GCS temp location to store temp files.");
-    if (write.getBigQueryServices() == null) {
+    if (bigQueryServices == null) {
       try {
         GcsPath.fromUri(tempLocation);
       } catch (IllegalArgumentException e) {
@@ -102,7 +104,7 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
   }
 
   @Override
-  public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
+  public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
     Pipeline p = input.getPipeline();
     BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
 
@@ -137,28 +139,32 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
                     }))
             .apply(View.<String>asSingleton());
 
-    PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
+    PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
         input.apply(
             "rewindowIntoGlobal",
-            Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
+            Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
                 .triggering(DefaultTrigger.of())
                 .discardingFiredPanes());
+    PCollectionView<Map<DestinationT, String>> schemasView =
+        inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations));
 
     // PCollection of filename, file byte size, and table destination.
-    PCollection<WriteBundlesToFiles.Result> results =
+    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
         inputInGlobalWindow
-            .apply("WriteBundlesToFiles", ParDo.of(new WriteBundlesToFiles(tempFilePrefix)))
-            .setCoder(WriteBundlesToFiles.ResultCoder.of());
+            .apply("WriteBundlesToFiles", ParDo.of(
+                new WriteBundlesToFiles<DestinationT>(tempFilePrefix)))
+            .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
 
-    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
 
     // Turn the list of files and record counts in a PCollectionView that can be used as a
     // side input.
-    PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
-        results.apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
+    PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView =
+        results.apply("ResultsView",
+            View.<WriteBundlesToFiles.Result<DestinationT>>asIterable());
     // This transform will look at the set of files written for each table, and if any table has
     // too many files or bytes, will partition that table's files into multiple partitions for
     // loading.
@@ -166,24 +172,23 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
         singleton.apply(
             "WritePartition",
             ParDo.of(
-                    new WritePartition(
-                        write.getJsonTableRef(),
-                        write.getTableDescription(),
+                    new WritePartition<>(
+                        singletonTable,
                         resultsView,
                         multiPartitionsTag,
                         singlePartitionTag))
                 .withSideInputs(resultsView)
                 .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
 
-    // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
-    // schema function here. If no schema is specified, this function will return null.
-    // TODO: Turn this into a side-input instead.
-    SerializableFunction<TableDestination, TableSchema> schemaFunction =
-        new ConstantSchemaFunction(write.getJsonSchema());
+    List<PCollectionView<?>> writeTablesSideInputs =
+        Lists.newArrayList(jobIdTokenView, schemasView);
+    writeTablesSideInputs.addAll(dynamicDestinations.getSideInputs());
+
+    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+          KvCoder.of(
+              ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
+              ListCoder.of(StringUtf8Coder.of()));
 
-    Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder =
-        KvCoder.of(
-            ShardedKeyCoder.of(TableDestinationCoder.of()), ListCoder.of(StringUtf8Coder.of()));
     // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
     // the import needs to be split into multiple partitions, and those partitions will be
     // specified in multiPartitionsTag.
@@ -195,19 +200,20 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
             // reexecution of the WritePartitions step once WriteTables has begun.
             .apply(
                 "MultiPartitionsReshuffle",
-                Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
+                Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
             .apply(
                 "MultiPartitionsWriteTables",
                 ParDo.of(
-                        new WriteTables(
+                        new WriteTables<>(
                             false,
-                            write.getBigQueryServices(),
+                            bigQueryServices,
                             jobIdTokenView,
+                            schemasView,
                             tempFilePrefix,
                             WriteDisposition.WRITE_EMPTY,
                             CreateDisposition.CREATE_IF_NEEDED,
-                            schemaFunction))
-                    .withSideInputs(jobIdTokenView));
+                            dynamicDestinations))
+                    .withSideInputs(writeTablesSideInputs));
 
     // This view maps each final table destination to the set of temporary partitioned tables
     // the PCollection was loaded into.
@@ -218,10 +224,10 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
         "WriteRename",
         ParDo.of(
                 new WriteRename(
-                    write.getBigQueryServices(),
+                    bigQueryServices,
                     jobIdTokenView,
-                    write.getWriteDisposition(),
-                    write.getCreateDisposition(),
+                    writeDisposition,
+                    createDisposition,
                     tempTablesView))
             .withSideInputs(tempTablesView, jobIdTokenView));
 
@@ -232,19 +238,20 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
         // Reshuffle will distribute this among multiple workers, and also guard against
         // reexecution of the WritePartitions step once WriteTables has begun.
         .apply(
-            "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
+            "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
         .apply(
             "SinglePartitionWriteTables",
             ParDo.of(
-                    new WriteTables(
+                    new WriteTables<>(
                         true,
-                        write.getBigQueryServices(),
+                        bigQueryServices,
                         jobIdTokenView,
+                        schemasView,
                         tempFilePrefix,
-                        write.getWriteDisposition(),
-                        write.getCreateDisposition(),
-                        schemaFunction))
-                .withSideInputs(jobIdTokenView));
+                        writeDisposition,
+                        createDisposition,
+                        dynamicDestinations))
+                .withSideInputs(writeTablesSideInputs));
 
     return WriteResult.in(input.getPipeline());
   }
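The CalculateSchemas/schemasView plumbing above feeds the new Write#withSchemaFromView path. Below
is a sketch of the user-facing combination, assuming schemas are computed by an earlier stage and
turned into a map-valued side input with View.asMap; the routing field "table" and the tablespec
prefix are hypothetical.

  import com.google.api.services.bigquery.model.TableRow;
  import java.util.Map;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PCollectionView;
  import org.apache.beam.sdk.values.ValueInSingleWindow;

  // Tablespec ("project:dataset.table") to JSON-serialized TableSchema, computed upstream.
  PCollection<KV<String, String>> schemas = ...;
  PCollectionView<Map<String, String>> schemaView = schemas.apply(View.<String, String>asMap());

  PCollection<TableRow> rows = ...;
  rows.apply(
      BigQueryIO.writeTableRows()
          .to(
              new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
                @Override
                public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                  // Hypothetical routing: pick the table from a field of the row.
                  String table = (String) value.getValue().get("table");
                  return new TableDestination("my-project:dataset." + table, "routed by field");
                }
              })
          .withSchemaFromView(schemaView));

Per the javadoc added later in this patch, every tablespec produced by the table function must have
an entry in the schema view, or table creation will fail.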

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 3850cbd..70e7a5f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -41,9 +41,7 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 
-/**
- * A set of helper functions and classes used by {@link BigQueryIO}.
- */
+/** A set of helper functions and classes used by {@link BigQueryIO}. */
 public class BigQueryHelpers {
   private static final String RESOURCE_NOT_FOUND_ERROR =
       "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
@@ -55,9 +53,7 @@ public class BigQueryHelpers {
           + " an earlier stage of the pipeline, this validation can be disabled using"
           + " #withoutValidation.";
 
-  /**
-   * Status of a BigQuery job or request.
-   */
+  /** Status of a BigQuery job or request. */
   enum Status {
     SUCCEEDED,
     FAILED,
@@ -65,20 +61,15 @@ public class BigQueryHelpers {
   }
 
   @Nullable
-  /**
-   * Return a displayable string representation for a {@link TableReference}.
-   */
-  static ValueProvider<String> displayTable(
-      @Nullable ValueProvider<TableReference> table) {
+  /** Return a displayable string representation for a {@link TableReference}. */
+  static ValueProvider<String> displayTable(@Nullable ValueProvider<TableReference> table) {
     if (table == null) {
       return null;
     }
     return NestedValueProvider.of(table, new TableRefToTableSpec());
   }
 
-  /**
-   * Returns a canonical string representation of the {@link TableReference}.
-   */
+  /** Returns a canonical string representation of the {@link TableReference}. */
   public static String toTableSpec(TableReference ref) {
     StringBuilder sb = new StringBuilder();
     if (ref.getProjectId() != null) {
@@ -100,8 +91,8 @@ public class BigQueryHelpers {
   }
 
   /**
-   * Parse a table specification in the form
-   * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}.
+   * Parse a table specification in the form {@code "[project_id]:[dataset_id].[table_id]"} or
+   * {@code "[dataset_id].[table_id]"}.
    *
    * <p>If the project id is omitted, the default project id is used.
    */
@@ -110,7 +101,8 @@ public class BigQueryHelpers {
     if (!match.matches()) {
       throw new IllegalArgumentException(
           "Table reference is not in [project_id]:[dataset_id].[table_id] "
-          + "format: " + tableSpec);
+              + "format: "
+              + tableSpec);
     }
 
     TableReference ref = new TableReference();
@@ -164,8 +156,7 @@ public class BigQueryHelpers {
       return BigQueryIO.JSON_FACTORY.fromString(json, clazz);
     } catch (IOException e) {
       throw new RuntimeException(
-          String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json),
-          e);
+          String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json), e);
     }
   }
 
@@ -178,9 +169,7 @@ public class BigQueryHelpers {
     return UUID.randomUUID().toString().replaceAll("-", "");
   }
 
-  static void verifyTableNotExistOrEmpty(
-      DatasetService datasetService,
-      TableReference tableRef) {
+  static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) {
     try {
       if (datasetService.getTable(tableRef) != null) {
         checkState(
@@ -193,8 +182,7 @@ public class BigQueryHelpers {
         Thread.currentThread().interrupt();
       }
       throw new RuntimeException(
-          "unable to confirm BigQuery table emptiness for table "
-              + toTableSpec(tableRef), e);
+          "unable to confirm BigQuery table emptiness for table " + toTableSpec(tableRef), e);
     }
   }
 
@@ -206,12 +194,12 @@ public class BigQueryHelpers {
       if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
         throw new IllegalArgumentException(
             String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", toTableSpec(table)), e);
-      } else if (e instanceof  RuntimeException) {
+      } else if (e instanceof RuntimeException) {
         throw (RuntimeException) e;
       } else {
         throw new RuntimeException(
-            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset",
-                toTableSpec(table)),
+            String.format(
+                UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)),
             e);
       }
     }
@@ -225,12 +213,13 @@ public class BigQueryHelpers {
       if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
         throw new IllegalArgumentException(
             String.format(RESOURCE_NOT_FOUND_ERROR, "table", toTableSpec(table)), e);
-      } else if (e instanceof  RuntimeException) {
+      } else if (e instanceof RuntimeException) {
         throw (RuntimeException) e;
       } else {
         throw new RuntimeException(
-            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table",
-                toTableSpec(table)), e);
+            String.format(
+                UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", toTableSpec(table)),
+            e);
       }
     }
   }
@@ -248,40 +237,42 @@ public class BigQueryHelpers {
   }
 
   @VisibleForTesting
-  static class JsonSchemaToTableSchema
-      implements SerializableFunction<String, TableSchema> {
+  static class JsonSchemaToTableSchema implements SerializableFunction<String, TableSchema> {
     @Override
     public TableSchema apply(String from) {
       return fromJsonString(from, TableSchema.class);
     }
   }
 
-  static class TableSchemaToJsonSchema
-      implements SerializableFunction<TableSchema, String> {
+  static class TableSchemaToJsonSchema implements SerializableFunction<TableSchema, String> {
     @Override
     public String apply(TableSchema from) {
       return toJsonString(from);
     }
   }
 
-  static class JsonTableRefToTableRef
-      implements SerializableFunction<String, TableReference> {
+  static class JsonTableRefToTableRef implements SerializableFunction<String, TableReference> {
     @Override
     public TableReference apply(String from) {
       return fromJsonString(from, TableReference.class);
     }
   }
 
-  static class TableRefToTableSpec
-      implements SerializableFunction<TableReference, String> {
+  static class JsonTableRefToTableSpec implements SerializableFunction<String, String> {
+    @Override
+    public String apply(String from) {
+      return toTableSpec(fromJsonString(from, TableReference.class));
+    }
+  }
+
+  static class TableRefToTableSpec implements SerializableFunction<TableReference, String> {
     @Override
     public String apply(TableReference from) {
       return toTableSpec(from);
     }
   }
 
-  static class TableRefToJson
-      implements SerializableFunction<TableReference, String> {
+  static class TableRefToJson implements SerializableFunction<TableReference, String> {
     @Override
     public String apply(TableReference from) {
       return toJsonString(from);
@@ -289,8 +280,7 @@ public class BigQueryHelpers {
   }
 
   @VisibleForTesting
-  static class TableSpecToTableRef
-      implements SerializableFunction<String, TableReference> {
+  static class TableSpecToTableRef implements SerializableFunction<String, TableReference> {
     @Override
     public TableReference apply(String from) {
       return parseTableSpec(from);

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2ff5cd7..29491d8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -32,15 +32,18 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
-
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -52,6 +55,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSche
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -68,6 +74,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,12 +93,11 @@ import org.slf4j.LoggerFactory;
  *   <li>{@code tableId}: a table id, unique within a dataset.
  * </ul>
  *
- * <p>BigQuery table references are stored as a {@link TableReference}, which comes
- * from the <a href="https://cloud.google.com/bigquery/client-libraries">
- * BigQuery Java Client API</a>.
- * Tables can be referred to as Strings, with or without the {@code projectId}.
- * A helper function is provided ({@link BigQueryHelpers#parseTableSpec(String)})
- * that parses the following string forms into a {@link TableReference}:
+ * <p>BigQuery table references are stored as a {@link TableReference}, which comes from the <a
+ * href="https://cloud.google.com/bigquery/client-libraries">BigQuery Java Client API</a>. Tables
+ * can be referred to as Strings, with or without the {@code projectId}. A helper function is
+ * provided ({@link BigQueryHelpers#parseTableSpec(String)}) that parses the following string forms
+ * into a {@link TableReference}:
  *
  * <ul>
  *   <li>[{@code project_id}]:[{@code dataset_id}].[{@code table_id}]
@@ -102,6 +108,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation.
  * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
+ *
  * <pre>{@code
  * PCollection<TableRow> weatherData = pipeline.apply(
  *     BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
@@ -129,6 +136,7 @@ import org.slf4j.LoggerFactory;
  * {@link BigQueryIO#write()}. When using a user-defined type, a function must be provided to
  * turn this type into a {@link TableRow} using
  * {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
+ *
  * <pre>{@code
  * PCollection<TableRow> quotes = ...
  *
@@ -143,19 +151,18 @@ import org.slf4j.LoggerFactory;
  *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
  * }</pre>
  *
- * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should
- * append to an existing table, replace the table, or verify that the table is
- * empty. Note that the dataset being written to must already exist. Unbounded PCollections can only
- * be written using {@link Write.WriteDisposition#WRITE_EMPTY} or
- * {@link Write.WriteDisposition#WRITE_APPEND}.
+ * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should append to an
+ * existing table, replace the table, or verify that the table is empty. Note that the dataset being
+ * written to must already exist. Unbounded PCollections can only be written using {@link
+ * Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}.
  *
  * <h3>Sharding BigQuery output tables</h3>
  *
- * <p>A common use case is to dynamically generate BigQuery table names based on
- * the current window or the current value. To support this,
- * {@link BigQueryIO.Write#to(SerializableFunction)}
- * accepts a function mapping the current element to a tablespec. For example,
- * here's code that outputs daily tables to BigQuery:
+ * <p>A common use case is to dynamically generate BigQuery table names based on the current window
+ * or the current value. To support this, {@link BigQueryIO.Write#to(SerializableFunction)} accepts
+ * a function mapping the current element to a tablespec. For example, here's code that outputs
+ * daily tables to BigQuery:
+ *
  * <pre>{@code
  * PCollection<TableRow> quotes = ...
  * quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
@@ -173,20 +180,31 @@ import org.slf4j.LoggerFactory;
  * }</pre>
  *
  * <p>Note that this also allows the table to be a function of the element as well as the current
- * pane, in the case of triggered windows. In this case it might be convenient to call
- * {@link BigQueryIO#write()} directly instead of using the {@link BigQueryIO#writeTableRows()}
- * helper. This will allow the mapping function to access the element of the user-defined type.
- * In this case, a formatting function must be specified using
- * {@link BigQueryIO.Write#withFormatFunction} to convert each element into a {@link TableRow}
- * object.
+ * pane, in the case of triggered windows. In this case it might be convenient to call {@link
+ * BigQueryIO#write()} directly instead of using the {@link BigQueryIO#writeTableRows()} helper.
+ * This will allow the mapping function to access the element of the user-defined type. In this
+ * case, a formatting function must be specified using {@link BigQueryIO.Write#withFormatFunction}
+ * to convert each element into a {@link TableRow} object.
  *
- * <p>Per-value tables currently do not perform well in batch mode.
+ * <p>Per-table schemas can also be provided using {@link BigQueryIO.Write#withSchemaFromView}. This
+ * allows the schemas to be calculated based on a previous pipeline stage or statically via a
+ * {@link org.apache.beam.sdk.transforms.Create} transform. This method expects to receive a
+ * map-valued {@link PCollectionView}, mapping table specifications (project:dataset.table-id) to
+ * JSON-formatted {@link TableSchema} objects. All destination tables must be present in this map,
+ * or the pipeline will fail to create tables. Care should be taken if the map value is based on a
+ * triggered aggregation over an unbounded {@link PCollection}; the side input will contain the
+ * entire history of all table schemas ever generated, which might blow up memory usage. This method
+ * can also be useful when writing to a single table, as it allows a previous stage to calculate the
+ * schema (possibly based on the full collection of records being written to BigQuery).
+ *
+ * <p>For the most general form of dynamic table destinations and schemas, look at
+ * {@link BigQueryIO.Write#to(DynamicDestinations)}.
  *
  * <h3>Permissions</h3>
  *
  * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for
- * more details.
+ * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
+ * details.
  *
  * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
  * </a> for security and permission related information specific to BigQuery.
@@ -195,8 +213,7 @@ public class BigQueryIO {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
 
   /**
-   * Singleton instance of the JSON factory used to read and write JSON
-   * formatted rows.
+   * Singleton instance of the JSON factory used to read and write JSON formatted rows.
    */
   static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
 
@@ -232,13 +249,13 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a
    * {@code PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-   static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER =
+  static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER =
       new SerializableFunction<TableRow, TableRow>() {
-    @Override
-    public TableRow apply(TableRow input) {
-      return input;
-    }
-  };
+        @Override
+        public TableRow apply(TableRow input) {
+          return input;
+        }
+      };
 
   /**
    * A {@link PTransform} that reads from a BigQuery table and returns a
@@ -246,6 +263,7 @@ public class BigQueryIO {
    *
    * <p>Each {@link TableRow} contains values indexed by column name. Here is a
    * sample processing function that processes a "line" column from rows:
+   *
    * <pre>{@code
    * static class ExtractWordsFn extends DoFn<TableRow, String> {
    *   public void processElement(ProcessContext c) {
@@ -258,7 +276,8 @@ public class BigQueryIO {
    *       }
    *     }
    *   }
-   * }}</pre>
+   * }
+   * }</pre>
    */
   public static Read read() {
     return new AutoValue_BigQueryIO_Read.Builder()
@@ -276,7 +295,6 @@ public class BigQueryIO {
     @Nullable abstract Boolean getFlattenResults();
     @Nullable abstract Boolean getUseLegacySql();
     abstract BigQueryServices getBigQueryServices();
-
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -287,7 +305,6 @@ public class BigQueryIO {
       abstract Builder setFlattenResults(Boolean flattenResults);
       abstract Builder setUseLegacySql(Boolean useLegacySql);
       abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
-
       abstract Read build();
     }
 
@@ -305,28 +322,26 @@ public class BigQueryIO {
       return from(StaticValueProvider.of(tableSpec));
     }
 
-    /**
-     * Same as {@code from(String)}, but with a {@link ValueProvider}.
-     */
+    /** Same as {@code from(String)}, but with a {@link ValueProvider}. */
     public Read from(ValueProvider<String> tableSpec) {
       ensureFromNotCalledYet();
       return toBuilder()
           .setJsonTableRef(
               NestedValueProvider.of(
                   NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
-                  new TableRefToJson())).build();
+                  new TableRefToJson()))
+          .build();
     }
 
     /**
      * Reads results received after executing the given query.
      *
-     * <p>By default, the query results will be flattened -- see
-     * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-     * Jobs documentation</a> for more information.  To disable flattening, use
-     * {@link BigQueryIO.Read#withoutResultFlattening}.
+     * <p>By default, the query results will be flattened -- see "flattenResults" in the <a
+     * href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">Jobs documentation</a> for
+     * more information. To disable flattening, use {@link BigQueryIO.Read#withoutResultFlattening}.
      *
-     * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery
-     * Standard SQL dialect, use {@link BigQueryIO.Read#usingStandardSql}.
+     * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery Standard
+     * SQL dialect, use {@link BigQueryIO.Read#usingStandardSql}.
      */
     public Read fromQuery(String query) {
       return fromQuery(StaticValueProvider.of(query));
@@ -352,17 +367,16 @@ public class BigQueryIO {
         + " pipeline, This validation can be disabled using #withoutValidation.";
 
     /**
-     * Disable validation that the table exists or the query succeeds prior to pipeline
-     * submission. Basic validation (such as ensuring that a query or table is specified) still
-     * occurs.
+     * Disable validation that the table exists or the query succeeds prior to pipeline submission.
+     * Basic validation (such as ensuring that a query or table is specified) still occurs.
      */
     public Read withoutValidation() {
       return toBuilder().setValidate(false).build();
     }
 
     /**
-     * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-     * flattening of query results</a>.
+     * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">flattening of
+     * query results</a>.
      *
      * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
      * from a table will cause an error during validation.
@@ -441,8 +455,8 @@ public class BigQueryIO {
       // Note that a table or query check can fail if the table or dataset are created by
       // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable the check.
-      if (getValidate() && table != null && table.isAccessible() && table.get().getProjectId()
-          != null) {
+      if (getValidate() && table != null && table.isAccessible()
+          && table.get().getProjectId() != null) {
         checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
         // Check for source table presence for early failure notification.
         DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
@@ -543,15 +557,15 @@ public class BigQueryIO {
       super.populateDisplayData(builder);
       builder
           .addIfNotNull(DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider()))
-            .withLabel("Table"))
+                  .withLabel("Table"))
           .addIfNotNull(DisplayData.item("query", getQuery())
-            .withLabel("Query"))
+              .withLabel("Query"))
           .addIfNotNull(DisplayData.item("flattenResults", getFlattenResults())
-            .withLabel("Flatten Query Results"))
+                  .withLabel("Flatten Query Results"))
           .addIfNotNull(DisplayData.item("useLegacySql", getUseLegacySql())
-            .withLabel("Use Legacy SQL Dialect"))
+                  .withLabel("Use Legacy SQL Dialect"))
           .addIfNotDefault(DisplayData.item("validation", getValidate())
-            .withLabel("Validation Enabled"),
+              .withLabel("Validation Enabled"),
               true);
     }
 
@@ -563,9 +577,7 @@ public class BigQueryIO {
       return getJsonTableRef() == null
           ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
     }
-    /**
-     * Returns the table to read, or {@code null} if reading from a query instead.
-     */
+    /** Returns the table to read, or {@code null} if reading from a query instead. */
     @Nullable
     public TableReference getTable() {
       ValueProvider<TableReference> provider = getTableProvider();
@@ -583,9 +595,10 @@ public class BigQueryIO {
     List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
     if (counts.size() != 1) {
       String errorMessage = (counts.size() == 0
-          ? "No destination uri file count received."
-          : String.format("More than one destination uri file count received. First two are %s, %s",
-              counts.get(0), counts.get(1)));
+              ? "No destination uri file count received."
+              : String.format(
+                  "More than one destination uri file count received. First two are %s, %s",
+                  counts.get(0), counts.get(1)));
       throw new RuntimeException(errorMessage);
     }
     long filesCount = counts.get(0);
@@ -616,11 +629,12 @@ public class BigQueryIO {
    * or else the transform may fail at runtime with an {@link IllegalArgumentException}.
    *
    * <p>By default, writes require an empty table, which corresponds to
-   * a {@link Write.WriteDisposition#WRITE_EMPTY} disposition that matches the
-   * default of BigQuery's Jobs API.
+   * a {@link Write.WriteDisposition#WRITE_EMPTY} disposition that matches the default of
+   * BigQuery's Jobs API.
+   *
+   * <p>Here is a sample transform that produces TableRow values containing "word" and "count"
+   * columns:
    *
-   * <p>Here is a sample transform that produces TableRow values containing
-   * "word" and "count" columns:
    * <pre>{@code
    * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
    *   public void processElement(ProcessContext c) {
@@ -629,7 +643,8 @@ public class BigQueryIO {
    *         .set("count", c.element().getValue().intValue());
    *     c.output(row);
    *   }
-   * }}</pre>
+   * }
+   * }</pre>
    */
   public static <T> Write<T> write() {
     return new AutoValue_BigQueryIO_Write.Builder<T>()
@@ -642,8 +657,8 @@ public class BigQueryIO {
   }
 
   /**
-   * A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows}
-   * to a BigQuery table.
+   * A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows} to
+   * a BigQuery table.
    */
   public static Write<TableRow> writeTableRows() {
     return BigQueryIO.<TableRow>write().withFormatFunction(IDENTITY_FORMATTER);
@@ -671,7 +686,8 @@ public class BigQueryIO {
     @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination>
       getTableFunction();
     @Nullable abstract SerializableFunction<T, TableRow> getFormatFunction();
-    /** Table schema. The schema is required only if the table does not exist. */
+    @Nullable abstract DynamicDestinations<T, ?> getDynamicDestinations();
+    @Nullable abstract PCollectionView<Map<String, String>> getSchemaFromView();
     @Nullable abstract ValueProvider<String> getJsonSchema();
     abstract CreateDisposition getCreateDisposition();
     abstract WriteDisposition getWriteDisposition();
@@ -688,8 +704,9 @@ public class BigQueryIO {
       abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
       abstract Builder<T> setTableFunction(
           SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);
-      abstract Builder<T> setFormatFunction(
-          SerializableFunction<T, TableRow> formatFunction);
+      abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
+      abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations);
+      abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view);
       abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
       abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);
       abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
@@ -703,8 +720,9 @@ public class BigQueryIO {
     /**
      * An enumeration type for the BigQuery create disposition strings.
      *
-     * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition">
-     * <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a>
+     * @see
+     * <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition">
+     *     <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a>
      */
     public enum CreateDisposition {
       /**
@@ -715,17 +733,15 @@ public class BigQueryIO {
       CREATE_NEVER,
 
       /**
-       * Specifies that tables should be created if needed. This is the default
-       * behavior.
+       * Specifies that tables should be created if needed. This is the default behavior.
        *
        * <p>Requires that a table schema is provided via {@link BigQueryIO.Write#withSchema}.
-       * This precondition is checked before starting a job. The schema is
-       * not required to match an existing table's schema.
+       * This precondition is checked before starting a job. The schema is not required to match an
+       * existing table's schema.
        *
-       * <p>When this transformation is executed, if the output table does not
-       * exist, the table is created from the provided schema. Note that even if
-       * the table exists, it may be recreated if necessary when paired with a
-       * {@link WriteDisposition#WRITE_TRUNCATE}.
+       * <p>When this transformation is executed, if the output table does not exist, the table is
+       * created from the provided schema. Note that even if the table exists, it may be recreated
+       * if necessary when paired with a {@link WriteDisposition#WRITE_TRUNCATE}.
        */
       CREATE_IF_NEEDED
     }
@@ -733,50 +749,39 @@ public class BigQueryIO {
     /**
      * An enumeration type for the BigQuery write disposition strings.
      *
-     * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition">
-     * <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a>
+     * @see <a
+     *     href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition">
+     *     <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a>
      */
     public enum WriteDisposition {
       /**
        * Specifies that write should replace a table.
        *
-       * <p>The replacement may occur in multiple steps - for instance by first
-       * removing the existing table, then creating a replacement, then filling
-       * it in. This is not an atomic operation, and external programs may
-       * see the table in any of these intermediate steps.
+       * <p>The replacement may occur in multiple steps - for instance by first removing the
+       * existing table, then creating a replacement, then filling it in. This is not an atomic
+       * operation, and external programs may see the table in any of these intermediate steps.
        */
       WRITE_TRUNCATE,
 
-      /**
-       * Specifies that rows may be appended to an existing table.
-       */
+      /** Specifies that rows may be appended to an existing table. */
       WRITE_APPEND,
 
       /**
-       * Specifies that the output table must be empty. This is the default
-       * behavior.
+       * Specifies that the output table must be empty. This is the default behavior.
        *
        * <p>If the output table is not empty, the write fails at runtime.
        *
-       * <p>This check may occur long before data is written, and does not
-       * guarantee exclusive access to the table. If two programs are run
-       * concurrently, each specifying the same output table and
-       * a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is possible
-       * for both to succeed.
+       * <p>This check may occur long before data is written, and does not guarantee exclusive
+       * access to the table. If two programs are run concurrently, each specifying the same output
+       * table and a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is
+       * possible for both to succeed.
        */
       WRITE_EMPTY
     }
 
-    /** Ensures that methods of the to() family are called at most once. */
-    private void ensureToNotCalledYet() {
-      checkState(
-          getJsonTableRef() == null && getTable() == null
-              && getTableFunction() == null, "to() already called");
-    }
-
     /**
-     * Writes to the given table, specified in the format described in
-     * {@link BigQueryHelpers#parseTableSpec}.
+     * Writes to the given table, specified in the format described in {@link
+     * BigQueryHelpers#parseTableSpec}.
      */
     public Write<T> to(String tableSpec) {
       return to(StaticValueProvider.of(tableSpec));
@@ -789,14 +794,11 @@ public class BigQueryIO {
 
     /** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
     public Write<T> to(ValueProvider<String> tableSpec) {
-      ensureToNotCalledYet();
-      String tableDescription = getTableDescription();
       return toBuilder()
           .setJsonTableRef(
               NestedValueProvider.of(
                   NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
                   new TableRefToJson()))
-          .setTableFunction(new ConstantTableFunction<T>(tableSpec, tableDescription))
           .build();
     }
 
@@ -806,18 +808,14 @@ public class BigQueryIO {
      */
     public Write<T> to(
         SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
-      ensureToNotCalledYet();
       return toBuilder().setTableFunction(tableFunction).build();
     }
 
     /**
-     * Like {@link BigQueryIO.Write#to(SerializableFunction)}, but the function returns a
-     * {@link TableReference} instead of a string table specification.
+     * Writes to the table and schema specified by the {@link DynamicDestinations} object.
      */
-    private Write<T> toTableReference(
-        SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
-      ensureToNotCalledYet();
-      return toBuilder().setTableFunction(tableFunction).build();
+    public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
+      return toBuilder().setDynamicDestinations(dynamicDestinations).build();
     }
 
     /**
@@ -827,42 +825,45 @@ public class BigQueryIO {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
-    static class ConstantTableFunction<T> implements
-        SerializableFunction<ValueInSingleWindow<T>, TableDestination> {
-      private final ValueProvider<String> tableSpec;
-      private final String tableDescription;
+    /**
+     * Uses the specified schema for rows to be written.
+     *
+     * <p>The schema is <i>required</i> only if writing to a table that does not already exist, and
+     * {@link CreateDisposition} is set to {@link CreateDisposition#CREATE_IF_NEEDED}.
+     */
+    public Write<T> withSchema(TableSchema schema) {
+      return withJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema)));
+    }
 
-      ConstantTableFunction(ValueProvider<String> tableSpec, String tableDescription) {
-        this.tableSpec = tableSpec;
-        this.tableDescription = tableDescription;
-      }
+    /** Same as {@link #withSchema(TableSchema)} but using a deferred {@link ValueProvider}. */
+    public Write<T> withSchema(ValueProvider<TableSchema> schema) {
+      return withJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()));
+    }
 
-      @Override
-      public TableDestination apply(ValueInSingleWindow<T> value) {
-        return new TableDestination(tableSpec.get(), tableDescription);
-      }
+    /**
+     * Similar to {@link #withSchema(TableSchema)} but takes in a JSON-serialized {@link
+     * TableSchema}.
+     */
+    public Write<T> withJsonSchema(String jsonSchema) {
+      return withJsonSchema(StaticValueProvider.of(jsonSchema));
     }
 
     /**
-     * Uses the specified schema for rows to be written.
-     *
-     * <p>The schema is <i>required</i> only if writing to a table that does not already
-     * exist, and {@link CreateDisposition} is set to
-     * {@link CreateDisposition#CREATE_IF_NEEDED}.
+     * Same as {@link #withJsonSchema(String)} but using a deferred {@link ValueProvider}.
      */
-    public Write<T> withSchema(TableSchema schema) {
-      return toBuilder()
-          .setJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema)))
-          .build();
+    public Write<T> withJsonSchema(ValueProvider<String> jsonSchema) {
+      return toBuilder().setJsonSchema(jsonSchema).build();
     }
 
     /**
-     * Use the specified schema for rows to be written.
+     * Allows the schemas for each table to be computed within the pipeline itself.
+     *
+     * <p>The input is a map-valued {@link PCollectionView} mapping string tablespecs to
+     * JSON-formatted {@link TableSchema}s. Tablespecs must be in the same format as taken by
+     * {@link #to(String)}.
      */
-    public Write<T> withSchema(ValueProvider<TableSchema> schema) {
-      return toBuilder()
-          .setJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()))
-          .build();
+    public Write<T> withSchemaFromView(PCollectionView<Map<String, String>> view) {
+      return toBuilder().setSchemaFromView(view).build();
     }
 
     /** Specifies whether the table should be created if it does not exist. */
@@ -895,23 +896,38 @@ public class BigQueryIO {
       BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
 
       // We must have a destination to write to!
-      checkState(getTableFunction() != null,
+      checkState(
+          getTableFunction() != null || getJsonTableRef() != null
+              || getDynamicDestinations() != null,
           "must set the table reference of a BigQueryIO.Write transform");
 
       checkArgument(getFormatFunction() != null,
-                    "A function must be provided to convert type into a TableRow. "
-      + "use BigQueryIO.Write.withFormatFunction to provide a formatting function.");
+          "A function must be provided to convert type into a TableRow. "
+              + "use BigQueryIO.Write.withFormatFunction to provide a formatting function.");
 
       // Require a schema if creating one or more tables.
-      checkArgument(
-          getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || getJsonSchema() != null,
+      checkArgument(getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
+              || getJsonSchema() != null
+              || getDynamicDestinations() != null
+              || getSchemaFromView() != null,
           "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
 
+      List<?> allToArgs = Lists.newArrayList(getJsonTableRef(), getTableFunction(),
+          getDynamicDestinations());
+      checkArgument(1
+              == Iterables.size(Iterables.filter(allToArgs, Predicates.notNull())),
+          "Exactly one of jsonTableRef, tableFunction, or " + "dynamicDestinations must be set");
+
+      List<?> allSchemaArgs = Lists.newArrayList(getJsonSchema(), getSchemaFromView(),
+          getDynamicDestinations());
+      checkArgument(2
+              > Iterables.size(Iterables.filter(allSchemaArgs, Predicates.notNull())),
+          "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may "
+          + "be set");
+
       // The user specified a table.
-      if (getJsonTableRef() != null && getValidate()) {
+      if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
-        // TODO: This seems wrong - what if the ValueProvider is not accessible?
-
         DatasetService datasetService = getBigQueryServices().getDatasetService(options);
         // Check for destination table presence and emptiness for early failure notification.
         // Note that a presence check can fail when the table or dataset is created by an earlier
@@ -929,26 +945,66 @@ public class BigQueryIO {
 
     @Override
     public WriteResult expand(PCollection<T> input) {
-
       validate(input.getPipeline().getOptions());
 
-      PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
-          input.apply("PrepareWrite", new PrepareWrite<T>(
-              getTableFunction(), getFormatFunction()))
-              .setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of()));
+      DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
+      if (dynamicDestinations == null) {
+        if (getJsonTableRef() != null) {
+          dynamicDestinations =
+              DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
+                  getJsonTableRef(), getTableDescription());
+        } else if (getTableFunction() != null) {
+          dynamicDestinations = new TableFunctionDestinations(getTableFunction());
+        }
 
+        // Wrap with a DynamicDestinations class that will provide a schema. There might be no
+        // schema provided if the create disposition is CREATE_NEVER.
+        if (getJsonSchema() != null) {
+          dynamicDestinations =
+              new ConstantSchemaDestinations(dynamicDestinations, getJsonSchema());
+        } else if (getSchemaFromView() != null) {
+          dynamicDestinations =
+              new SchemaFromViewDestinations(dynamicDestinations, getSchemaFromView());
+        }
+      }
+      return expandTyped(input, dynamicDestinations);
+    }
 
-      // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
-      // StreamingInserts and BigQuery's streaming import API.
+    private <DestinationT> WriteResult expandTyped(
+        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+      Coder<DestinationT> destinationCoder = null;
+      try {
+        destinationCoder = dynamicDestinations.getDestinationCoderWithDefault(
+            input.getPipeline().getCoderRegistry());
+      } catch (CannotProvideCoderException e) {
+        throw new RuntimeException(e);
+      }
+
+      PCollection<KV<DestinationT, TableRow>> rowsWithDestination =
+          input
+              .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction()))
+              .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));
+
+      // When writing an Unbounded PCollection, we use StreamingInserts and BigQuery's streaming
+      // import API.
       if (input.isBounded() == IsBounded.UNBOUNDED) {
         checkArgument(
             getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
             "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
                 + " PCollection.");
-
-        return rowsWithDestination.apply(new StreamingInserts(this));
+        StreamingInserts<DestinationT> streamingInserts =
+            new StreamingInserts<>(getCreateDisposition(), dynamicDestinations);
+        streamingInserts.setTestServices(getBigQueryServices());
+        return rowsWithDestination.apply(streamingInserts);
       } else {
-        return rowsWithDestination.apply(new BatchLoads(this));
+        BatchLoads<DestinationT> batchLoads = new BatchLoads<>(
+            getWriteDisposition(),
+            getCreateDisposition(),
+            getJsonTableRef() != null,
+            dynamicDestinations,
+            destinationCoder);
+        batchLoads.setTestServices(getBigQueryServices());
+        return rowsWithDestination.apply(batchLoads);
       }
     }
 
@@ -961,32 +1017,28 @@ public class BigQueryIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      builder
-          .addIfNotNull(DisplayData.item("table", getJsonTableRef())
-            .withLabel("Table Reference"))
-          .addIfNotNull(DisplayData.item("schema", getJsonSchema())
-            .withLabel("Table Schema"));
+      builder.addIfNotNull(DisplayData.item("table", getJsonTableRef())
+          .withLabel("Table Reference"));
+      if (getJsonSchema() != null) {
+        builder.addIfNotNull(DisplayData.item("schema", getJsonSchema()).withLabel("Table Schema"));
+      } else {
+        builder.add(DisplayData.item("schema", "Custom Schema Function").withLabel("Table Schema"));
+      }
 
       if (getTableFunction() != null) {
         builder.add(DisplayData.item("tableFn", getTableFunction().getClass())
-          .withLabel("Table Reference Function"));
+                .withLabel("Table Reference Function"));
       }
 
       builder
           .add(DisplayData.item("createDisposition", getCreateDisposition().toString())
-            .withLabel("Table CreateDisposition"))
+                  .withLabel("Table CreateDisposition"))
           .add(DisplayData.item("writeDisposition", getWriteDisposition().toString())
-            .withLabel("Table WriteDisposition"))
+                  .withLabel("Table WriteDisposition"))
           .addIfNotDefault(DisplayData.item("validation", getValidate())
-            .withLabel("Validation Enabled"), true)
+              .withLabel("Validation Enabled"), true)
           .addIfNotDefault(DisplayData.item("tableDescription", getTableDescription())
-            .withLabel("Table Description"), "");
-    }
-
-    /** Returns the table schema. */
-    public TableSchema getSchema() {
-      return BigQueryHelpers.fromJsonString(
-          getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class);
+                  .withLabel("Table Description"), "");
     }
 
     /**
@@ -994,7 +1046,8 @@ public class BigQueryIO {
      *
      * <p>If the table's project is not specified, use the executing project.
      */
-    @Nullable ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
+    @Nullable
+    ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
       ValueProvider<TableReference> table = getTable();
       if (table == null) {
         return table;
@@ -1002,7 +1055,7 @@ public class BigQueryIO {
 
       if (!table.isAccessible()) {
         LOG.info("Using a dynamic value for table input. This must contain a project"
-            + " in the table reference: {}", table);
+                + " in the table reference: {}", table);
         return table;
       }
       if (Strings.isNullOrEmpty(table.get().getProjectId())) {
@@ -1022,8 +1075,6 @@ public class BigQueryIO {
       return getJsonTableRef() == null ? null :
           NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
     }
-
-
   }
 
   /**
@@ -1038,5 +1089,4 @@ public class BigQueryIO {
 
   /** Disallow construction of utility class. */
   private BigQueryIO() {}
-
 }
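
The two new preconditions above boil down to counting how many of the mutually exclusive options are non-null. A tiny standalone sketch of that counting, offered only as a reading aid; the helper class and method name are illustrative and not part of this commit:

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import java.util.Arrays;

final class OptionCounting {
  // Counts how many of the given options are set (non-null). The checks above require exactly
  // one of {jsonTableRef, tableFunction, dynamicDestinations} to be set, and at most one of
  // {jsonSchema, schemaFromView, dynamicDestinations}.
  static int countNonNull(Object... options) {
    return Iterables.size(Iterables.filter(Arrays.asList(options), Predicates.notNull()));
  }

  private OptionCounting() {}
}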

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java
new file mode 100644
index 0000000..db172dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.DoFn;
+
+import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the mapping of destinations to JSON-formatted schema objects. */
+class CalculateSchemas<DestinationT>
+    extends PTransform<
+        PCollection<KV<DestinationT, TableRow>>, PCollectionView<Map<DestinationT, String>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CalculateSchemas.class);
+
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+
+  public CalculateSchemas(DynamicDestinations<?, DestinationT> dynamicDestinations) {
+    this.dynamicDestinations = dynamicDestinations;
+  }
+
+  @Override
+  public PCollectionView<Map<DestinationT, String>> expand(
+      PCollection<KV<DestinationT, TableRow>> input) {
+    List<PCollectionView<?>> sideInputs = Lists.newArrayList();
+    sideInputs.addAll(dynamicDestinations.getSideInputs());
+
+    return input
+        .apply("Keys", Keys.<DestinationT>create())
+        .apply("Distinct Keys", Distinct.<DestinationT>create())
+        .apply(
+            "GetSchemas",
+            ParDo.of(
+                    new DoFn<DestinationT, KV<DestinationT, String>>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) throws Exception {
+                        dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+                        TableSchema tableSchema = dynamicDestinations.getSchema(c.element());
+                        if (tableSchema != null) {
+                          // If the createDisposition is CREATE_NEVER, then there's no need for a
+                          // schema, and getSchema might return null. In this case, we simply
+                          // leave it out of the map.
+                          c.output(KV.of(c.element(), BigQueryHelpers.toJsonString(tableSchema)));
+                        }
+                      }
+                    })
+                .withSideInputs(sideInputs))
+        .apply("asMap", View.<DestinationT, String>asMap());
+  }
+}
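
A brief sketch of how this transform might be wired in by the write path; the variable names (rowsByDestination, dynamicDestinations) are illustrative, not identifiers from this commit:

// rowsByDestination is a PCollection<KV<DestinationT, TableRow>>, e.g. the output of PrepareWrite.
PCollectionView<Map<DestinationT, String>> schemasView =
    rowsByDestination.apply("CalculateSchemas", new CalculateSchemas<>(dynamicDestinations));

// Downstream ParDos can take schemasView as a side input and look up the JSON schema for each
// destination; destinations whose getSchema() returned null (e.g. with CREATE_NEVER) are simply
// absent from the map.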

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index a377af7..210a072 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -22,8 +22,11 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -32,20 +35,20 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * Creates any tables needed before performing streaming writes to the tables. This is a side-effect
  * {@link DoFn}, and returns the original collection unchanged.
  */
-public class CreateTables
+public class CreateTables<DestinationT>
     extends PTransform<
-        PCollection<KV<TableDestination, TableRow>>, PCollection<KV<TableDestination, TableRow>>> {
+        PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, TableRow>>> {
   private final CreateDisposition createDisposition;
   private final BigQueryServices bqServices;
-  private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
 
   /**
    * The list of tables created so far, so we don't try the creation each time.
@@ -57,40 +60,59 @@ public class CreateTables
 
   public CreateTables(
       CreateDisposition createDisposition,
-      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
-    this(createDisposition, new BigQueryServicesImpl(), schemaFunction);
+      DynamicDestinations<?, DestinationT> dynamicDestinations) {
+    this(createDisposition, new BigQueryServicesImpl(), dynamicDestinations);
   }
 
   private CreateTables(
       CreateDisposition createDisposition,
       BigQueryServices bqServices,
-      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+      DynamicDestinations<?, DestinationT> dynamicDestinations) {
     this.createDisposition = createDisposition;
     this.bqServices = bqServices;
-    this.schemaFunction = schemaFunction;
+    this.dynamicDestinations = dynamicDestinations;
   }
 
-  CreateTables withTestServices(BigQueryServices bqServices) {
-    return new CreateTables(createDisposition, bqServices, schemaFunction);
+  CreateTables<DestinationT> withTestServices(BigQueryServices bqServices) {
+    return new CreateTables<DestinationT>(createDisposition, bqServices, dynamicDestinations);
   }
 
   @Override
   public PCollection<KV<TableDestination, TableRow>> expand(
-      PCollection<KV<TableDestination, TableRow>> input) {
+      PCollection<KV<DestinationT, TableRow>> input) {
+    List<PCollectionView<?>> sideInputs = Lists.newArrayList();
+    sideInputs.addAll(dynamicDestinations.getSideInputs());
+
     return input.apply(
         ParDo.of(
-            new DoFn<KV<TableDestination, TableRow>, KV<TableDestination, TableRow>>() {
-              @ProcessElement
-              public void processElement(ProcessContext context)
-                  throws InterruptedException, IOException {
-                BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-                possibleCreateTable(options, context.element().getKey());
-                context.output(context.element());
-              }
-            }));
+                new DoFn<KV<DestinationT, TableRow>, KV<TableDestination, TableRow>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context)
+                      throws InterruptedException, IOException {
+                    dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+                    TableDestination tableDestination =
+                        dynamicDestinations.getTable(context.element().getKey());
+                    TableReference tableReference = tableDestination.getTableReference();
+                    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+                      tableReference.setProjectId(
+                          context.getPipelineOptions().as(BigQueryOptions.class).getProject());
+                      tableDestination =
+                          new TableDestination(
+                              tableReference, tableDestination.getTableDescription());
+                    }
+                    TableSchema tableSchema =
+                        dynamicDestinations.getSchema(context.element().getKey());
+                    BigQueryOptions options =
+                        context.getPipelineOptions().as(BigQueryOptions.class);
+                    possibleCreateTable(options, tableDestination, tableSchema);
+                    context.output(KV.of(tableDestination, context.element().getValue()));
+                  }
+                })
+            .withSideInputs(sideInputs));
   }
 
-  private void possibleCreateTable(BigQueryOptions options, TableDestination tableDestination)
+  private void possibleCreateTable(
+      BigQueryOptions options, TableDestination tableDestination, TableSchema tableSchema)
       throws InterruptedException, IOException {
     String tableSpec = tableDestination.getTableSpec();
     TableReference tableReference = tableDestination.getTableReference();
@@ -102,7 +124,6 @@ public class CreateTables
         // every thread from attempting a create and overwhelming our BigQuery quota.
         DatasetService datasetService = bqServices.getDatasetService(options);
         if (!createdTables.contains(tableSpec)) {
-          TableSchema tableSchema = schemaFunction.apply(tableDestination);
           if (datasetService.getTable(tableReference) == null) {
             datasetService.createTable(
                 new Table()

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
new file mode 100644
index 0000000..dc8bcff
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+
+/**
+ * This class provides the most general way of specifying dynamic BigQuery table destinations.
+ * Destinations can be extracted from the input element, and stored as a custom type. Mappings are
+ * provided to convert the destination into a BigQuery table reference and a BigQuery schema. The
+ * class can read side inputs while performing these mappings.
+ *
+ * <p>For example, consider a PCollection of events, each containing a user-id field. You want to
+ * write each user's events to a separate table with a separate schema per user. Since the user-id
+ * field is a string, you will represent the destination as a string.
+ *
+ * <pre>{@code
+ * events.apply(BigQueryIO.<UserEvent>write()
+ *  .to(new DynamicDestinations<UserEvent, String>() {
+ *        public String getDestination(ValueInSingleWindow<UserEvent> element) {
+ *          return element.getValue().getUserId();
+ *        }
+ *        public TableDestination getTable(String user) {
+ *          return new TableDestination(tableForUser(user), "Table for user " + user);
+ *        }
+ *        public TableSchema getSchema(String user) {
+ *          return tableSchemaForUser(user);
+ *        }
+ *      })
+ *  .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() {
+ *     public TableRow apply(UserEvent event) {
+ *       return convertUserEventToTableRow(event);
+ *     }
+ *   }));
+ * }</pre>
+ *
+ * <p>An instance of {@link DynamicDestinations} can also access side inputs via {@link
+ * #sideInput(PCollectionView)}. The side inputs must be present in {@link #getSideInputs()}.
+ * Side inputs are accessed in the global window, so they must be globally windowed.
+ *
+ * <p>{@code DestinationT} is expected to provide proper hash and equality members. Ideally it will
+ * be a compact type with an efficient coder, as these objects may be used as a key in a {@link
+ * org.apache.beam.sdk.transforms.GroupByKey}.
+ */
+public abstract class DynamicDestinations<T, DestinationT> implements Serializable {
+  interface SideInputAccessor {
+    <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view);
+  }
+
+  private SideInputAccessor sideInputAccessor;
+
+  static class SideInputAccessorViaProcessContext implements SideInputAccessor {
+    private DoFn<?, ?>.ProcessContext processContext;
+
+    SideInputAccessorViaProcessContext(DoFn<?, ?>.ProcessContext processContext) {
+      this.processContext = processContext;
+    }
+
+    @Override
+    public <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) {
+      return processContext.sideInput(view);
+    }
+  }
+
+  /**
+   * Specifies that this object needs access to one or more side inputs. These side inputs must be
+   * globally windowed, as they will be accessed from the global window.
+   */
+  public List<PCollectionView<?>> getSideInputs() {
+    return Lists.newArrayList();
+  }
+
+  /**
+   * Returns the value of a given side input. The view must be present in {@link #getSideInputs()}.
+   */
+  protected <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) {
+    checkArgument(
+        getSideInputs().contains(view),
+        "View %s not declared in getSideInputs() (%s)",
+        view,
+        getSideInputs());
+    return sideInputAccessor.sideInput(view);
+  }
+
+  void setSideInputAccessor(SideInputAccessor sideInputAccessor) {
+    this.sideInputAccessor = sideInputAccessor;
+  }
+
+  void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
+    this.sideInputAccessor = new SideInputAccessorViaProcessContext(context);
+  }
+
+  /**
+   * Returns an object that represents at a high level which table is being written to. May not
+   * return null.
+   */
+  public abstract DestinationT getDestination(ValueInSingleWindow<T> element);
+
+  /**
+   * Returns the coder for {@code DestinationT}. If this is not overridden, then
+   * {@link BigQueryIO} will look in the coder registry for a suitable coder. This must be a
+   * deterministic coder, as {@code DestinationT} will be used as a key type in a
+   * {@link org.apache.beam.sdk.transforms.GroupByKey}.
+   */
+  @Nullable
+  public Coder<DestinationT> getDestinationCoder() {
+    return null;
+  }
+
+  /**
+   * Returns a {@link TableDestination} object for the destination. May not return null.
+   */
+  public abstract TableDestination getTable(DestinationT destination);
+
+  /**
+   * Returns the table schema for the destination. May not return null.
+   */
+  public abstract TableSchema getSchema(DestinationT destination);
+
+
+  // Gets the destination coder. If the user does not provide one, try to find one in the coder
+  // registry. If no coder can be found, throws CannotProvideCoderException.
+  Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
+      throws CannotProvideCoderException {
+    Coder<DestinationT> destinationCoder = getDestinationCoder();
+    if (destinationCoder != null) {
+      return destinationCoder;
+    }
+    // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
+    // We must first use reflection to figure out what the type parameter is.
+    for (Type superclass = getClass().getGenericSuperclass();
+        superclass != null;
+        superclass = ((Class) superclass).getGenericSuperclass()) {
+      if (superclass instanceof ParameterizedType) {
+        ParameterizedType parameterized = (ParameterizedType) superclass;
+        if (parameterized.getRawType() == DynamicDestinations.class) {
+          // DestinationT is the second parameter.
+          Type parameter = parameterized.getActualTypeArguments()[1];
+          @SuppressWarnings("unchecked")
+          Class<DestinationT> parameterClass = (Class<DestinationT>) parameter;
+          return registry.getDefaultCoder(parameterClass);
+        }
+      }
+    }
+    throw new AssertionError(
+        "Couldn't find the DynamicDestinations superclass of " + this.getClass());
+  }
+}
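
The javadoc above describes side-input access but only exemplifies the basic case. A minimal sketch of a destinations class that looks schemas up in a side input, assuming a UserEvent type with a getUserId() accessor, a hypothetical view mapping user id to a JSON schema string, and placement in the org.apache.beam.sdk.io.gcp.bigquery package so the package-private helpers are visible:

class PerUserDestinations extends DynamicDestinations<UserEvent, String> {
  // Hypothetical view, computed elsewhere in the pipeline and globally windowed.
  private final PCollectionView<Map<String, String>> userSchemas;

  PerUserDestinations(PCollectionView<Map<String, String>> userSchemas) {
    this.userSchemas = userSchemas;
  }

  @Override
  public List<PCollectionView<?>> getSideInputs() {
    // Declared here so that sideInput(userSchemas) is legal inside getSchema().
    return ImmutableList.<PCollectionView<?>>of(userSchemas);
  }

  @Override
  public String getDestination(ValueInSingleWindow<UserEvent> element) {
    return element.getValue().getUserId();
  }

  @Override
  public TableDestination getTable(String user) {
    return new TableDestination("my-project:dataset.events_" + user, "Events for user " + user);
  }

  @Override
  public TableSchema getSchema(String user) {
    // Consults the side-input map per destination; a missing key would yield a null schema.
    return BigQueryHelpers.fromJsonString(sideInput(userSchemas).get(user), TableSchema.class);
  }

  @Override
  public Coder<String> getDestinationCoder() {
    return StringUtf8Coder.of();
  }
}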

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
new file mode 100644
index 0000000..72a3314
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+
+/**
+ * Contains some useful helper implementations of {@link DynamicDestinations}.
+ */
+class DynamicDestinationsHelpers {
+  /**
+   * Always returns a constant table destination.
+   */
+  static class ConstantTableDestinations<T> extends DynamicDestinations<T, TableDestination> {
+    private final ValueProvider<String> tableSpec;
+    private final String tableDescription;
+
+    ConstantTableDestinations(ValueProvider<String> tableSpec, String tableDescription) {
+      this.tableSpec = tableSpec;
+      this.tableDescription = tableDescription;
+    }
+
+    static <T> ConstantTableDestinations<T> fromTableSpec(
+        ValueProvider<String> tableSpec, String tableDescription) {
+      return new ConstantTableDestinations<T>(tableSpec, tableDescription);
+    }
+
+    static <T> ConstantTableDestinations<T> fromJsonTableRef(
+        ValueProvider<String> jsonTableRef, String tableDescription) {
+      return new ConstantTableDestinations<T>(
+          NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableSpec()), tableDescription);
+    }
+
+    @Override
+    public TableDestination getDestination(ValueInSingleWindow<T> element) {
+      return new TableDestination(tableSpec.get(), tableDescription);
+    }
+
+    @Override
+    public TableSchema getSchema(TableDestination destination) {
+      return null;
+    }
+
+    @Override
+    public TableDestination getTable(TableDestination destination) {
+      return destination;
+    }
+
+    @Override
+    public Coder<TableDestination> getDestinationCoder() {
+      return TableDestinationCoder.of();
+    }
+  }
+
+  /**
+   * Returns a table destination based on a user-supplied function.
+   */
+  static class TableFunctionDestinations<T> extends DynamicDestinations<T, TableDestination> {
+    private final SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction;
+
+    TableFunctionDestinations(
+        SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
+      this.tableFunction = tableFunction;
+    }
+
+    @Override
+    public TableDestination getDestination(ValueInSingleWindow<T> element) {
+      return tableFunction.apply(element);
+    }
+
+    @Override
+    public TableSchema getSchema(TableDestination destination) {
+      return null;
+    }
+
+    @Override
+    public TableDestination getTable(TableDestination destination) {
+      return destination;
+    }
+
+    @Override
+    public Coder<TableDestination> getDestinationCoder() {
+      return TableDestinationCoder.of();
+    }
+  }
+
+  /**
+   * Delegates all calls to an inner instance of {@link DynamicDestinations}. This allows
+   * subclasses to modify another instance of {@link DynamicDestinations} by subclassing and
+   * overriding just the methods they want to alter.
+   */
+  static class DelegatingDynamicDestinations<T, DestinationT>
+      extends DynamicDestinations<T, DestinationT> {
+    private final DynamicDestinations<T, DestinationT> inner;
+    DelegatingDynamicDestinations(DynamicDestinations<T, DestinationT> inner) {
+      this.inner = inner;
+    }
+    @Override
+    public DestinationT getDestination(ValueInSingleWindow<T> element) {
+      return inner.getDestination(element);
+    }
+
+    @Override
+    public TableSchema getSchema(DestinationT destination) {
+      return inner.getSchema(destination);
+    }
+
+    @Override
+    public TableDestination getTable(DestinationT destination) {
+      return inner.getTable(destination);
+    }
+
+    @Override
+    public Coder<DestinationT> getDestinationCoder() {
+      return inner.getDestinationCoder();
+    }
+  }
+
+  /**
+   * Returns the same schema for every table.
+   */
+  static class ConstantSchemaDestinations<T>
+      extends DelegatingDynamicDestinations<T, TableDestination> {
+    @Nullable
+    private final ValueProvider<String> jsonSchema;
+
+    ConstantSchemaDestinations(DynamicDestinations<T, TableDestination> inner,
+                               ValueProvider<String> jsonSchema) {
+      super(inner);
+      this.jsonSchema = jsonSchema;
+    }
+
+    @Override
+    public TableSchema getSchema(TableDestination destination) {
+      return BigQueryHelpers.fromJsonString(jsonSchema.get(), TableSchema.class);
+    }
+  }
+
+  /**
+   * Takes a side input mapping each tablespec to a JSON table schema, and always returns the
+   * matching schema from the side input.
+   */
+  static class SchemaFromViewDestinations<T>
+      extends DelegatingDynamicDestinations<T, TableDestination> {
+    PCollectionView<Map<String, String>> schemaView;
+    SchemaFromViewDestinations(DynamicDestinations<T, TableDestination> inner,
+                               PCollectionView<Map<String, String>> schemaView) {
+      super(inner);
+      this.schemaView = schemaView;
+    }
+
+
+    @Override
+    public List<PCollectionView<?>> getSideInputs() {
+      return ImmutableList.<PCollectionView<?>>builder().add(schemaView).build();
+    }
+
+    @Override
+    public TableSchema getSchema(TableDestination destination) {
+      Map<String, String> mapValue = sideInput(schemaView);
+      return BigQueryHelpers.fromJsonString(mapValue.get(destination.getTableSpec()),
+          TableSchema.class);
+    }
+  }
+}

