beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] beam git commit: [BEAM-662] Fix for allowing floating point periods in windows
Date Thu, 20 Apr 2017 15:54:13 GMT
Repository: beam
Updated Branches:
  refs/heads/master 4e0c8333c -> 4e0d5f596


[BEAM-662] Fix for allowing floating point periods in windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc1bdd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc1bdd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc1bdd3

Branch: refs/heads/master
Commit: 1bc1bdd33494b4123855e2e3c9fa823654b31998
Parents: 4e0c833
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Wed Apr 19 18:20:11 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Thu Apr 20 08:53:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/window.py      | 10 ++++++----
 sdks/python/apache_beam/transforms/window_test.py | 14 ++++++++++++++
 sdks/python/apache_beam/utils/timestamp.py        |  4 ----
 3 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 319a7b4..931a17d 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -388,13 +388,15 @@ class SlidingWindows(NonMergingWindowFn):
       raise ValueError('The size parameter must be strictly positive.')
     self.size = Duration.of(size)
     self.period = Duration.of(period)
-    self.offset = Timestamp.of(offset) % size
+    self.offset = Timestamp.of(offset) % period
 
   def assign(self, context):
     timestamp = context.timestamp
-    start = timestamp - (timestamp - self.offset) % self.period
-    return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
-            for s in range(start, start - self.size, -self.period)]
+    start = timestamp - ((timestamp - self.offset) % self.period)
+    return [
+        IntervalWindow(Timestamp(micros=s), Timestamp(micros=s) + self.size)
+        for s in range(start.micros, timestamp.micros - self.size.micros,
+                       -self.period.micros)]
 
   def __eq__(self, other):
     if type(self) == type(other) == SlidingWindows:

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 1ac95e4..cbfd0b2 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -108,6 +108,20 @@ class WindowTest(unittest.TestCase):
     self.assertEqual(expected, windowfn.assign(context('v', 8)))
     self.assertEqual(expected, windowfn.assign(context('v', 11)))
 
+  def test_sliding_windows_assignment_fraction(self):
+    windowfn = SlidingWindows(size=3.5, period=2.5, offset=1.5)
+    self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+                     windowfn.assign(context('v', 1.7)))
+    self.assertEqual([IntervalWindow(1.5, 5.0)],
+                     windowfn.assign(context('v', 3)))
+
+  def test_sliding_windows_assignment_fraction_large_offset(self):
+    windowfn = SlidingWindows(size=3.5, period=2.5, offset=4.0)
+    self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+                     windowfn.assign(context('v', 1.7)))
+    self.assertEqual([IntervalWindow(4.0, 7.5), IntervalWindow(1.5, 5.0)],
+                     windowfn.assign(context('v', 4.5)))
+
   def test_sessions_merging(self):
     windowfn = Sessions(10)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 647f4bd..8b2ccda 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -167,10 +167,6 @@ class Duration(object):
     # Note that the returned value may have lost precision.
     return float(self.micros) / 1000000
 
-  def __int__(self):
-    # Note that the returned value may have lost precision.
-    return self.micros / 1000000
-
   def __cmp__(self, other):
     # Allow comparisons between Duration and Timestamp values.
     if not isinstance(other, Timestamp):


Mime
View raw message