beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (BEAM-1934) Code examples for CoGroupByKey
Date Mon, 06 Nov 2017 21:18:00 GMT


ASF GitHub Bot commented on BEAM-1934:

asfgit closed pull request #302: [BEAM-1934] Add more CoGroupByKey content/examples

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/src/documentation/ b/src/documentation/
index ebe8a8e3b..50fd9d713 100644
--- a/src/documentation/
+++ b/src/documentation/
@@ -784,45 +784,122 @@ tree, [2]
 Thus, `GroupByKey` represents a transform from a multimap (multiple keys to
 individual values) to a uni-map (unique keys to collections of values).
+##### GroupByKey and unbounded PCollections
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `GroupByKey` or
+[CoGroupByKey](#cogroupbykey). This is because `GroupByKey` and `CoGroupByKey`
+must wait for all of the data with a given key to be collected, but an
+unbounded collection never finishes delivering data. Windowing and/or triggers
+allow grouping to operate on logical, finite bundles of data within the
+unbounded data streams.
+If you apply `GroupByKey` or `CoGroupByKey` to a group of unbounded
+`PCollection`s without setting a non-global windowing strategy, a trigger
+strategy, or both for each collection, Beam throws an `IllegalStateException`
+at pipeline construction time.
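The effect of windowing on grouping can be sketched in plain Python. This is a toy model, not Beam code: the function names and sample events are hypothetical, and it ignores triggers, watermarks, and window offsets. It only shows how assigning each element to a fixed window turns one never-ending group per key into many finite groups keyed by (key, window).

```python
from collections import defaultdict


def fixed_window_start(timestamp, size):
    """Return the start of the fixed window of `size` seconds containing `timestamp`."""
    return timestamp - (timestamp % size)


def group_by_key_and_window(elements, size):
    """Group (key, value, timestamp) triples by (key, window start).

    Toy model: each group is finite because a window covers a bounded
    time range, even though the overall stream may never end.
    """
    groups = defaultdict(list)
    for key, value, timestamp in elements:
        window = fixed_window_start(timestamp, size)
        groups[(key, window)].append(value)
    return dict(groups)


# Hypothetical events: (user, click id, event time in seconds).
events = [
    ("amy", "c1", 10),
    ("amy", "c2", 290),
    ("amy", "c3", 310),   # falls in the next 5-minute window
    ("carl", "c4", 120),
]

print(group_by_key_and_window(events, size=300))
# {('amy', 0): ['c1', 'c2'], ('amy', 300): ['c3'], ('carl', 0): ['c4']}
```

A downstream step can emit the group for `('amy', 0)` once that window is complete, without waiting for the rest of the stream.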
+When using `GroupByKey` or `CoGroupByKey` to group `PCollection`s that have a
+[windowing strategy](#windowing) applied, all of the `PCollection`s you want to
+group *must use the same windowing strategy* and window sizing. For example, all
+of the collections you are merging must use identical 5-minute fixed windows, or
+identical 4-minute sliding windows starting every 30 seconds.
+If your pipeline attempts to use `GroupByKey` or `CoGroupByKey` to merge
+`PCollection`s with incompatible windows, Beam throws an
+`IllegalStateException` at pipeline construction time.
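To see why mismatched windows cannot be merged, here is a plain-Python toy model. The class and function names are hypothetical and are not the Beam API. The same element at t=250 s lands in exactly one 5-minute fixed window but in eight overlapping 4-minute sliding windows, so there is no one-to-one pairing between the groups of the two inputs. `check_same_windowing` stands in for Beam's construction-time check; raising `ValueError` (rather than Java's `IllegalStateException`) and requiring strict equality are simplifications made by this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FixedWindows:
    size: int  # window length in seconds


@dataclass(frozen=True)
class SlidingWindows:
    size: int    # window length in seconds
    period: int  # seconds between consecutive window starts


def windows_for(strategy, timestamp):
    """Return the [start, end) windows that contain `timestamp`."""
    if isinstance(strategy, FixedWindows):
        start = timestamp - timestamp % strategy.size
        return [(start, start + strategy.size)]
    # Sliding windows: every start that is a multiple of `period` and whose
    # window still covers `timestamp`, i.e. start > timestamp - size.
    last_start = timestamp - timestamp % strategy.period
    starts = range(last_start, timestamp - strategy.size, -strategy.period)
    return [(s, s + strategy.size) for s in sorted(starts)]


def check_same_windowing(*strategies):
    """Toy stand-in for the construction-time check on grouped inputs."""
    if len(set(strategies)) > 1:
        raise ValueError(f"incompatible windowing strategies: {strategies}")


emails_windowing = FixedWindows(size=300)               # 5-minute fixed
phones_windowing = SlidingWindows(size=240, period=30)  # 4-minute, every 30 s

print(windows_for(emails_windowing, 250))       # [(0, 300)]
print(len(windows_for(phones_windowing, 250)))  # 8

check_same_windowing(FixedWindows(300), FixedWindows(300))  # passes
try:
    check_same_windowing(emails_windowing, phones_windowing)
except ValueError as err:
    print("rejected:", err)
```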
 #### 4.2.3. CoGroupByKey
-`CoGroupByKey` joins two or more key/value `PCollection`s that have the same key
-type, and then emits a collection of `KV<K, CoGbkResult>` pairs. [Design Your
-Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
+`CoGroupByKey` performs a relational join of two or more key/value
+`PCollection`s that have the same key type.
+[Design Your Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
 shows an example pipeline that uses a join.
-Given the input collections below:
-// collection 1
-user1, address1
-user2, address2
-user3, address3
+Consider using `CoGroupByKey` if you have multiple data sets that provide
+information about related things. For example, let's say you have two different
+files with user data: one file has names and email addresses; the other file
+has names and phone numbers. You can join those two data sets, using the user
+name as a common key and the other data as the associated values. After the
+join, you have one data set that contains all of the information (email
+addresses and phone numbers) associated with each name.
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `CoGroupByKey`. See
+[GroupByKey and unbounded PCollections](#groupbykey-and-unbounded-pcollections)
+for more details.
+<span class="language-java">
+In the Beam SDK for Java, `CoGroupByKey` accepts a tuple of keyed
+`PCollection`s (`PCollection<KV<K, V>>`) as input. For type safety, the SDK
+requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`.
+You must declare a `TupleTag` for each input `PCollection` in the
+`KeyedPCollectionTuple` that you want to pass to `CoGroupByKey`. As output,
+`CoGroupByKey` returns a `PCollection<KV<K, CoGbkResult>>`, which groups values
+from all the input `PCollection`s by their common keys. Each key (of type `K`)
+has its own `CoGbkResult`, which is a map from `TupleTag<T>` to
+`Iterable<T>`. You can access a specific collection in a `CoGbkResult` object
+by using the `TupleTag` that you supplied with the initial collection.
+<span class="language-py">
+In the Beam SDK for Python, `CoGroupByKey` accepts a dictionary of keyed
+`PCollection`s as input. As output, `CoGroupByKey` creates a single output
+`PCollection` that contains one key/value tuple for each key in the input
+`PCollection`s. Each key's value is a dictionary that maps each tag to an
+iterable of the values under that key in the corresponding `PCollection`.
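The input/output shape described above can be modeled in memory with plain Python. This is a sketch, not the SDK implementation: `co_group_by_key` and the sample data are hypothetical. In a real Python pipeline the equivalent step is `{'emails': emails, 'phones': phones} | beam.CoGroupByKey()` applied to `PCollection`s. The model shows that every key from any input appears once in the output, with an empty list for tags that had no values for that key.

```python
from collections import defaultdict


def co_group_by_key(tagged_inputs):
    """Toy in-memory model of CoGroupByKey's output shape.

    `tagged_inputs` maps a tag to a list of (key, value) pairs. The result has
    one (key, {tag: [values]}) tuple per distinct key found in any input; a tag
    with no values for that key maps to an empty list.
    """
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    # Sorted only to make the printout stable; Beam does not order its output.
    return sorted(grouped.items(), key=lambda item: item[0])


# Hypothetical sample data: (name, email) and (name, phone) pairs.
emails = [("amy", "amy@example.com"), ("carl", "carl@example.com"),
          ("julia", "julia@example.com"), ("carl", "carl@example.org")]
phones = [("amy", "111-222-3333"), ("james", "222-333-4444"),
          ("amy", "333-444-5555"), ("carl", "444-555-6666")]

for name, values in co_group_by_key({"emails": emails, "phones": phones}):
    print(name, values)
```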
+The following conceptual examples use two input collections to show the mechanics of `CoGroupByKey`.
+<span class="language-java">
+The first set of data has a `TupleTag<String>` called `emailsTag` and contains names
+and email addresses. The second set of data has a `TupleTag<String>` called
+`phonesTag` and contains names and phone numbers.
+<span class="language-py">
+The first set of data contains names and email addresses. The second set of
+data contains names and phone numbers.
-// collection 2
-user1, order1
-user1, order2
-user2, order3
-guest, order4
+{% github_sample /apache/beam/blob/master/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
-`CoGroupByKey` gathers up the values with the same key from all `PCollection`s,
-and outputs a new pair consisting of the unique key and an object `CoGbkResult`
-containing all values that were associated with that key. If you apply
-`CoGroupByKey` to the input collections above, the output collection would look
-like this:
-user1, [[address1], [order1, order2]]
-user2, [[address2], [order3]]
-user3, [[address3], []]
-guest, [[], [order4]]
+After `CoGroupByKey`, the resulting data contains all data associated with each
+unique key from any of the input collections.
+{% github_sample /apache/beam/blob/master/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
-> **A Note on Key/Value Pairs:** Beam represents key/value pairs slightly
-> differently depending on the language and SDK you're using. In the Beam SDK
-> for Java, you represent a key/value pair with an object of type `KV<K, V>`. In
-> Python, you represent key/value pairs with 2-tuples.
+The following code example joins the two `PCollection`s with `CoGroupByKey`,
+followed by a `ParDo` to consume the result. Then, the code uses tags to look up
+and format data from each collection.
+{% github_sample /apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
+The formatted data looks like this:
+{% github_sample /apache/beam/blob/master/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
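The formatting step that consumes the joined result can be sketched as a plain function over one `(key, {tag: values})` element. The function name, the output format, and the hard-coded grouped data are hypothetical. Sorting each iterable keeps the output deterministic, since the order of grouped values is not guaranteed. In the Python SDK, a one-to-one `ParDo` like this is commonly written with `beam.Map`.

```python
def format_contact(name, info):
    """Hypothetical formatter: one line per name with sorted emails and phones."""
    return "%s; %s; %s" % (name, sorted(info["emails"]), sorted(info["phones"]))


# Hypothetical CoGroupByKey output: (key, {tag: values}) pairs.
grouped = [
    ("amy", {"emails": ["amy@example.com"], "phones": ["333-444-5555", "111-222-3333"]}),
    ("james", {"emails": [], "phones": ["222-333-4444"]}),
]

for name, info in grouped:
    print(format_contact(name, info))
# amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']
# james; []; ['222-333-4444']
```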
 #### 4.2.4. Combine
@@ -1077,7 +1154,7 @@ PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
 # Flatten takes a tuple of PCollection objects.
-# Returns a single PCollection that contains all of the elements in the
+# Returns a single PCollection that contains all of the elements in the PCollection objects
 # in that tuple.
 github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
@@ -1997,7 +2074,7 @@ windows are not actually used until they're needed for the `GroupByKey`.
 Subsequent transforms, however, are applied to the result of the `GroupByKey` --
 data is grouped by both key and window.
-#### 7.1.2. Using windowing with bounded PCollections
+#### 7.1.2. Windowing with bounded PCollections
 You can use windowing with fixed-size data sets in **bounded** `PCollection`s.
 However, note that windowing considers only the implicit timestamps attached to


This is an automated message from the Apache Git Service.

> Code examples for CoGroupByKey
> ------------------------------
>                 Key: BEAM-1934
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: website
>            Reporter: Aviem Zur
>            Assignee: Melissa Pashniak
> Add code examples for usage of {{CoGroupByKey}}.
> Also, it would probably be wise to give introductions to the components of a {{CoGroupByKey}}
> such as {{KeyedPCollectionTuple}} and {{TupleTag}} to help users understand how to use it.

This message was sent by Atlassian JIRA
