Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 53A21200CB4 for ; Mon, 12 Jun 2017 18:55:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5269F160BF3; Mon, 12 Jun 2017 16:55:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6AD9F160BEF for ; Mon, 12 Jun 2017 18:55:20 +0200 (CEST) Received: (qmail 9342 invoked by uid 500); 12 Jun 2017 16:55:19 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 8562 invoked by uid 99); 12 Jun 2017 16:55:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jun 2017 16:55:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6573BF4A4A; Mon, 12 Jun 2017 16:55:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Mon, 12 Jun 2017 16:55:36 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [21/50] [abbrv] beam git commit: Raise entity limit per RPC to 9MB. archived-at: Mon, 12 Jun 2017 16:55:21 -0000 Raise entity limit per RPC to 9MB. This is closer to the API limit, while still leaving room for overhead. Brings the Java SDK into line with the Python SDK. Switch the unit test to use the size of each entity, which is what the connector is actually using, rather than the property size (which is slightly smaller and would cause the test to fail for some values). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/156f326a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/156f326a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/156f326a Branch: refs/heads/gearpump-runner Commit: 156f326a16e15b4e22a189a2a263d11d7b273656 Parents: fdfc70e Author: Colin Phipps Authored: Mon Jun 5 12:12:49 2017 +0000 Committer: Ahmet Altay Committed: Thu Jun 8 10:57:09 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1Test.java | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/156f326a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index b198a6f..06b9c8a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -213,7 +213,7 @@ public class DatastoreV1 { * the mutations themselves and not the CommitRequest wrapper around them. */ @VisibleForTesting - static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000; + static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000; /** * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId}, http://git-wip-us.apache.org/repos/asf/beam/blob/156f326a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index 460049e..229b1fb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -651,12 +651,14 @@ public class DatastoreV1Test { @Test public void testDatatoreWriterFnWithLargeEntities() throws Exception { List mutations = new ArrayList<>(); - int propertySize = 900_000; + int entitySize = 0; for (int i = 0; i < 12; ++i) { - Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1)); - entity.putProperties("long", makeValue(new String(new char[propertySize]) - ).setExcludeFromIndexes(true).build()); - mutations.add(makeUpsert(entity.build()).build()); + Entity entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1)) + .putProperties("long", makeValue(new String(new char[900_000]) + ).setExcludeFromIndexes(true).build()) + .build(); + entitySize = entity.getSerializedSize(); // Take the size of any one entity. + mutations.add(makeUpsert(entity).build()); } DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID), @@ -667,10 +669,10 @@ public class DatastoreV1Test { // This test is over-specific currently; it requires that we split the 12 entity writes into 3 // requests, but we only need each CommitRequest to be less than 10MB in size. - int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize; + int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize; int start = 0; while (start < mutations.size()) { - int end = Math.min(mutations.size(), start + propertiesPerRpc); + int end = Math.min(mutations.size(), start + entitiesPerRpc); CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.addAllMutations(mutations.subList(start, end));