beam-commits mailing list archives

From: dhalp...@apache.org
Subject: [10/50] [abbrv] beam git commit: Add PrepareWrite transform.
Date: Wed, 19 Apr 2017 19:14:44 GMT
Add PrepareWrite transform.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/67a5f827
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/67a5f827
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/67a5f827

Branch: refs/heads/DSL_SQL
Commit: 67a5f82706e52fe025b63aa2e9652368f22c8344
Parents: c939a43
Author: Reuven Lax <relax@google.com>
Authored: Tue Mar 28 12:53:27 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Tue Apr 18 21:12:49 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/PrepareWrite.java  | 58 ++++++++++++++++++++
 .../sdk/io/gcp/bigquery/TableDestination.java   |  7 +++
 .../gcp/bigquery/TagWithUniqueIdsAndTable.java  | 15 ++---
 3 files changed, 69 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/67a5f827/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
new file mode 100644
index 0000000..0c08e18
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -0,0 +1,58 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+
+/**
+ * Prepare an input {@link PCollection<T>} for writing to BigQuery. Use the table-reference
+ * function to determine which tables each element is written to, and format the element into a
+ * {@link TableRow} using the user-supplied format function.
+ */
+public class PrepareWrite<T> extends PTransform<PCollection<T>, PCollection<KV<String, TableRow>>> {
+  private static final String NAME = "PrepareWrite";
+  private SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;
+  private SerializableFunction<T, TableRow> formatFunction;
+
+  public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction,
+                      SerializableFunction<T, TableRow> formatFunction) {
+    super(NAME);
+    this.tableRefFunction = tableRefFunction;
+    this.formatFunction = formatFunction;
+  }
+
+  @Override
+  public PCollection<KV<String, TableRow>> expand(PCollection<T> input) {
+    PCollection<KV<String, TableRow>> elementsByTable =
+        input.apply(ParDo.of(new DoFn<T, KV<String, TableRow>>() {
+      @ProcessElement
+      public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
+        String tableSpec = tableSpecFromWindowedValue(
+            context.getPipelineOptions().as(BigQueryOptions.class),
+            ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
+        TableRow tableRow = formatFunction.apply(context.element());
+        context.output(KV.of(tableSpec, tableRow));
+      }
+    }));
+    return elementsByTable;
+  }
+
+  private String tableSpecFromWindowedValue(BigQueryOptions options,
+                                            ValueInSingleWindow<T> value) {
+    TableReference table = tableRefFunction.apply(value);
+    if (Strings.isNullOrEmpty(table.getProjectId())) {
+      table.setProjectId(options.getProject());
+    }
+    return BigQueryHelpers.toTableSpec(table);
+  }
+}
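
A minimal usage sketch (not part of this commit) of how PrepareWrite might be applied in a
pipeline, assuming a hypothetical input type MyEvent and a fixed destination table; the
table-reference and format functions here are illustrative only.

    PCollection<MyEvent> events = ...;  // hypothetical upstream PCollection
    PCollection<KV<String, TableRow>> rowsByTable = events.apply(
        new PrepareWrite<MyEvent>(
            // Table-reference function: choose the destination table for each windowed element.
            (ValueInSingleWindow<MyEvent> value) -> new TableReference()
                .setProjectId("my-project")    // hypothetical; defaulted from options if empty
                .setDatasetId("my_dataset")
                .setTableId("events"),
            // Format function: convert an element into a BigQuery TableRow.
            (MyEvent event) -> new TableRow()
                .set("user", event.getUser())
                .set("ts", event.getTimestamp())));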

http://git-wip-us.apache.org/repos/asf/beam/blob/67a5f827/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
new file mode 100644
index 0000000..3cbbf3b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -0,0 +1,7 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/**
+ * Created by relax on 3/28/17.
+ */
+public class TableDestination {
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/67a5f827/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
index 8d7d1e6..4e50f7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
@@ -73,9 +73,9 @@ class TagWithUniqueIdsAndTable<T>
   public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
     String uniqueId = randomUUID + sequenceNo++;
     ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
-    String tableSpec = tableSpecFromWindowedValue(
-        context.getPipelineOptions().as(BigQueryOptions.class),
-        ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
+      String tableSpec = tableSpecFromWindowedValue(
+          context.getPipelineOptions().as(BigQueryOptions.class),
+          ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
     // We output on keys 0-50 to ensure that there's enough batching for
     // BigQuery.
     context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
@@ -97,12 +97,5 @@ class TagWithUniqueIdsAndTable<T>
   }
 
 
-  private String tableSpecFromWindowedValue(BigQueryOptions options,
-                                            ValueInSingleWindow<T> value) {
-    TableReference table = write.getTableRefFunction().apply(value);
-    if (Strings.isNullOrEmpty(table.getProjectId())) {
-      table.setProjectId(options.getProject());
-    }
-    return BigQueryHelpers.toTableSpec(table);
-  }
+
 }
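
A brief sketch (not part of this commit) restating the sharding used in processElement
above: each row is keyed by its destination table spec plus a random shard in [0, 50), so a
downstream grouping sees a bounded number of keys per table and can batch rows for BigQuery.
Names mirror the DoFn's locals and are illustrative only.

    // Key construction as in processElement; the value paired with the key is elided here.
    ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
    int shard = randomGenerator.nextInt(0, 50);  // one of 50 shards per destination table
    ShardedKey<String> shardedKey = ShardedKey.of(tableSpec, shard);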

