beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: dhalp...@apache.org
Subject: [1/2] incubator-beam git commit: Implement getSplitPointsConsumed() in BigtableIO
Date: Thu, 30 Jun 2016 03:48:00 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9d6d622ab -> e01efbda3


Implement getSplitPointsConsumed() in BigtableIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0f809a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0f809a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0f809a0

Branch: refs/heads/master
Commit: f0f809a0960d003b178c82d268598f69456bc8a0
Parents: 9d6d622
Author: Ian Zhou <ianzhou@google.com>
Authored: Wed Jun 29 12:03:27 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jun 29 20:47:45 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 11 +++++--
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 32 +++++++++++++++++++-
 2 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0f809a0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index f725a66..cddb333 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -765,7 +765,8 @@ public class BigtableIO {
       reader = service.createReader(getCurrentSource());
       boolean hasRecord =
           reader.start()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
+              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()))
+              || rangeTracker.markDone();
       if (hasRecord) {
         ++recordsReturned;
       }
@@ -781,7 +782,8 @@ public class BigtableIO {
     public boolean advance() throws IOException {
       boolean hasRecord =
           reader.advance()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
+              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()))
+              || rangeTracker.markDone();
       if (hasRecord) {
         ++recordsReturned;
       }
@@ -808,6 +810,11 @@ public class BigtableIO {
     }
 
     @Override
+    public final long getSplitPointsConsumed() {
+      return rangeTracker.getSplitPointsConsumed();
+    }
+
+    @Override
     public final synchronized BigtableSource splitAtFraction(double fraction) {
       ByteKey splitKey;
       try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0f809a0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index c09943b..cdbaaac 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -25,7 +25,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verifyNotNull;
-
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -33,6 +32,7 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
@@ -490,6 +490,36 @@ public class BigtableIOTest {
     assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
   }
 
+  @Test
+  public void testGetSplitPointsConsumed() throws Exception {
+    final String table = "TEST-TABLE";
+    final int numRows = 100;
+    int splitPointsConsumed = 0;
+
+    makeTableData(table, numRows);
+
+    BigtableSource source =
+        new BigtableSource(service, table, null, ByteKeyRange.ALL_KEYS, null);
+
+    BoundedReader<Row> reader = source.createReader(TestPipeline.testingPipelineOptions());
+
+    reader.start();
+    // After start() (return value unchecked), no split points are consumed yet.
+    assertEquals("splitPointsConsumed starting",
+        splitPointsConsumed, reader.getSplitPointsConsumed());
+
+    // Each successful advance() should raise the consumed count by exactly one.
+    while (reader.advance()) {
+      assertEquals("splitPointsConsumed advancing",
+          ++splitPointsConsumed, reader.getSplitPointsConsumed());
+    }
+
+    // advance() returned false and marked the tracker done, so all numRows are consumed.
+    assertEquals("splitPointsConsumed done", numRows, reader.getSplitPointsConsumed());
+    // NOTE(review): close() is skipped if an assertion fails; consider try/finally.
+    reader.close();
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////////////
   private static final String COLUMN_FAMILY_NAME = "family";
   private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");


Mime
View raw message