beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: [BEAM-12504] Make CreateTransaction wait on input signal
Date Mon, 16 Aug 2021 20:49:39 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b5f298  [BEAM-12504] Make CreateTransaction wait on input signal
     new 6774062  Merge pull request #15048 from [BEAM-12504] Make SpannerIO.ConnectTransaction
wait on input signal
6b5f298 is described below

commit 6b5f298dfce8d8c7fc5def3a57b033a95016ca52
Author: Minbo Bae <baeminbo@google.com>
AuthorDate: Sun Jun 20 23:28:17 2021 -0700

    [BEAM-12504] Make CreateTransaction wait on input signal
    
    The current SpannerIO creates the transaction at the beginning of the pipeline run. If the
    pipeline runs for a long time with `ReadAll` (which takes `ReadOperation` as input), this
    causes a "session not found" error from SpannerIO.
    
    This change introduces delayed transaction creation. It allows a pipeline to create the
    Spanner transaction when it is needed (e.g. just before ReadAll).
    
    ```
    SpannerConfig config = ...
    
    PCollection<ReadOperation> readOperations = ...
    
    // The transaction is created only after readOperations is complete, which avoids the
    // session expiration error.
    PCollectionView<Transaction> transaction =
        readOperations.apply(SpannerIO.createTransaction().withSpannerConfig(config));
    
    PCollection<Struct> users = readOperations
      .apply(SpannerIO.readAll().withSpannerConfig(config).withTransaction(transaction));
    ```
---
 .../sdk/io/gcp/spanner/CreateTransactionFn.java    | 12 ++++++-----
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  | 24 ++++++++++++++++------
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
index cff3fb0..cf26985 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.spanner;
 
 import com.google.cloud.spanner.BatchReadOnlyTransaction;
+import com.google.cloud.spanner.TimestampBound;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /** Creates a batch transaction. */
@@ -25,18 +26,19 @@ import org.apache.beam.sdk.transforms.DoFn;
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 class CreateTransactionFn extends DoFn<Object, Transaction> {
+  private final SpannerConfig config;
+  private final TimestampBound timestampBound;
 
-  private final SpannerIO.CreateTransaction config;
-
-  CreateTransactionFn(SpannerIO.CreateTransaction config) {
+  CreateTransactionFn(SpannerConfig config, TimestampBound timestampBound) {
     this.config = config;
+    this.timestampBound = timestampBound;
   }
 
   private transient SpannerAccessor spannerAccessor;
 
   @DoFn.Setup
   public void setup() throws Exception {
-    spannerAccessor = SpannerAccessor.getOrCreate(config.getSpannerConfig());
+    spannerAccessor = SpannerAccessor.getOrCreate(config);
   }
 
   @Teardown
@@ -47,7 +49,7 @@ class CreateTransactionFn extends DoFn<Object, Transaction> {
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     BatchReadOnlyTransaction tx =
-        spannerAccessor.getBatchClient().batchReadOnlyTransaction(config.getTimestampBound());
+        spannerAccessor.getBatchClient().batchReadOnlyTransaction(timestampBound);
     c.output(Transaction.create(tx.getBatchTransactionId()));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index b31d716..07ff216 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -739,13 +740,15 @@ public class SpannerIO {
   }
 
   /**
-   * A {@link PTransform} that create a transaction.
+   * A {@link PTransform} that create a transaction. If applied to a {@link PCollection},
it will
+   * create a transaction after the {@link PCollection} is closed.
    *
    * @see SpannerIO
+   * @see Wait
    */
   @AutoValue
   public abstract static class CreateTransaction
-      extends PTransform<PBegin, PCollectionView<Transaction>> {
+      extends PTransform<PInput, PCollectionView<Transaction>> {
 
     abstract SpannerConfig getSpannerConfig();
 
@@ -754,12 +757,21 @@ public class SpannerIO {
     abstract Builder toBuilder();
 
     @Override
-    public PCollectionView<Transaction> expand(PBegin input) {
+    public PCollectionView<Transaction> expand(PInput input) {
       getSpannerConfig().validate();
 
-      return input
-          .apply(Create.of(1))
-          .apply("Create transaction", ParDo.of(new CreateTransactionFn(this)))
+      PCollection<?> collection = input.getPipeline().apply(Create.of(1));
+
+      if (input instanceof PCollection) {
+        collection = collection.apply(Wait.on((PCollection<?>) input));
+      } else if (!(input instanceof PBegin)) {
+        throw new RuntimeException("input must be PBegin or PCollection");
+      }
+
+      return collection
+          .apply(
+              "Create transaction",
+              ParDo.of(new CreateTransactionFn(this.getSpannerConfig(), this.getTimestampBound())))
           .apply("As PCollectionView", View.asSingleton());
     }
 

Mime
View raw message