beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-5272) Randomize the reduced splits in BigtableIO so that multiple workers may not hit the same tablet server
Date Thu, 27 Sep 2018 20:18:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-5272?focusedWorklogId=148879&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148879 ]

ASF GitHub Bot logged work on BEAM-5272:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Sep/18 20:17
            Start Date: 27/Sep/18 20:17
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #6503: [BEAM-5272] Randomize the reduced splits in BigtableIO so that multiple workers may not hit the same tablet server
URL: https://github.com/apache/beam/pull/6503
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index edad185323c..755d889b491 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -848,18 +848,25 @@ protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
       // Delegate to testable helper.
       List<BigtableSource> splits =
           splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
-      return reduceSplits(splits, options, MAX_SPLIT_COUNT);
+
+      // Reduce the splits.
+      List<BigtableSource> reduced = reduceSplits(splits, options, MAX_SPLIT_COUNT);
+      // Randomize the result before returning an immutable copy of the splits, the default behavior
+      // may lead to multiple workers hitting the same tablet.
+      Collections.shuffle(reduced);
+      return ImmutableList.copyOf(reduced);
     }
 
+    /** Returns a mutable list of reduced splits. */
     @VisibleForTesting
     protected List<BigtableSource> reduceSplits(
         List<BigtableSource> splits, PipelineOptions options, long maxSplitCounts)
         throws IOException {
       int numberToCombine = (int) ((splits.size() + maxSplitCounts - 1) / maxSplitCounts);
       if (splits.size() < maxSplitCounts || numberToCombine < 2) {
-        return splits;
+        return new ArrayList<>(splits);
       }
-      ImmutableList.Builder<BigtableSource> reducedSplits = ImmutableList.builder();
+      List<BigtableSource> reducedSplits = new ArrayList<>();
       List<ByteKeyRange> previousSourceRanges = new ArrayList<ByteKeyRange>();
       int counter = 0;
       long size = 0;
@@ -879,7 +886,7 @@ protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
       if (size > 0) {
         reducedSplits.add(new BigtableSource(config, filter, previousSourceRanges, size));
       }
-      return reducedSplits.build();
+      return reducedSplits;
     }
 
     /**
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cadb908be5a..54a2fee99b0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -102,7 +102,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -736,10 +736,10 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
             keyRanges,
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(keyRanges.size()));
+    List<BigtableSource> splits = new ArrayList<>();
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit);
 
@@ -753,7 +753,8 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
 
     assertThat(
         actualRangesAfterSplit,
-        IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray()));
+        IsIterableContainingInAnyOrder.containsInAnyOrder(
+            expectedKeyRangesAfterReducedSplits.toArray()));
   }
 
   /** Tests reduce split with all non adjacent ranges. */
@@ -786,10 +787,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
             keyRanges,
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(keyRanges.size()));
+    List<BigtableSource> splits = new ArrayList<>();
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit);
 
@@ -801,8 +802,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
 
     assertAllSourcesHaveSingleRanges(reducedSplits);
 
-    //The expected split source ranges are exactly same as original
-    assertThat(actualRangesAfterSplit, IsIterableContainingInOrder.contains(keyRanges.toArray()));
+    // The expected split source ranges are exactly same as original
+    assertThat(
+        actualRangesAfterSplit,
+        IsIterableContainingInAnyOrder.containsInAnyOrder(keyRanges.toArray()));
   }
 
   /** Tests reduce Splits with all adjacent ranges. */
@@ -826,10 +829,22 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception {
             Arrays.asList(ByteKeyRange.ALL_KEYS),
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(numSamples));
+    List<BigtableSource> splits = new ArrayList<>();
+    List<ByteKeyRange> keyRanges =
+        Arrays.asList(
+            ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
+            ByteKeyRange.of(createByteKey(1), createByteKey(2)),
+            ByteKeyRange.of(createByteKey(2), createByteKey(3)),
+            ByteKeyRange.of(createByteKey(3), createByteKey(4)),
+            ByteKeyRange.of(createByteKey(4), createByteKey(5)),
+            ByteKeyRange.of(createByteKey(5), createByteKey(6)),
+            ByteKeyRange.of(createByteKey(6), createByteKey(7)),
+            ByteKeyRange.of(createByteKey(7), createByteKey(8)),
+            ByteKeyRange.of(createByteKey(8), createByteKey(9)),
+            ByteKeyRange.of(createByteKey(9), ByteKey.EMPTY));
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     //Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
     //expected reduced Split source ranges are [..4][4..8][8..]
@@ -849,7 +864,8 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception {
 
     assertThat(
         actualRangesAfterSplit,
-        IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray()));
+        IsIterableContainingInAnyOrder.containsInAnyOrder(
+            expectedKeyRangesAfterReducedSplits.toArray()));
     assertAllSourcesHaveSingleAdjacentRanges(reducedSplits);
     assertSourcesEqualReferenceSource(source, reducedSplits, null /* options */);
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 148879)
    Time Spent: 1.5h  (was: 1h 20m)

> Randomize the reduced splits in BigtableIO so that multiple workers may not hit the same tablet server
> ------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5272
>                 URL: https://issues.apache.org/jira/browse/BEAM-5272
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Kevin Si
>            Assignee: Chamikara Jayalath
>            Priority: Minor
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Randomize the reduced splits in BigtableIO so that multiple workers may not hit the same tablet server.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message