beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Added concrete example for CoGroupByKey snippet
Date Fri, 08 Sep 2017 18:27:08 GMT
Repository: beam
Updated Branches:
  refs/heads/master f8e119292 -> e9d3a4a79


Added concrete example for CoGroupByKey snippet


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92676a5d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92676a5d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92676a5d

Branch: refs/heads/master
Commit: 92676a5d8b79dc99cd805a8a36b14fa1ef3007f5
Parents: f8e1192
Author: David Cavazos <dcavazos@google.com>
Authored: Thu Aug 31 12:24:46 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Sep 8 11:26:46 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   | 20 +++++++--------
 .../examples/snippets/snippets_test.py          | 27 +++++++++++++++++---
 2 files changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/92676a5d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 46696f4..eac87a2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1148,24 +1148,24 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
     # multiple possible values for each key.
     # The phone_list contains values such as: ('mary': '111-222-3333') with
     # multiple possible values for each key.
-    emails = p | 'email' >> beam.Create(email_list)
-    phones = p | 'phone' >> beam.Create(phone_list)
+    emails_pcoll = p | 'create emails' >> beam.Create(email_list)
+    phones_pcoll = p | 'create phones' >> beam.Create(phone_list)
+
     # The result PCollection contains one key-value element for each key in the
     # input PCollections. The key of the pair will be the key from the input and
     # the value will be a dictionary with two entries: 'emails' - an iterable of
     # all values for the current key in the emails PCollection and 'phones': an
     # iterable of all values for the current key in the phones PCollection.
     # For instance, if 'emails' contained ('joe', 'joe@example.com') and
-    # ('joe', 'joe@gmail.com'), then 'result' will contain the element
+    # ('joe', 'joe@gmail.com'), then 'result' will contain the element:
     # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
-    result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
-
-    def join_info((name, info)):
-      return '; '.join(['%s' % name,
-                        '%s' % ','.join(info['emails']),
-                        '%s' % ','.join(info['phones'])])
+    result = ({'emails': emails_pcoll, 'phones': phones_pcoll}
+              | beam.CoGroupByKey())
 
-    contact_lines = result | beam.Map(join_info)
+    contact_lines = result | beam.Map(
+        lambda (name, info):\
+           '%s; %s; %s' %\
+           (name, sorted(info['emails']), sorted(info['phones'])))
     # [END model_group_by_key_cogroupbykey_tuple]
     contact_lines | beam.io.WriteToText(output_path)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/92676a5d/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index ee1e50e..a700ba5 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -694,12 +694,31 @@ class SnippetsTest(unittest.TestCase):
     self.assertEqual([str(s) for s in expected], self.get_output(result_path))
 
   def test_model_co_group_by_key_tuple(self):
-    email_list = [['a', 'a@example.com'], ['b', 'b@example.com']]
-    phone_list = [['a', 'x4312'], ['b', 'x8452']]
+    # [START model_group_by_key_cogroupbykey_tuple_inputs]
+    email_list = [
+        ('amy', 'amy@example.com'),
+        ('carl', 'carl@example.com'),
+        ('julia', 'julia@example.com'),
+        ('carl', 'carl@email.com'),
+    ]
+    phone_list = [
+        ('amy', '111-222-3333'),
+        ('james', '222-333-4444'),
+        ('amy', '333-444-5555'),
+        ('carl', '444-555-6666'),
+    ]
+    # [END model_group_by_key_cogroupbykey_tuple_inputs]
     result_path = self.create_temp_file()
     snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path)
-    expect = ['a; a@example.com; x4312', 'b; b@example.com; x8452']
-    self.assertEqual(expect, self.get_output(result_path))
+    # [START model_group_by_key_cogroupbykey_tuple_outputs]
+    contact_lines = [
+        "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']",
+        "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']",
+        "james; []; ['222-333-4444']",
+        "julia; ['julia@example.com']; []",
+    ]
+    # [END model_group_by_key_cogroupbykey_tuple_outputs]
+    self.assertEqual(contact_lines, self.get_output(result_path))
 
   def test_model_use_and_query_metrics(self):
     """DebuggingWordCount example snippets."""


Mime
View raw message