Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5FC6B200C5B for ; Tue, 4 Apr 2017 20:57:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5E4E2160B90; Tue, 4 Apr 2017 18:57:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 57D85160B77 for ; Tue, 4 Apr 2017 20:57:10 +0200 (CEST) Received: (qmail 83589 invoked by uid 500); 4 Apr 2017 18:57:09 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 83579 invoked by uid 99); 4 Apr 2017 18:57:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 18:57:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7136DDFE59; Tue, 4 Apr 2017 18:57:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.apache.org Date: Tue, 04 Apr 2017 18:57:09 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Change BigQueryIO.Write return type to allow for future new functionality archived-at: Tue, 04 Apr 2017 18:57:11 -0000 Repository: beam Updated Branches: refs/heads/master 50fc63a9b -> 65ffd6c64 Change BigQueryIO.Write return type to allow for future new functionality Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475bf135 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475bf135 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475bf135 Branch: refs/heads/master Commit: 475bf13575fde12ce4de6e935d828123a0f317f4 Parents: 50fc63a Author: Reuven Lax Authored: Sun Apr 2 20:12:05 2017 -0700 Committer: Dan Halperin Committed: Tue Apr 4 11:56:53 2017 -0700 ---------------------------------------------------------------------- .../complete/game/utils/WriteToBigQuery.java | 3 +- .../game/utils/WriteWindowedToBigQuery.java | 3 +- .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 7 ++- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 +-- .../sdk/io/gcp/bigquery/StreamWithDeDup.java | 14 ++---- .../beam/sdk/io/gcp/bigquery/WriteResult.java | 47 ++++++++++++++++++++ 6 files changed, 59 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index d60510f..5eecddb 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -119,12 +119,13 @@ public class WriteToBigQuery @Override public PDone expand(PCollection teamAndScore) { - return teamAndScore + teamAndScore .apply("ConvertToRow", ParDo.of(new BuildRowFn())) .apply(BigQueryIO.writeTableRows().to(getTable(teamAndScore.getPipeline(), tableName)) .withSchema(getSchema()) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(WriteDisposition.WRITE_APPEND)); + return PDone.in(teamAndScore.getPipeline()); } /** Utility to construct an output table reference. */ http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index b8e12c1..e602258 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -59,13 +59,14 @@ public class WriteWindowedToBigQuery @Override public PDone expand(PCollection teamAndScore) { - return teamAndScore + teamAndScore .apply("ConvertToRow", ParDo.of(new BuildRowFn())) .apply(BigQueryIO.writeTableRows() .to(getTable(teamAndScore.getPipeline(), tableName)) .withSchema(getSchema()) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(WriteDisposition.WRITE_APPEND)); + return PDone.in(teamAndScore.getPipeline()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java index 75b1cc7..c323858 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java @@ -45,7 +45,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; @@ -53,7 +52,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; /** * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */ -class BatchLoadBigQuery extends PTransform, PDone> { +class BatchLoadBigQuery extends PTransform, WriteResult> { BigQueryIO.Write write; BatchLoadBigQuery(BigQueryIO.Write write) { @@ -61,7 +60,7 @@ class BatchLoadBigQuery extends PTransform, PDone> { } @Override - public PDone expand(PCollection input) { + public WriteResult expand(PCollection input) { Pipeline p = input.getPipeline(); BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); ValueProvider table = write.getTableWithDefaultProject(options); @@ -177,6 +176,6 @@ class BatchLoadBigQuery extends PTransform, PDone> { write.getTableDescription())) .withSideInputs(jobIdTokenView)); - return PDone.in(input.getPipeline()); + return WriteResult.in(input.getPipeline()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index cc6ec09..3c7b549 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -70,7 +70,6 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -674,7 +673,7 @@ public class BigQueryIO { /** Implementation of {@link #write}. */ @AutoValue - public abstract static class Write extends PTransform, PDone> { + public abstract static class Write extends PTransform, WriteResult> { @VisibleForTesting // Maximum number of files in a single partition. static final int MAX_NUM_FILES = 10000; @@ -984,7 +983,7 @@ public class BigQueryIO { } @Override - public PDone expand(PCollection input) { + public WriteResult expand(PCollection input) { // When writing an Unbounded PCollection, or when a tablespec function is defined, we use // StreamWithDeDup and BigQuery's streaming import API. if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) { http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java index f667295..1fa26d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java @@ -32,13 +32,12 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; /** * PTransform that performs streaming BigQuery write. To increase consistency, * it leverages BigQuery best effort de-dup mechanism. */ -class StreamWithDeDup extends PTransform, PDone> { +class StreamWithDeDup extends PTransform, WriteResult> { private final Write write; /** Constructor. */ @@ -52,7 +51,7 @@ class StreamWithDeDup extends PTransform, PDone> { } @Override - public PDone expand(PCollection input) { + public WriteResult expand(PCollection input) { // A naive implementation would be to simply stream data directly to BigQuery. // However, this could occasionally lead to duplicated data, e.g., when // a VM that runs this code is restarted and the code is re-run. @@ -86,13 +85,6 @@ class StreamWithDeDup extends PTransform, PDone> { write.getCreateDisposition(), write.getTableDescription(), write.getBigQueryServices()))); - - // Note that the implementation to return PDone here breaks the - // implicit assumption about the job execution order. If a user - // implements a PTransform that takes PDone returned here as its - // input, the transform may not necessarily be executed after - // the BigQueryIO.Write. - - return PDone.in(input.getPipeline()); + return WriteResult.in(input.getPipeline()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java new file mode 100644 index 0000000..07fbc68 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import java.util.Collections; +import java.util.List; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.values.POutputValueBase; +import org.apache.beam.sdk.values.TaggedPValue; + + +/** + * The result of a {@link BigQueryIO.Write} transform. + */ +final class WriteResult extends POutputValueBase { + /** + * Creates a {@link WriteResult} in the given {@link Pipeline}. + */ + static WriteResult in(Pipeline pipeline) { + return new WriteResult(pipeline); + } + + @Override + public List expand() { + return Collections.emptyList(); + } + + private WriteResult(Pipeline pipeline) { + super(pipeline); + } +}