beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Update to bigtable-client-core-0.3.0 and use bulk writes
Date Thu, 30 Jun 2016 20:06:17 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 4a0e426a8 -> 38866cd55


Update to bigtable-client-core-0.3.0 and use bulk writes

The new client is generally more stable, and bulk writes deliver about 5x
higher write throughput in batch jobs by using the network more efficiently.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/653c504f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/653c504f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/653c504f

Branch: refs/heads/master
Commit: 653c504f2f9460bc8861d149694ed2595701ce16
Parents: 4a0e426
Author: Ian Zhou <ianzhou@google.com>
Authored: Thu Jun 16 13:52:11 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Jun 30 13:05:07 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 sdks/java/core/pom.xml                          |  2 +-
 sdks/java/io/google-cloud-platform/pom.xml      |  2 +-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 60 +++++++++++++++++++-
 .../io/gcp/bigtable/BigtableServiceImpl.java    | 31 +++++-----
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 59 +++++++++++++++++++
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    | 18 +++++-
 7 files changed, 150 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f1eaac..14a9c67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <google-cloud-bigdataoss.version>1.4.5</google-cloud-bigdataoss.version>
     <google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
     <guava.version>19.0</guava.version>
-    <grpc.version>0.12.0</grpc.version>
+    <grpc.version>0.13.1</grpc.version>
     <hamcrest.version>1.3</hamcrest.version>
     <jackson.version>2.7.2</jackson.version>
     <findbugs.version>3.0.1</findbugs.version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 67c7fe9..9ec8f3d 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -311,7 +311,7 @@
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-handler</artifactId>
-      <version>4.1.0.Beta8</version>
+      <version>4.1.0.CR1</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index c95ea71..c7e77f1 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -32,7 +32,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <bigtable.version>0.2.3</bigtable.version>
+    <bigtable.version>0.3.0</bigtable.version>
   </properties>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index cddb333..47c68dd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -47,6 +47,8 @@ import com.google.bigtable.v1.Row;
 import com.google.bigtable.v1.RowFilter;
 import com.google.bigtable.v1.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.FutureCallback;
@@ -54,6 +56,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Empty;
 
+import io.grpc.Status;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -204,6 +208,8 @@ public class BigtableIO {
       checkNotNull(optionsBuilder, "optionsBuilder");
       // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
       BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+      clonedBuilder.setDataChannelCount(1);
+      clonedBuilder = addRetryOptions(clonedBuilder);
       BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
       return new Read(optionsWithAgent, tableId, filter, bigtableService);
     }
@@ -388,6 +394,8 @@ public class BigtableIO {
       checkNotNull(optionsBuilder, "optionsBuilder");
       // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
       BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+      clonedBuilder = addBulkOptions(clonedBuilder);
+      clonedBuilder = addRetryOptions(clonedBuilder);
       BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
       return new Write(optionsWithAgent, tableId, bigtableService);
     }
