From commits-return-114613-archive-asf-public=cust-asf.ponee.io@beam.apache.org Mon Aug 16 20:49:45 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 8319E180661 for ; Mon, 16 Aug 2021 22:49:45 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id E00A36004E for ; Mon, 16 Aug 2021 20:49:44 +0000 (UTC) Received: (qmail 99377 invoked by uid 500); 16 Aug 2021 20:49:44 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 99368 invoked by uid 99); 16 Aug 2021 20:49:44 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Aug 2021 20:49:44 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 24C6481EF0; Mon, 16 Aug 2021 20:49:44 +0000 (UTC) Date: Mon, 16 Aug 2021 20:49:39 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: [BEAM-12504] Make CreateTransaction wait on input signal MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <162914697512.6150.13476593749047275189@gitbox.apache.org> From: pabloem@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 9cdf661c2c6b754e21f5eb454a357ddda2f7107b X-Git-Newrev: 6b5f298dfce8d8c7fc5def3a57b033a95016ca52 X-Git-Rev: 6b5f298dfce8d8c7fc5def3a57b033a95016ca52 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: 
auto-generated This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 6b5f298 [BEAM-12504] Make CreateTransaction wait on input signal new 6774062 Merge pull request #15048 from [BEAM-12504] Make SpannerIO.ConnectTransaction wait on input signal 6b5f298 is described below commit 6b5f298dfce8d8c7fc5def3a57b033a95016ca52 Author: Minbo Bae AuthorDate: Sun Jun 20 23:28:17 2021 -0700 [BEAM-12504] Make CreateTransaction wait on input signal The current SpannerIO creates the transaction at the beginning of the pipeline run. This causes a SpannerIO "session not found" error if the pipeline runs for a long time with a `ReadAll` which gets `ReadOperation` as input. This change introduces delayed transaction creation. It lets a pipeline create the Spanner transaction only when it is needed (e.g. just before ReadAll). ``` SpannerConfig config = ... PCollection readOperations = ... // Transaction will be created after readOperations is ready. So, it can avoid session expiration error. 
PCollectionView transaction = readOperations.apply(SpannerIO.createTransaction()); PCollection users = readOperations .apply(SpannerIO.readAll().withSpannerConfig(config).withTransaction(transaction)) ``` --- .../sdk/io/gcp/spanner/CreateTransactionFn.java | 12 ++++++----- .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 24 ++++++++++++++++------ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java index cff3fb0..cf26985 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.spanner; import com.google.cloud.spanner.BatchReadOnlyTransaction; +import com.google.cloud.spanner.TimestampBound; import org.apache.beam.sdk.transforms.DoFn; /** Creates a batch transaction. 
*/ @@ -25,18 +26,19 @@ import org.apache.beam.sdk.transforms.DoFn; "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) class CreateTransactionFn extends DoFn { + private final SpannerConfig config; + private final TimestampBound timestampBound; - private final SpannerIO.CreateTransaction config; - - CreateTransactionFn(SpannerIO.CreateTransaction config) { + CreateTransactionFn(SpannerConfig config, TimestampBound timestampBound) { this.config = config; + this.timestampBound = timestampBound; } private transient SpannerAccessor spannerAccessor; @DoFn.Setup public void setup() throws Exception { - spannerAccessor = SpannerAccessor.getOrCreate(config.getSpannerConfig()); + spannerAccessor = SpannerAccessor.getOrCreate(config); } @Teardown @@ -47,7 +49,7 @@ class CreateTransactionFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) throws Exception { BatchReadOnlyTransaction tx = - spannerAccessor.getBatchClient().batchReadOnlyTransaction(config.getTimestampBound()); + spannerAccessor.getBatchClient().batchReadOnlyTransaction(timestampBound); c.output(Transaction.create(tx.getBatchTransactionId())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index b31d716..07ff216 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -79,6 +79,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import 
org.apache.beam.sdk.values.TupleTagList; @@ -739,13 +740,15 @@ public class SpannerIO { } /** - * A {@link PTransform} that create a transaction. + * A {@link PTransform} that create a transaction. If applied to a {@link PCollection}, it will + * create a transaction after the {@link PCollection} is closed. * * @see SpannerIO + * @see Wait */ @AutoValue public abstract static class CreateTransaction - extends PTransform> { + extends PTransform> { abstract SpannerConfig getSpannerConfig(); @@ -754,12 +757,21 @@ public class SpannerIO { abstract Builder toBuilder(); @Override - public PCollectionView expand(PBegin input) { + public PCollectionView expand(PInput input) { getSpannerConfig().validate(); - return input - .apply(Create.of(1)) - .apply("Create transaction", ParDo.of(new CreateTransactionFn(this))) + PCollection collection = input.getPipeline().apply(Create.of(1)); + + if (input instanceof PCollection) { + collection = collection.apply(Wait.on((PCollection) input)); + } else if (!(input instanceof PBegin)) { + throw new RuntimeException("input must be PBegin or PCollection"); + } + + return collection + .apply( + "Create transaction", + ParDo.of(new CreateTransactionFn(this.getSpannerConfig(), this.getTimestampBound()))) .apply("As PCollectionView", View.asSingleton()); }