Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5AC11200B38 for ; Fri, 8 Jul 2016 19:43:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5964A160A36; Fri, 8 Jul 2016 17:43:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5800F160A5A for ; Fri, 8 Jul 2016 19:43:51 +0200 (CEST) Received: (qmail 76695 invoked by uid 500); 8 Jul 2016 17:43:50 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 76686 invoked by uid 99); 8 Jul 2016 17:43:50 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2016 17:43:50 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 29990C206F for ; Fri, 8 Jul 2016 17:43:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.507 X-Spam-Level: X-Spam-Status: No, score=-4.507 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.287] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 0bmxcACgtRP7 for ; Fri, 8 Jul 2016 17:43:49 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id F17AD5FB37 for ; Fri, 8 Jul 2016 17:43:47 +0000 (UTC) Received: (qmail 76584 invoked by uid 99); 8 Jul 2016 17:43:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2016 17:43:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2C1BAE03C0; Fri, 8 Jul 2016 17:43:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Fri, 08 Jul 2016 17:43:48 -0000 Message-Id: <6fd199ac3fa5459bae4dbf266c9ebdf2@git.apache.org> In-Reply-To: <2cb1a62ffc684c3c9f6eb506b7382053@git.apache.org> References: <2cb1a62ffc684c3c9f6eb506b7382053@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-beam git commit: Modified BigtableIO to support streaming archived-at: Fri, 08 Jul 2016 17:43:52 -0000 Modified BigtableIO to support streaming Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12f15993 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12f15993 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12f15993 Branch: refs/heads/master Commit: 12f159934ec7965c3974cda319681103a817778b Parents: a7312be Author: Ian Zhou Authored: Fri Jul 1 16:14:53 2016 -0700 Committer: Dan Halperin Committed: Fri Jul 8 10:43:32 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 260 +++++++------------ 1 file changed, 92 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12f15993/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 0c485bf..4bab45e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -23,18 +23,17 @@ import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Sink.WriteOperation; -import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.values.KV; @@ -453,8 +452,8 @@ public class BigtableIO { @Override public PDone apply(PCollection>> input) { - Sink sink = new Sink(tableId, getBigtableService()); - return input.apply(org.apache.beam.sdk.io.Write.to(sink)); + input.apply(ParDo.of(new BigtableWriterFn(tableId, getBigtableService()))); + return PDone.in(input.getPipeline()); } @Override @@ -514,6 +513,94 @@ public class BigtableIO { } return new BigtableServiceImpl(options); } + + private class BigtableWriterFn extends DoFn>, Void> { + + public BigtableWriterFn(String tableId, BigtableService bigtableService) { + this.tableId = checkNotNull(tableId, "tableId"); + this.bigtableService = checkNotNull(bigtableService, "bigtableService"); + this.failures = new ConcurrentLinkedQueue<>(); + } + + @Override + public void startBundle(Context c) throws Exception { + bigtableWriter = bigtableService.openForWriting(tableId); + recordsWritten = 0; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + checkForFailures(); + Futures.addCallback( + bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element())); + ++recordsWritten; + } + + @Override + public void finishBundle(Context c) throws Exception { + bigtableWriter.close(); + bigtableWriter = null; + checkForFailures(); + logger.info("Wrote {} records", recordsWritten); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Write.this.populateDisplayData(builder); + } + + /////////////////////////////////////////////////////////////////////////////// + private final String tableId; + private final BigtableService bigtableService; + private BigtableService.Writer bigtableWriter; + private long recordsWritten; + private final ConcurrentLinkedQueue failures; + + /** + * If any write has asynchronously failed, fail the bundle with a useful error. + */ + private void checkForFailures() throws IOException { + // Note that this function is never called by multiple threads and is the only place that + // we remove from failures, so this code is safe. + if (failures.isEmpty()) { + return; + } + + StringBuilder logEntry = new StringBuilder(); + int i = 0; + for (; i < 10 && !failures.isEmpty(); ++i) { + BigtableWriteException exc = failures.remove(); + logEntry.append("\n").append(exc.getMessage()); + if (exc.getCause() != null) { + logEntry.append(": ").append(exc.getCause().getMessage()); + } + } + String message = + String.format( + "At least %d errors occurred writing to Bigtable. First %d errors: %s", + i + failures.size(), + i, + logEntry.toString()); + logger.error(message); + throw new IOException(message); + } + + private class WriteExceptionCallback implements FutureCallback { + private final KV> value; + + public WriteExceptionCallback(KV> value) { + this.value = value; + } + + @Override + public void onFailure(Throwable cause) { + failures.add(new BigtableWriteException(value, cause)); + } + + @Override + public void onSuccess(Empty produced) {} + } + } } ////////////////////////////////////////////////////////////////////////////////////////// @@ -871,169 +958,6 @@ public class BigtableIO { } } - private static class Sink - extends org.apache.beam.sdk.io.Sink>> { - - public Sink(String tableId, BigtableService bigtableService) { - this.tableId = checkNotNull(tableId, "tableId"); - this.bigtableService = checkNotNull(bigtableService, "bigtableService"); - } - - public String getTableId() { - return tableId; - } - - public BigtableService getBigtableService() { - return bigtableService; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Sink.class) - .add("bigtableService", bigtableService) - .add("tableId", tableId) - .toString(); - } - - /////////////////////////////////////////////////////////////////////////////// - private final String tableId; - private final BigtableService bigtableService; - - @Override - public WriteOperation>, Long> createWriteOperation( - PipelineOptions options) { - return new BigtableWriteOperation(this); - } - - /** Does nothing, as it is redundant with {@link Write#validate}. */ - @Override - public void validate(PipelineOptions options) {} - } - - private static class BigtableWriteOperation - extends WriteOperation>, Long> { - private final Sink sink; - - public BigtableWriteOperation(Sink sink) { - this.sink = sink; - } - - @Override - public Writer>, Long> createWriter(PipelineOptions options) - throws Exception { - return new BigtableWriter(this); - } - - @Override - public void initialize(PipelineOptions options) {} - - @Override - public void finalize(Iterable writerResults, PipelineOptions options) { - long count = 0; - for (Long value : writerResults) { - value += count; - } - logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink); - } - - @Override - public Sink getSink() { - return sink; - } - - @Override - public Coder getWriterResultCoder() { - return VarLongCoder.of(); - } - } - - private static class BigtableWriter extends Writer>, Long> { - private final BigtableWriteOperation writeOperation; - private final Sink sink; - private BigtableService.Writer bigtableWriter; - private long recordsWritten; - private final ConcurrentLinkedQueue failures; - - public BigtableWriter(BigtableWriteOperation writeOperation) { - this.writeOperation = writeOperation; - this.sink = writeOperation.getSink(); - this.failures = new ConcurrentLinkedQueue<>(); - } - - @Override - public void open(String uId) throws Exception { - bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId()); - recordsWritten = 0; - } - - /** - * If any write has asynchronously failed, fail the bundle with a useful error. - */ - private void checkForFailures() throws IOException { - // Note that this function is never called by multiple threads and is the only place that - // we remove from failures, so this code is safe. - if (failures.isEmpty()) { - return; - } - - StringBuilder logEntry = new StringBuilder(); - int i = 0; - for (; i < 10 && !failures.isEmpty(); ++i) { - BigtableWriteException exc = failures.remove(); - logEntry.append("\n").append(exc.getMessage()); - if (exc.getCause() != null) { - logEntry.append(": ").append(exc.getCause().getMessage()); - } - } - String message = - String.format( - "At least %d errors occurred writing to Bigtable. First %d errors: %s", - i + failures.size(), - i, - logEntry.toString()); - logger.error(message); - throw new IOException(message); - } - - @Override - public void write(KV> rowMutations) throws Exception { - checkForFailures(); - Futures.addCallback( - bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations)); - ++recordsWritten; - } - - @Override - public Long close() throws Exception { - bigtableWriter.close(); - bigtableWriter = null; - checkForFailures(); - logger.info("Wrote {} records", recordsWritten); - return recordsWritten; - } - - @Override - public WriteOperation>, Long> getWriteOperation() { - return writeOperation; - } - - private class WriteExceptionCallback implements FutureCallback { - private final KV> value; - - public WriteExceptionCallback(KV> value) { - this.value = value; - } - - @Override - public void onFailure(Throwable cause) { - failures.add(new BigtableWriteException(value, cause)); - } - - @Override - public void onSuccess(Empty produced) {} - } - } - /** * An exception that puts information about the failed record being written in its message. */