beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4565) Hot key fanout should not distribute keys to all shards.
Date Thu, 21 Jun 2018 17:28:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4565?focusedWorklogId=114397&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114397 ]

ASF GitHub Bot logged work on BEAM-4565:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jun/18 17:27
            Start Date: 21/Jun/18 17:27
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #5649: [BEAM-4565] Fix hot key fanout in the face of combiner lifting.
URL: https://github.com/apache/beam/pull/5649
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is supplied below for the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 5ad685a5b9e..dd5e551b7a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1852,21 +1852,25 @@ public void populateDisplayData(DisplayData.Builder builder) {
       final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
       PCollectionTuple split = input.apply("AddNonce", ParDo.of(
           new DoFn<KV<K, InputT>, KV<K, InputT>>() {
-            transient int counter;
+            transient int nonce;
             @StartBundle
             public void startBundle() {
-              counter = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
+              // Spreading a hot key across all possible sub-keys for all bundles
+              // would defeat the goal of not overwhelming downstream reducers
+              // (as well as making less efficient use of PGBK combining tables).
+              // Instead, each bundle independently makes a consistent choice about
+              // which "shard" of a key to send its intermediate results.
+              nonce = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
             }
 
             @ProcessElement
             public void processElement(@Element KV<K, InputT> kv,
                                        MultiOutputReceiver receiver) {
               int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
               if (spread <= 1) {
                 receiver.get(cold).output(kv);
               } else {
-                int nonce = counter++ % spread;
-                receiver.get(hot).output(KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
+                receiver.get(hot).output(KV.of(KV.of(kv.getKey(), nonce % spread), kv.getValue()));
               }
             }
           })
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index afba5558620..f372e881024 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -336,6 +336,14 @@ def test_hot_key_fanout_sharded(self):
               lambda key: random.randrange(0, 5)))
       assert_that(result, equal_to([(None, 499.5)]))
 
+  def test_global_fanout(self):
+    with TestPipeline() as p:
+      result = (
+          p
+          | beam.Create(range(100))
+          | beam.CombineGlobally(combine.MeanCombineFn()).with_fanout(11))
+      assert_that(result, equal_to([49.5]))
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index f02609ad2c9..331eb4210a5 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1118,7 +1118,7 @@ def _clone(self, **extra_attributes):
     return clone
 
   def with_fanout(self, fanout):
-    return self._clone(fanout=self.fanout)
+    return self._clone(fanout=fanout)
 
   def with_defaults(self, has_defaults=True):
     return self._clone(has_defaults=has_defaults)
@@ -1389,7 +1389,12 @@ def expand(self, pcoll):
     class SplitHotCold(DoFn):
 
       def start_bundle(self):
-        self.counter = random.randrange(10000)
+        # Spreading a hot key across all possible sub-keys for all bundles
+        # would defeat the goal of not overwhelming downstream reducers
+        # (as well as making less efficient use of PGBK combining tables).
+        # Instead, each bundle independently makes a consistent choice about
+        # which "shard" of a key to send its intermediate results.
+        self._nonce = int(random.getrandbits(31))
 
       def process(self, element):
         key, value = element
@@ -1398,10 +1403,7 @@ def process(self, element):
           # Boolean indicates this is not an accumulator.
           yield pvalue.TaggedOutput('cold', (key, (False, value)))
         else:
-          # Round-robin should spread things more evenly than random assignment.
-          self.counter += 1
-          yield pvalue.TaggedOutput('hot',
-                                    ((self.counter % fanout, key), value))
+          yield pvalue.TaggedOutput('hot', ((self._nonce % fanout, key), value))
 
     class PreCombineFn(CombineFn):
       @staticmethod
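
For readers skimming the patch, here is a rough standalone sketch of why the
per-bundle nonce matters under combiner lifting. This is plain Python, not
Beam code; the constants (FANOUT, NUM_BUNDLES, PER_BUNDLE) and helper names
are illustrative assumptions, and the toy model simply assumes lifting sends
one accumulator through the shuffle per (bundle, sub-key) pair actually
touched:

import random

FANOUT = 10        # hypothetical fanout N for one hot key
NUM_BUNDLES = 100  # hypothetical number of bundles
PER_BUNDLE = 1000  # hot-key elements processed per bundle

def shuffled_accumulators(make_assigner):
    # Counts lifted accumulators sent through the shuffle, assuming one
    # accumulator per sub-key that a bundle actually touches.
    total = 0
    for _ in range(NUM_BUNDLES):
        assign = make_assigner()
        total += len({assign() for _ in range(PER_BUNDLE)})
    return total

def round_robin():
    # Old behavior: a counter reset per bundle and incremented per element,
    # so every bundle touches all FANOUT sub-keys.
    state = {'counter': random.randrange(10000)}
    def assign():
        state['counter'] += 1
        return state['counter'] % FANOUT
    return assign

def per_bundle_nonce():
    # New behavior: one nonce per bundle pins the bundle to a single sub-key.
    nonce = int(random.getrandbits(31))
    return lambda: nonce % FANOUT

print('round-robin:      %d' % shuffled_accumulators(round_robin))       # ~FANOUT * NUM_BUNDLES
print('per-bundle nonce: %d' % shuffled_accumulators(per_bundle_nonce))  # ~NUM_BUNDLES

The user-facing entry points exercised by the tests above are
CombinePerKey(...).with_hot_key_fanout(fanout) and, for the global case,
CombineGlobally(...).with_fanout(fanout).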


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 114397)
    Time Spent: 50m  (was: 40m)

> Hot key fanout should not distribute keys to all shards.
> --------------------------------------------------------
>
>                 Key: BEAM-4565
>                 URL: https://issues.apache.org/jira/browse/BEAM-4565
>             Project: Beam
>          Issue Type: Task
>          Components: sdk-java-core, sdk-py-core
>    Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0, 2.5.0
>            Reporter: Robert Bradshaw
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The goal is to reduce the number of values sent to a single post-GBK worker. If combiner
> lifting happens, each bundle sends a single value per sub-key, causing an N-fold blowup
> in shuffle data and N reducers with the same amount of data to consume as the single
> reducer in the non-fanout case.
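
To make that blowup concrete, a rough back-of-the-envelope reading (the
numbers N = 10 and B = 1,000 are illustrative, not from the issue): with
fanout N = 10 and B = 1,000 bundles for one hot key,

    round-robin (old):      ~N * B = 10,000 accumulators shuffled, ~B = 1,000 per reducer
    per-bundle nonce (new): ~B = 1,000 accumulators shuffled, ~B / N = 100 per reducer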



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
