beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [11/50] [abbrv] beam git commit: [BEAM-1502] GroupByKey should not return bare lists in DirectRunner.
Date Thu, 20 Jul 2017 19:53:07 GMT
[BEAM-1502] GroupByKey should not return bare lists in DirectRunner.

Returning bare lists leads to expectations that do not hold on other runners.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e7059e5c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e7059e5c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e7059e5c

Branch: refs/heads/DSL_SQL
Commit: e7059e5cb3cd07855582641798c58fc3cf5cd682
Parents: 532256e
Author: Robert Bradshaw <robertwb@google.com>
Authored: Mon Jul 17 13:44:40 2017 -0700
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Mon Jul 17 15:08:02 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |  2 +-
 sdks/python/apache_beam/transforms/core.py      |  2 +-
 sdks/python/apache_beam/transforms/trigger.py   | 21 +++++++++++++++-----
 3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 3a5f9b1..27b8120 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1136,7 +1136,7 @@ def model_group_by_key(contents, output_path):
     grouped_words = words_and_counts | beam.GroupByKey()
     # [END model_group_by_key_transform]
     (grouped_words
-     | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
+     | 'count words' >> beam.Map(lambda (word, counts): (word, sum(counts)))
      | beam.io.WriteToText(output_path))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8018219..92b8737 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1017,7 +1017,7 @@ class CombineValuesDoFn(DoFn):
            self.combinefn.apply(element[1], *args, **kwargs))]
 
     # Add the elements into three accumulators (for testing of merge).
-    elements = element[1]
+    elements = list(element[1])
     accumulators = []
     for k in range(3):
       if len(elements) <= k:

http://git-wip-us.apache.org/repos/asf/beam/blob/e7059e5c/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index f77fa1a..c1fbfc5 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -24,6 +24,7 @@ from abc import ABCMeta
 from abc import abstractmethod
 import collections
 import copy
+import itertools
 
 from apache_beam.coders import observable
 from apache_beam.transforms import combiners
@@ -878,6 +879,17 @@ class _UnwindowedValues(observable.ObservableMixin):
   def __reduce__(self):
     return list, (list(self),)
 
+  def __eq__(self, other):
+    if isinstance(other, collections.Iterable):
+      return all(
+          a == b
+          for a, b in itertools.izip_longest(self, other, fillvalue=object()))
+    else:
+      return NotImplemented
+
+  def __ne__(self, other):
+    return not self == other
+
 
 class DefaultGlobalBatchTriggerDriver(TriggerDriver):
   """Breaks a bundles into window (pane)s according to the default triggering.
@@ -888,11 +900,10 @@ class DefaultGlobalBatchTriggerDriver(TriggerDriver):
     pass
 
   def process_elements(self, state, windowed_values, unused_output_watermark):
-    if isinstance(windowed_values, list):
-      unwindowed = [wv.value for wv in windowed_values]
-    else:
-      unwindowed = _UnwindowedValues(windowed_values)
-    yield WindowedValue(unwindowed, MIN_TIMESTAMP, self.GLOBAL_WINDOW_TUPLE)
+    yield WindowedValue(
+        _UnwindowedValues(windowed_values),
+        MIN_TIMESTAMP,
+        self.GLOBAL_WINDOW_TUPLE)
 
   def process_timer(self, window_id, name, time_domain, timestamp, state):
     raise TypeError('Triggers never set or called for batch default windowing.')


Mime
View raw message