@@ -1024,6 +1032,56 @@ public class BigtableIO {
         info.getName(),
         info.getVersion(),
         javaVersion,
-        "0.2.3" /* TODO get Bigtable client version directly from jar. */);
+        "0.3.0" /* TODO get Bigtable client version directly from jar. */);
+  }
+
+  /**
+   * A helper function to add appropriate bulk options. See
+   * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
+   * toBuilder</a> for issue.
+   */
+  static BigtableOptions.Builder addBulkOptions(BigtableOptions.Builder builder) {
+    BulkOptions bulkOptions = builder.build().getBulkOptions();
+
+    BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder()
+        .setAsyncMutatorWorkerCount(bulkOptions.getAsyncMutatorCount())
+        .setUseBulkApi(true)
+        .setBulkMaxRowKeyCount(bulkOptions.getBulkMaxRowKeyCount())
+        .setBulkMaxRequestSize(bulkOptions.getBulkMaxRequestSize())
+        .setMaxInflightRpcs(bulkOptions.getMaxInflightRpcs())
+        .setMaxMemory(bulkOptions.getMaxMemory());
+
+    builder.setBulkOptions(bulkOptionsBuilder.build());
+    return builder;
+  }
+
+  /**
+   * A helper function to add appropriate retry options. See
+   * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
+   * toBuilder</a> for issue.
+   */
+  static BigtableOptions.Builder addRetryOptions(BigtableOptions.Builder builder) {
+    RetryOptions retryOptions = builder.build().getRetryOptions();
+
+    RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder()
+        .setEnableRetries(retryOptions.enableRetries())
+        .setInitialBackoffMillis(retryOptions.getInitialBackoffMillis())
+        .setBackoffMultiplier(retryOptions.getBackoffMultiplier())
+        .setMaxElapsedBackoffMillis(retryOptions.getMaxElaspedBackoffMillis())
+        .setStreamingBufferSize(retryOptions.getStreamingBufferSize())
+        .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
+            retryOptions.getStreamingBufferSize() / 2))
+        .setReadPartialRowTimeoutMillis(retryOptions.getReadPartialRowTimeoutMillis())
+        .setMaxScanTimeoutRetries(retryOptions.getMaxScanTimeoutRetries())
+        .setAllowRetriesWithoutTimestamp(retryOptions.allowRetriesWithoutTimestamp());
+
+    for (Status.Code code : Status.Code.values()) {
+      if (retryOptions.isRetryable(code)) {
+        retryOptionsBuilder.addStatusToRetryOn(code);
+      }
+    }
+
+    builder.setRetryOptions(retryOptionsBuilder.build());
+    return builder;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 5933e13..a0e6b29 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -30,8 +30,9 @@ import com.google.bigtable.v1.SampleRowKeysRequest;
 import com.google.bigtable.v1.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableName;
 import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
+import com.google.cloud.bigtable.grpc.async.BulkMutation;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
 import com.google.common.base.MoreObjects;
 import com.google.common.io.Closer;
