beam-commits mailing list archives

Subject [beam-site] 01/03: [BEAM-1934] Add more CoGroupByKey content/examples
Date Mon, 06 Nov 2017 21:13:02 GMT
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository

commit db377341a97d127802feeef1c9d1d981bc2f0fc5
Author: melissa <>
AuthorDate: Wed Aug 23 15:25:47 2017 -0700

    [BEAM-1934] Add more CoGroupByKey content/examples
 src/documentation/ | 193 +++++++++++++++++++++++++++------
 1 file changed, 162 insertions(+), 31 deletions(-)

diff --git a/src/documentation/ b/src/documentation/
index 2ccbd35..c2f95ac 100644
--- a/src/documentation/
+++ b/src/documentation/
@@ -785,45 +785,176 @@ tree, [2]
 Thus, `GroupByKey` represents a transform from a multimap (multiple keys to
 individual values) to a uni-map (unique keys to collections of values).
+##### GroupByKey and unbounded PCollections
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `GroupByKey` or
+[CoGroupByKey](#cogroupbykey). This is because a bounded `GroupByKey` or
+`CoGroupByKey` must wait for all the data with a certain key to be collected,
+but with unbounded collections, the data is unlimited. Windowing and/or triggers
+allow grouping to operate on logical, finite bundles of data within the
+unbounded data streams.
+If you do apply `GroupByKey` or `CoGroupByKey` to a group of unbounded
+`PCollection`s without setting either a non-global windowing strategy, a trigger
+strategy, or both for each collection, Beam generates an IllegalStateException
+error at pipeline construction time.
+When using `GroupByKey` or `CoGroupByKey` to group `PCollection`s that have a
+[windowing strategy](#windowing) applied, all of the `PCollection`s you want to
+group *must use the same windowing strategy* and window sizing. For example, all
+of the collections you are merging must use (hypothetically) identical 5-minute
+fixed windows, or 4-minute sliding windows starting every 30 seconds.
+If your pipeline attempts to use `GroupByKey` or `CoGroupByKey` to merge
+`PCollection`s with incompatible windows, Beam generates an
+IllegalStateException error at pipeline construction time.
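The effect of a non-global windowing strategy on grouping can be modeled outside of Beam: assign each timestamped element to a fixed window, then group per (key, window), so every group is finite even though the stream is not. The following is only an illustrative plain-Python sketch of these semantics, not SDK code; the 60-second window size and the record layout are hypothetical.

```python
from collections import defaultdict

WINDOW_SIZE = 60  # hypothetical fixed-window size, in seconds


def fixed_window(timestamp):
    """Map a timestamp to the start of the fixed window containing it."""
    return timestamp - (timestamp % WINDOW_SIZE)


def group_by_key_windowed(elements):
    """Group (key, value, timestamp) records per key *within* each window,
    mimicking GroupByKey applied after non-global windowing."""
    groups = defaultdict(list)
    for key, value, ts in elements:
        groups[(key, fixed_window(ts))].append(value)
    return dict(groups)


records = [
    ("amy", "a1", 10),
    ("amy", "a2", 70),   # lands in the next 60-second window
    ("carl", "c1", 15),
]
grouped = group_by_key_windowed(records)
# Each (key, window) pair now holds a finite bundle of values.
```

Because each bundle is bounded by its window, the grouping can complete without waiting for the whole (unlimited) stream.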
 #### 4.2.3. CoGroupByKey
-`CoGroupByKey` joins two or more key/value `PCollection`s that have the same key
-type, and then emits a collection of `KV<K, CoGbkResult>` pairs. [Design Your
-Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
+`CoGroupByKey` performs a relational join of two or more key/value
+`PCollection`s that have the same key type.
+[Design Your Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
 shows an example pipeline that uses a join.
-Given the input collections below:
+Consider using `CoGroupByKey` if you have multiple data sets that provide
+information about related things. For example, let's say you have two different
+files with user data: one file has names and email addresses; the other file
+has names and phone numbers. You can join those two data sets, using the user
+name as a common key and the other data as the associated values. After the
+join, you have one data set that contains all of the information (email
+addresses and phone numbers) associated with each name.
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `CoGroupByKey`. See
+[GroupByKey and unbounded PCollections](#groupbykey-and-unbounded-pcollections)
+for more details.
+<span class="language-java">
+In the Beam SDK for Java, `CoGroupByKey` accepts a tuple of keyed
+`PCollection`s (`PCollection<KV<K, V>>`) as input. For type safety, the SDK
+requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`.
+You must declare a `TupleTag` for each input `PCollection` in the
+`KeyedPCollectionTuple` that you want to pass to `CoGroupByKey`. As output,
+`CoGroupByKey` returns a `PCollection<KV<K, CoGbkResult>>`, which groups values
+from all the input `PCollection`s by their common keys. Each key (all of type
+`K`) will have a different `CoGbkResult`, which is a map from `TupleTag<T>` to
+`Iterable<T>`. You can access a specific collection in a `CoGbkResult` object
+by using the `TupleTag` that you supplied with the initial collection.
+<span class="language-py">
+In the Beam SDK for Python, `CoGroupByKey` accepts a dictionary of keyed
+`PCollection`s as input. As output, `CoGroupByKey` creates a single output
+`PCollection` that contains one key/value tuple for each key in the input
+`PCollection`s. Each key's value is a dictionary that maps each tag to an
+iterable of the values under that key in the corresponding `PCollection`.
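The shape of that output can be modeled in plain Python. This is a sketch of the semantics only, not the SDK implementation; the tag names and sample values are hypothetical.

```python
def co_group_by_key(tagged_pcollections):
    """Model CoGroupByKey for the Python SDK: given a dict of
    {tag: list of (key, value) pairs}, produce {key: {tag: [values]}}."""
    result = {}
    for tag, pairs in tagged_pcollections.items():
        for key, value in pairs:
            # Every key gets an entry for every tag, even when no values
            # from that collection share the key.
            row = result.setdefault(key, {t: [] for t in tagged_pcollections})
            row[tag].append(value)
    return result


joined = co_group_by_key({
    "emails": [("amy", "a@x"), ("carl", "c@x")],
    "phones": [("amy", "111"), ("james", "222")],
})
```

Note that a key appearing in only one input still produces a row, with empty iterables for the other tags.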
+The following conceptual examples use two input collections to show the mechanics of
+`CoGroupByKey`.
+<span class="language-java">
+The first set of data has a `TupleTag<String>` called `emailTag` and contains names
+and email addresses. The second set of data has a `TupleTag<String>` called
+`phoneTag` and contains names and phone numbers.
+<span class="language-py">
+The first set of data contains names and email addresses. The second set of
+data contains names and phone numbers.
+// This set of data has a `TupleTag<String>` called `emailTag`.
+   "amy" -> ""
+   "carl" -> ""
+   "julia" -> ""
+   "carl" -> ""
+// This set of data has a `TupleTag<String>` called `phoneTag`.
+   "amy" -> "111-222-3333"
+   "james" -> "222-333-4444"
+   "amy" -> "333-444-5555"
+   "carl" -> "444-555-6666"
-// collection 1
-user1, address1
-user2, address2
-user3, address3
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
-// collection 2
-user1, order1
-user1, order2
-user2, order3
-guest, order4
+After `CoGroupByKey`, the resulting data contains all data associated with each
+unique key from any of the input collections.
+   "amy" -> {
+      emailTag -> [""]
+      phoneTag -> ["111-222-3333", "333-444-5555"]
+   }
+   "carl" -> {
+      emailTag -> ["", ""]
+      phoneTag -> ["444-555-6666"]
+   }
+   "james" -> {
+      emailTag -> [],
+      phoneTag -> ["222-333-4444"]
+   }
+   "julia" -> {
+      emailTag -> [""],
+      phoneTag -> []
+   }
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
-`CoGroupByKey` gathers up the values with the same key from all `PCollection`s,
-and outputs a new pair consisting of the unique key and an object `CoGbkResult`
-containing all values that were associated with that key. If you apply
-`CoGroupByKey` to the input collections above, the output collection would look
-like this:
+The following code example joins the two `PCollection`s with `CoGroupByKey`,
+followed by a `ParDo` to consume the result. Then, the code uses tags to look up
+and format data from each collection.
+  // Each set of key-value pairs is read into separate PCollections.
+  // Each shares a common key ("K").
+  PCollection<KV<K, V1>> pt1 = ...;
+  PCollection<KV<K, V2>> pt2 = ...;
+  // Create tuple tags for the value types in each collection.
+  final TupleTag<V1> t1 = new TupleTag<V1>();
+  final TupleTag<V2> t2 = new TupleTag<V2>();
+  // Merge collection values into a CoGbkResult collection
+  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+    KeyedPCollectionTuple.of(t1, pt1)
+                         .and(t2, pt2)
+                         .apply(CoGroupByKey.<K>create());
+  // Access results and do something with them.
+  PCollection<T> finalResultCollection =
+    coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, T>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+          // Get all collection 1 values
+          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
+          // Get all collection 2 values
+          Iterable<V2> pt2Vals = e.getValue().getAll(t2);
+          // ... Do something ...
+          c.output(...some T...);
+        }
+      }));
-user1, [[address1], [order1, order2]]
-user2, [[address2], [order3]]
-user3, [[address3], []]
-guest, [[], [order4]]
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
-> **A Note on Key/Value Pairs:** Beam represents key/value pairs slightly
-> differently depending on the language and SDK you're using. In the Beam SDK
-> for Java, you represent a key/value pair with an object of type `KV<K, V>`. In
-> Python, you represent key/value pairs with 2-tuples.
+The formatted data looks like this:
+  Sample coming soon.
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
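Since the formatted sample above is still marked as coming soon, here is one hypothetical way such a formatting step could look in plain Python. The function name and the output layout are illustrative only and are not taken from the snippet the `github_sample` tag points at.

```python
def format_result(name, row):
    """Render one CoGroupByKey row as a single line, e.g. for logging.
    `row` maps a tag name to the list of values gathered under that tag."""
    emails = ", ".join(row["emailTag"])
    phones = ", ".join(row["phoneTag"])
    return "%s; [%s]; [%s]" % (name, emails, phones)


line = format_result("james", {"emailTag": [], "phoneTag": ["222-333-4444"]})
# → "james; []; [222-333-4444]"
```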
 #### 4.2.4. Combine
@@ -1078,7 +1209,7 @@ PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
 # Flatten takes a tuple of PCollection objects.
-# Returns a single PCollection that contains all of the elements in the
+# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/
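The Flatten semantics described above amount to a simple concatenation, which can be modeled in plain Python. This is a sketch only; the lists stand in for the `PCollection` objects in the tuple, and element order is not guaranteed by Beam the way it is here.

```python
def flatten(pcollections):
    """Model Flatten: merge several collections into one, keeping every
    element, duplicates included."""
    merged = []
    for pc in pcollections:
        merged.extend(pc)
    return merged


merged = flatten((["a", "b"], ["c"], ["b"]))
```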
@@ -1998,7 +2129,7 @@ windows are not actually used until they're needed for the `GroupByKey`.
 Subsequent transforms, however, are applied to the result of the `GroupByKey` --
 data is grouped by both key and window.
-#### 7.1.2. Using windowing with bounded PCollections
+#### 7.1.2. Windowing with bounded PCollections
 You can use windowing with fixed-size data sets in **bounded** `PCollection`s.
 However, note that windowing considers only the implicit timestamps attached to
