beam-commits mailing list archives

From rober...@apache.org
Subject [beam] 01/01: [BEAM-4858] Clean up and improve batch size estimator.
Date Wed, 03 Oct 2018 10:16:47 GMT
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch jira-4858
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5d4dc86f2ada8958a1aa34a66b41e917587403ff
Author: Robert Bradshaw <robertwb@google.com>
AuthorDate: Thu Sep 20 16:13:36 2018 +0200

    [BEAM-4858] Clean up and improve batch size estimator.
    
    In particular, this change
    
    * Simplifies the aging out of old data. Not only was the old formula hard to
      understand, but it meant that bad data could stick around and poison the
      estimates forever.
    * Adds a variance parameter allowing the batch size to vary over a fixed
      range, giving a broader base for the linear regression.
    * Uses numpy, when available, to do the regression.  This is both much more
      efficient and makes it less error-prone to express more complicated
      analyses.
    
    The algorithm was also changed to:
    
    * Eliminate outliers, both by using Cook's distance and by simply throwing
      out the top (often high-variance and high-influence) 20% when there is
      sufficient data (see the standalone sketch below).
    * Weight by the inverse of batch size, to provide a more stable fixed-cost
      estimate (which the default "overhead" target is sensitive to).
    
    These changes were tested against a large TFT pipeline and found to produce
    more uniform batch sizes and similar, possibly slightly improved, total
    runtimes and total costs.
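
For readers unfamiliar with Cook's distance, here is a minimal standalone
sketch of the outlier-filtering idea referenced above.  It is not part of the
patch; the data points are hypothetical, and the formulas mirror the
linear_regression_numpy helper added in the diff below.

    import numpy as np

    xs = np.array([1., 2., 3., 4., 5., 50.])  # last point: extreme batch size
    ys = 2 * xs + 1.                           # points on the line y = 2x + 1
    ys[-1] = 5.                                # ...except the last, far off it

    # Naive least-squares fit: the single outlier badly skews slope and
    # intercept (polyfit returns highest-degree coefficient first).
    b, a = np.polyfit(xs, ys, 1)

    # Cook's distance for simple linear regression (same algebraic form as in
    # the patch): leverage h_ii combined with the squared residuals.
    n = len(xs)
    errs = a + b * xs - ys
    s2 = np.sum(errs ** 2) / (n - 2)
    h = (np.sum(xs ** 2) - 2 * np.sum(xs) * xs + n * xs ** 2) / (
        n * np.sum(xs ** 2) - np.sum(xs) ** 2)
    cook_ds = 0.5 / s2 * errs ** 2 * (h / (1 - h) ** 2)

    # Refit, dropping points with Cook's distance > 0.5 and weighting by 1/x
    # to favor a stable intercept; this recovers a ~= 1, b ~= 2.
    weight = (cook_ds <= 0.5) / xs
    b, a = np.polyfit(xs, ys, 1, w=weight)
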
---
 sdks/python/apache_beam/transforms/util.py      | 112 ++++++++++++++++++------
 sdks/python/apache_beam/transforms/util_test.py |  70 +++++++++++++++
 2 files changed, 153 insertions(+), 29 deletions(-)
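
For context on the next_batch_size() changes below: once the per-batch time is
modelled as a + b*x (with x the batch size), both targets have closed-form
solutions.  A quick standalone check, using purely hypothetical values of a
and b:

    # Hypothetical costs: 0.5s fixed overhead per batch, 0.01s per element.
    a, b = 0.5, 0.01

    # a + b*x == target_batch_duration_secs  =>  x == (target - a) / b
    x_duration = (1.0 - a) / b                      # 50 elements

    # a / (a + b*x) == target_batch_overhead  =>  x == (a/b) * (1/overhead - 1)
    x_overhead = (a / b) * (1 / 0.05 - 1)           # 950 elements

    # The estimator takes the stricter (smaller) of the two as its target,
    # then jitters it by +/- variance and clamps it to the allowed range.
    target = min(x_duration, x_overhead)            # 50 elements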

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 8a99969..1746d09 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -30,7 +30,6 @@ from builtins import range
 from builtins import zip
 
 from future.utils import itervalues
-from past.utils import old_div
 
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
@@ -213,6 +212,7 @@ class _BatchSizeEstimator(object):
                max_batch_size=1000,
                target_batch_overhead=.1,
                target_batch_duration_secs=1,
+               variance=0.25,
                clock=time.time):
     if min_batch_size > max_batch_size:
       raise ValueError("Minimum (%s) must not be greater than maximum (%s)" % (
@@ -230,6 +230,7 @@ class _BatchSizeEstimator(object):
     self._max_batch_size = max_batch_size
     self._target_batch_overhead = target_batch_overhead
     self._target_batch_duration_secs = target_batch_duration_secs
+    self._variance = variance
     self._clock = clock
     self._data = []
     self._ignore_next_timing = False
@@ -269,23 +270,62 @@ class _BatchSizeEstimator(object):
         self._thin_data()
 
   def _thin_data(self):
-    sorted_data = sorted(self._data)
-    odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else []
-    # Sort the pairs by how different they are.
-
-    def div_keys(kv1_kv2):
-      (x1, _), (x2, _) = kv1_kv2
-      return old_div(x2, x1) # TODO(BEAM-4858)
-
-    pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]),
-                   key=div_keys)
-    # Keep the top 1/3 most different pairs, average the top 2/3 most similar.
-    threshold = 2 * len(pairs) // 3
-    self._data = (
-        list(sum(pairs[threshold:], ()))
-        + [((x1 + x2) / 2.0, (t1 + t2) / 2.0)
-           for (x1, t1), (x2, t2) in pairs[:threshold]]
-        + odd_one_out)
+    # Make sure we don't change the parity of len(self._data),
+    # as it's used below to alternate the jitter.
+    self._data.pop(random.randrange(len(self._data) // 4))
+    self._data.pop(random.randrange(len(self._data) // 2))
+
+  @staticmethod
+  def linear_regression_no_numpy(xs, ys):
+    # Least squares fit for y = a + bx over all points.
+    n = float(len(xs))
+    xbar = sum(xs) / n
+    ybar = sum(ys) / n
+    b = (sum([(x - xbar) * (y - ybar) for x, y in zip(xs, ys)])
+         / sum([(x - xbar)**2 for x in xs]))
+    a = ybar - b * xbar
+    return a, b
+
+  @staticmethod
+  def linear_regression_numpy(xs, ys):
+    # pylint: disable=wrong-import-order, wrong-import-position
+    import numpy as np
+    from numpy import sum
+    xs = np.asarray(xs, dtype=float)
+    ys = np.asarray(ys, dtype=float)
+
+    # First do a simple least squares fit for y = a + bx over all points.
+    b, a = np.polyfit(xs, ys, 1)
+
+    n = len(xs)
+    if n < 10:
+      return a, b
+    else:
+      # Refine this by throwing out outliers, according to Cook's distance.
+      # https://en.wikipedia.org/wiki/Cook%27s_distance
+      sum_x = sum(xs)
+      sum_x2 = sum(xs**2)
+      errs = a + b * xs - ys
+      s2 = sum(errs**2) / (n - 2)
+      if s2 == 0:
+        # It's an exact fit!
+        return a, b
+      h = (sum_x2 - 2 * sum_x * xs + n * xs**2) / (n * sum_x2 - sum_x**2)
+      cook_ds = 0.5 / s2 * errs**2 * (h / (1 - h)**2)
+
+      # Re-compute the regression, excluding those points with Cook's distance
+      # greater than 0.5, and weighting by the inverse of x to give a more
+      # stable y-intercept.
+      weight = (cook_ds <= 0.5) / xs
+      b, a = np.polyfit(xs, ys, 1, w=weight)
+      return a, b
+
+  try:
+    # pylint: disable=wrong-import-order, wrong-import-position
+    import numpy as np
+    linear_regression = linear_regression_numpy
+  except ImportError:
+    linear_regression = linear_regression_no_numpy
 
   def next_batch_size(self):
     if self._min_batch_size == self._max_batch_size:
@@ -300,14 +340,14 @@ class _BatchSizeEstimator(object):
               self._min_batch_size * self._MAX_GROWTH_FACTOR),
           self._min_batch_size + 1))
 
+    # There tends to be a lot of noise in the top quantile, which also
+    # has outsized influence on the regression.  If we have enough data,
+    # simply declare the top 20% to be outliers.
+    trimmed_data = sorted(self._data)[:max(20, len(self._data) * 4 // 5)]
+
     # Linear regression for y = a + bx, where x is batch size and y is time.
-    xs, ys = zip(*self._data)
-    n = float(len(self._data))
-    xbar = sum(xs) / n
-    ybar = sum(ys) / n
-    b = (sum([(x - xbar) * (y - ybar) for x, y in self._data])
-         / sum([(x - xbar)**2 for x in xs]))
-    a = ybar - b * xbar
+    xs, ys = zip(*trimmed_data)
+    a, b = self.linear_regression(xs, ys)
 
     # Avoid nonsensical or division-by-zero errors below due to noise.
     a = max(a, 1e-10)
@@ -316,17 +356,26 @@ class _BatchSizeEstimator(object):
     last_batch_size = self._data[-1][0]
     cap = min(last_batch_size * self._MAX_GROWTH_FACTOR, self._max_batch_size)
 
+    target = self._max_batch_size
+
     if self._target_batch_duration_secs:
       # Solution to a + b*x = self._target_batch_duration_secs.
-      cap = min(cap, (self._target_batch_duration_secs - a) / b)
+      target = min(target, (self._target_batch_duration_secs - a) / b)
 
     if self._target_batch_overhead:
       # Solution to a / (a + b*x) = self._target_batch_overhead.
-      cap = min(cap, (a / b) * (1 / self._target_batch_overhead - 1))
+      target = min(target, (a / b) * (1 / self._target_batch_overhead - 1))
 
-    # Avoid getting stuck at min_batch_size.
+    # Avoid getting stuck at a single batch size (especially the minimal
+    # batch size) which would not allow us to extrapolate to other batch
+    # sizes.
+    # Jitter alternates between 0 and 1.
     jitter = len(self._data) % 2
-    return int(max(self._min_batch_size + jitter, cap))
+    # Smear our samples across a range centered at the target.
+    if len(self._data) > 10:
+      target += int(target * self._variance * 2 * (random.random() - .5))
+
+    return int(max(self._min_batch_size + jitter, min(target, cap)))
 
 
 class _GlobalWindowsBatchingDoFn(DoFn):
@@ -425,6 +474,9 @@ class BatchElements(PTransform):
         as used in the formula above
     target_batch_duration_secs: (optional) a target for total time per bundle,
         in seconds
+    variance: (optional) the permitted (relative) amount of deviation from the
+        (estimated) ideal batch size, used to produce a wider base for
+        the linear regression
     clock: (optional) an alternative to time.time for measuring the cost of
        downstream operations (mostly for testing)
   """
@@ -434,12 +486,14 @@ class BatchElements(PTransform):
                max_batch_size=10000,
                target_batch_overhead=.05,
                target_batch_duration_secs=1,
+               variance=0.25,
                clock=time.time):
     self._batch_size_estimator = _BatchSizeEstimator(
         min_batch_size=min_batch_size,
         max_batch_size=max_batch_size,
         target_batch_overhead=target_batch_overhead,
         target_batch_duration_secs=target_batch_duration_secs,
+        variance=variance,
         clock=clock)
 
   def expand(self, pcoll):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 6cec4a5..e592f93 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import logging
+import random
 import time
 import unittest
 from builtins import object
@@ -125,6 +126,75 @@ class BatchElementsTest(unittest.TestCase):
         clock.sleep(batch_duration(actual_sizes[-1]))
     self.assertEqual(expected_sizes, actual_sizes)
 
+  def test_variance(self):
+    clock = FakeClock()
+    variance = 0.25
+    batch_estimator = util._BatchSizeEstimator(
+        target_batch_overhead=.05, target_batch_duration_secs=None,
+        variance=variance, clock=clock)
+    batch_duration = lambda batch_size: 1 + .7 * batch_size
+    expected_target = 27
+    actual_sizes = []
+    for _ in range(util._BatchSizeEstimator._MAX_DATA_POINTS - 1):
+      actual_sizes.append(batch_estimator.next_batch_size())
+      with batch_estimator.record_time(actual_sizes[-1]):
+        clock.sleep(batch_duration(actual_sizes[-1]))
+    # Check that we're testing a good range of values.
+    stable_set = set(actual_sizes[-20:])
+    self.assertGreater(len(stable_set), 3)
+    self.assertGreater(
+        min(stable_set), expected_target - expected_target * variance)
+    self.assertLess(
+        max(stable_set), expected_target + expected_target * variance)
+
+  def _run_regression_test(self, linear_regression_fn, test_outliers):
+    xs = [random.random() for _ in range(10)]
+    ys = [2*x + 1 for x in xs]
+    a, b = linear_regression_fn(xs, ys)
+    self.assertAlmostEqual(a, 1)
+    self.assertAlmostEqual(b, 2)
+
+    xs = [1 + random.random() for _ in range(100)]
+    ys = [7*x + 5 + 0.01 * random.random() for x in xs]
+    a, b = linear_regression_fn(xs, ys)
+    self.assertAlmostEqual(a, 5, delta=0.01)
+    self.assertAlmostEqual(b, 7, delta=0.01)
+
+    if test_outliers:
+      xs = [1 + random.random() for _ in range(100)]
+      ys = [2*x + 1 for x in xs]
+      a, b = linear_regression_fn(xs, ys)
+      self.assertAlmostEqual(a, 1)
+      self.assertAlmostEqual(b, 2)
+
+      # An outlier or two doesn't affect the result.
+      for _ in range(2):
+        xs += [10]
+        ys += [30]
+        a, b = linear_regression_fn(xs, ys)
+        self.assertAlmostEqual(a, 1)
+        self.assertAlmostEqual(b, 2)
+
+      # But enough of them, and they're no longer outliers.
+      xs += [10] * 10
+      ys += [30] * 10
+      a, b = linear_regression_fn(xs, ys)
+      self.assertLess(a, 0.5)
+      self.assertGreater(b, 2.5)
+
+  def test_no_numpy_regression(self):
+    self._run_regression_test(
+        util._BatchSizeEstimator.linear_regression_no_numpy, False)
+
+  def test_numpy_regression(self):
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      import numpy as _
+    except ImportError:
+      self.skipTest('numpy not available')
+    self._run_regression_test(
+        util._BatchSizeEstimator.linear_regression_numpy, True)
+
 
 class IdentityWindowTest(unittest.TestCase):
 


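Finally, a minimal usage sketch (not from the patch) showing BatchElements
with the new variance parameter; the element values and bounds here are
illustrative only.

    import apache_beam as beam
    from apache_beam.transforms import util

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(range(1000))
          | util.BatchElements(min_batch_size=10,
                               max_batch_size=1000,
                               target_batch_overhead=.05,
                               target_batch_duration_secs=1,
                               variance=0.25)  # allow roughly +/-25% jitter
          | beam.Map(len))  # each output element is a list; map it to its size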