@@ -65,7 +66,7 @@ class BigtableServiceImpl implements BigtableService {
   @Override
   public BigtableWriterImpl openForWriting(String tableId) throws IOException {
     BigtableSession session = new BigtableSession(options);
-    String tableName = options.getClusterName().toTableNameStr(tableId);
+    BigtableTableName tableName = options.getClusterName().toTableName(tableId);
     return new BigtableWriterImpl(session, tableName);
   }
 
@@ -170,24 +171,23 @@ class BigtableServiceImpl implements BigtableService {
   private static class BigtableWriterImpl implements Writer {
     private BigtableSession session;
     private AsyncExecutor executor;
+    private BulkMutation bulkMutation;
     private final MutateRowRequest.Builder partialBuilder;
 
-    public BigtableWriterImpl(BigtableSession session, String tableName) {
+    public BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) {
       this.session = session;
-      this.executor =
-          new AsyncExecutor(
-              session.getDataClient(),
-              new HeapSizeManager(
-                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
-                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
-      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
+      executor = session.createAsyncExecutor();
+      bulkMutation = session.createBulkMutation(tableName, executor);
+
+      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName.toString());
     }
 
     @Override
     public void close() throws IOException {
       try {
-        if (executor != null) {
+        if (bulkMutation != null) {
+          bulkMutation.flush();
+          bulkMutation = null;
           executor.flush();
           executor = null;
         }
@@ -208,12 +208,7 @@ class BigtableServiceImpl implements BigtableService {
               .setRowKey(record.getKey())
               .addAllMutations(record.getValue())
               .build();
-      try {
-        return executor.mutateRowAsync(r);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Write interrupted", e);
-      }
+      return bulkMutation.add(r);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cdbaaac..6a6197e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verifyNotNull;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
@@ -54,6 +55,8 @@ import com.google.bigtable.v1.Row;
 import com.google.bigtable.v1.RowFilter;
 import com.google.bigtable.v1.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
@@ -63,6 +66,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Empty;
 
+import io.grpc.Status;
+
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
@@ -76,10 +81,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -520,6 +527,58 @@ public class BigtableIOTest {
     reader.close();
   }
 
+  @Test
+  public void testAddBulkOptions() {
+    BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
+    optionsBuilder = BigtableIO.addBulkOptions(optionsBuilder);
+
+    BulkOptions bulkOptions = optionsBuilder.build().getBulkOptions();
+    assertEquals(BulkOptions.BIGTABLE_ASYNC_MUTATOR_COUNT_DEFAULT,
+        bulkOptions.getAsyncMutatorCount());
+    assertEquals(true, bulkOptions.useBulkApi());
+    assertEquals(BulkOptions.BIGTABLE_BULK_MAX_ROW_KEY_COUNT_DEFAULT,
+        bulkOptions.getBulkMaxRowKeyCount());
+    assertEquals(BulkOptions.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES_DEFAULT,
+        bulkOptions.getBulkMaxRequestSize());
+    assertEquals(BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT
+        * optionsBuilder.getDataChannelCount(), bulkOptions.getMaxInflightRpcs());
+    assertEquals(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, bulkOptions.getMaxMemory());
+  }
+
+  @Test
+  public void testAddRetryOptions() {
+    final double delta = 0.0000001;
+    BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
+    optionsBuilder = BigtableIO.addRetryOptions(optionsBuilder);
+
+    RetryOptions retryOptions = optionsBuilder.build().getRetryOptions();
+    assertEquals(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES, retryOptions.enableRetries());
+    assertEquals(RetryOptions.DEFAULT_INITIAL_BACKOFF_MILLIS,
+        retryOptions.getInitialBackoffMillis());
+    assertEquals(RetryOptions.DEFAULT_BACKOFF_MULTIPLIER, retryOptions.getBackoffMultiplier(),
+        delta);
+    assertEquals(RetryOptions.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS,
+        retryOptions.getMaxElaspedBackoffMillis());
+    assertEquals(RetryOptions.DEFAULT_STREAMING_BUFFER_SIZE, retryOptions.getStreamingBufferSize());
+    assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, retryOptions.getStreamingBatchSize());
+    assertEquals(RetryOptions.DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS,
+        retryOptions.getReadPartialRowTimeoutMillis());
+    assertEquals(RetryOptions.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES,
+        retryOptions.getMaxScanTimeoutRetries());
+    assertFalse(retryOptions.allowRetriesWithoutTimestamp());
+
+    Set<Status.Code> statusToRetryOn = new HashSet<>();
+    for (Status.Code code : Status.Code.values()) {
+      if (retryOptions.isRetryable(code)) {
+        statusToRetryOn.add(code);
+      }
+    }
+
+    Set<Status.Code> defaultStatusToRetryOn =
+        new HashSet<>(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES_SET);
+    assertThat(statusToRetryOn, Matchers.containsInAnyOrder(defaultStatusToRetryOn.toArray()));
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////////////
   private static final String COLUMN_FAMILY_NAME = "family";
   private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index af7afc5..8e17761 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -35,7 +35,9 @@ import com.google.bigtable.admin.table.v1.Table;
 import com.google.bigtable.v1.Mutation;
 import com.google.bigtable.v1.ReadRowsRequest;
 import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowRange;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
 import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
@@ -78,11 +80,17 @@ public class BigtableWriteIT implements Serializable {
     PipelineOptionsFactory.register(BigtableTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
 
+    // RetryOptions streamingBatchSize must be explicitly set for getTableData()
+    RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
+    retryOptionsBuilder.setStreamingBatchSize(
+        retryOptionsBuilder.build().getStreamingBufferSize() / 2);
+
     BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
         .setProjectId(options.getProjectId())
         .setClusterId(options.getClusterId())
         .setZoneId(options.getZoneId())
-        .setUserAgent("apache-beam-test");
+        .setUserAgent("apache-beam-test")
+        .setRetryOptions(retryOptionsBuilder.build());
     bigtableOptions = bigtableOptionsBuilder.build();
 
     session = new BigtableSession(bigtableOptions);
@@ -172,9 +180,15 @@ public class BigtableWriteIT implements Serializable {
 
   /** Helper function to get a table's data. */
   private List<KV<ByteString, ByteString>> getTableData(String tableName) throws IOException {
+    // Add empty range to avoid TARGET_NOT_SET error
+    RowRange range = RowRange.newBuilder()
+        .setStartKey(ByteString.EMPTY)
+        .setEndKey(ByteString.EMPTY)
+        .build();
     List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
     ReadRowsRequest.Builder readRowsRequestBuilder = ReadRowsRequest.newBuilder()
-        .setTableName(tableName);
+        .setTableName(tableName)
+        .setRowRange(range);
     ResultScanner<Row> scanner = session.getDataClient().readRows(readRowsRequestBuilder.build());
 
     Row currentRow;


Mime
View raw message