beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [48/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
Date Tue, 14 Jun 2016 23:13:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
new file mode 100644
index 0000000..98cf2b5
--- /dev/null
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -0,0 +1,154 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Type coders registration.
+
+This module contains functionality to define and use coders for custom classes.
+Let's say we have a class Xyz and we are processing a PCollection with elements
+of type Xyz. If we do not register a coder for Xyz, a default pickle-based
+fallback coder will be used. This can be undesirable for two reasons. First, we
+may want a faster coder or a more space efficient one. Second, the pickle-based
+coder is not deterministic: objects like dictionaries or sets are not
+guaranteed to be encoded the same way every time, because their elements have
+no guaranteed order.
+
+Two (sometimes three) steps are needed to define and use a custom coder:
+  - define the coder class
+  - associate the coder with the class (a.k.a. coder registration)
+  - typehint DoFns or transforms with the new class or composite types using
+    the class.
+
+A coder class is defined by subclassing from coders.Coder and defining the
+encode and decode methods. The framework uses duck-typing for coders, so it is
+not strictly required to subclass from Coder as long as every method the
+framework calls is defined, including from_type_hint, which the registry calls
+on the coder class to create a coder for a type hint.
+
+Registering a coder class is made with a register_coder() call::
+
+  from google.cloud.dataflow import coders
+  ...
+  coders.registry.register_coder(Xyz, XyzCoder)
+
+Additionally, DoFns and PTransforms may need type hints. This is not always
+necessary since there is functionality to infer the return types of DoFns by
+analyzing the code. For instance, for the function below the return type of
+'Xyz' will be inferred::
+
+  def MakeXyzs(v):
+    return Xyz(v)
+
+If Xyz is inferred then its coder will be used whenever the framework needs to
+serialize data (e.g., writing to the shuffler subsystem responsible for group by
+key operations). If a typehint is needed it can be specified by decorating the
+DoFns or using with_input_types/with_output_types methods on PTransforms. For
+example, the above function can be decorated::
+
+  @with_output_types(Xyz)
+  def MakeXyzs(v):
+    return complex_operation_returning_Xyz(v)
+
+See google.cloud.dataflow.typehints.decorators module for more details.
+"""
+
+import logging
+
+from google.cloud.dataflow.coders import coders
+from google.cloud.dataflow.typehints import typehints
+
+
+class CoderRegistry(object):
+  """A coder registry for typehint/coder associations."""
+
+  def __init__(self, fallback_coder=None):
+    self._coders = {}
+    self.custom_types = []
+    self.register_standard_coders(fallback_coder)
+
+  def register_standard_coders(self, fallback_coder):
+    """Register coders for all basic and composite types."""
+    self._register_coder_internal(int, coders.VarIntCoder)
+    self._register_coder_internal(float, coders.FloatCoder)
+    self._register_coder_internal(str, coders.BytesCoder)
+    self._register_coder_internal(bytes, coders.BytesCoder)
+    self._register_coder_internal(unicode, coders.StrUtf8Coder)
+    self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
+    self._register_coder_internal(typehints.AnyTypeConstraint,
+                                  coders.PickleCoder)
+    self._fallback_coder = fallback_coder or coders.PickleCoder
+
+  def _register_coder_internal(self, typehint_type, typehint_coder_class):
+    self._coders[typehint_type] = typehint_coder_class
+
+  def register_coder(self, typehint_type, typehint_coder_class):
+    if not isinstance(typehint_coder_class, type):
+      raise TypeError('Coder registration requires a coder class object. '
+                      'Received %r instead.' % typehint_coder_class)
+    if typehint_type not in self.custom_types:
+      self.custom_types.append(typehint_type)
+    self._register_coder_internal(typehint_type, typehint_coder_class)
+
+  def get_coder(self, typehint):
+    coder = self._coders.get(
+        typehint.__class__ if isinstance(typehint, typehints.TypeConstraint)
+        else typehint, None)
+    if isinstance(typehint, typehints.TypeConstraint) and coder is not None:
+      return coder.from_type_hint(typehint, self)
+    if coder is None:
+      # We use the fallback coder when there is no coder registered for a
+      # typehint. For example a user defined class with no coder specified.
+      if not hasattr(self, '_fallback_coder'):
+        raise RuntimeError(
+            'Coder registry has no fallback coder. This can happen if the '
+            'fast_coders module could not be imported.')
+      if isinstance(typehint, typehints.IterableTypeConstraint):
+        # In this case, we suppress the warning message for using the fallback
+        # coder, since Iterable is hinted as the output of a GroupByKey
+        # operation and that direct output will not be coded.
+        # TODO(ccy): refine this behavior.
+        pass
+      elif typehint is None:
+        # In some old code, None is used for Any.
+        # TODO(robertwb): Clean this up.
+        pass
+      elif isinstance(typehint, typehints.TypeVariable):
+        # TODO(robertwb): Clean this up when type inference is fully enabled.
+        pass
+      else:
+        logging.warning('Using fallback coder for typehint: %r.', typehint)
+      coder = self._fallback_coder
+    return coder.from_type_hint(typehint, self)
+
+  def get_custom_type_coder_tuples(self, types):
+    """Returns type/coder tuples for all custom types passed in."""
+    return [(t, self._coders[t]) for t in types if t in self.custom_types]
+
+  def verify_deterministic(self, key_coder, op_name, silent=True):
+    if not key_coder.is_deterministic():
+      error_msg = ('The key coder "%s" for %s '
+                   'is not deterministic. This may result in incorrect '
+                   'pipeline output. This can be fixed by adding a type '
+                   'hint to the operation preceding the GroupByKey step, '
+                   'and for custom key classes, by writing a '
+                   'deterministic custom Coder. Please see the '
+                   'documentation for more details.' % (key_coder, op_name))
+      if isinstance(key_coder, (coders.PickleCoder, self._fallback_coder)):
+        if not silent:
+          logging.warning(error_msg)
+        return coders.DeterministicPickleCoder(key_coder, op_name)
+      else:
+        raise ValueError(error_msg)
+    else:
+      return key_coder
+
+registry = CoderRegistry()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/typecoders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py
new file mode 100644
index 0000000..ed46ede
--- /dev/null
+++ b/sdks/python/apache_beam/coders/typecoders_test.py
@@ -0,0 +1,114 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the typecoders module."""
+
+import unittest
+
+from google.cloud.dataflow.coders import coders
+from google.cloud.dataflow.coders import typecoders
+from google.cloud.dataflow.internal import pickler
+from google.cloud.dataflow.typehints import typehints
+
+
+class CustomClass(object):
+
+  def __init__(self, n):
+    self.number = n
+
+  def __eq__(self, other):
+    return self.number == other.number
+
+
+class CustomCoder(coders.Coder):
+
+  def encode(self, value):
+    return str(value.number)
+
+  def decode(self, encoded):
+    return CustomClass(int(encoded))
+
+  def is_deterministic(self):
+    # This coder is deterministic. Though we don't use need this coder to be
+    # deterministic for this test, we annotate this as such to follow best
+    # practices.
+    return True
+
+
+class TypeCodersTest(unittest.TestCase):
+
+  def test_register_non_type_coder(self):
+    coder = CustomCoder()
+    with self.assertRaises(TypeError) as e:
+      # When registering a coder the coder class must be specified.
+      typecoders.registry.register_coder(CustomClass, coder)
+    self.assertEqual(e.exception.message,
+                     'Coder registration requires a coder class object. '
+                     'Received %r instead.' % coder)
+
+  def test_get_coder_with_custom_coder(self):
+    typecoders.registry.register_coder(CustomClass, CustomCoder)
+    self.assertEqual(CustomCoder,
+                     typecoders.registry.get_coder(CustomClass).__class__)
+
+  def test_get_coder_with_composite_custom_coder(self):
+    typecoders.registry.register_coder(CustomClass, CustomCoder)
+    coder = typecoders.registry.get_coder(typehints.KV[CustomClass, str])
+    revived_coder = pickler.loads(pickler.dumps(coder))
+    self.assertEqual(
+        (CustomClass(123), 'abc'),
+        revived_coder.decode(revived_coder.encode((CustomClass(123), 'abc'))))
+
+  def test_get_coder_with_standard_coder(self):
+    self.assertEqual(coders.BytesCoder,
+                     typecoders.registry.get_coder(str).__class__)
+
+  def test_fallbackcoder(self):
+    coder = typecoders.registry.get_coder(typehints.Any)
+    self.assertEqual(('abc', 123), coder.decode(coder.encode(('abc', 123))))
+
+  def test_get_coder_can_be_pickled(self):
+    coder = typecoders.registry.get_coder(typehints.Tuple[str, int])
+    revived_coder = pickler.loads(pickler.dumps(coder))
+    self.assertEqual(('abc', 123),
+                     revived_coder.decode(revived_coder.encode(('abc', 123))))
+
+  def test_standard_int_coder(self):
+    real_coder = typecoders.registry.get_coder(int)
+    expected_coder = coders.VarIntCoder()
+    self.assertEqual(
+        real_coder.encode(0x0404), expected_coder.encode(0x0404))
+    self.assertEqual(0x0404, real_coder.decode(real_coder.encode(0x0404)))
+    self.assertEqual(
+        real_coder.encode(0x040404040404),
+        expected_coder.encode(0x040404040404))
+    self.assertEqual(0x040404040404,
+                     real_coder.decode(real_coder.encode(0x040404040404)))
+
+  def test_standard_str_coder(self):
+    real_coder = typecoders.registry.get_coder(str)
+    expected_coder = coders.BytesCoder()
+    self.assertEqual(
+        real_coder.encode('abc'), expected_coder.encode('abc'))
+    self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+    real_coder = typecoders.registry.get_coder(bytes)
+    expected_coder = coders.BytesCoder()
+    self.assertEqual(
+        real_coder.encode('abc'), expected_coder.encode('abc'))
+    self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
new file mode 100644
index 0000000..c40b88f
--- /dev/null
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -0,0 +1,405 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Integration tests for the dataflow package."""
+
+from __future__ import absolute_import
+
+import logging
+import re
+import unittest
+
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.pvalue import AsDict
+from google.cloud.dataflow.pvalue import AsIter as AllOf
+from google.cloud.dataflow.pvalue import AsList
+from google.cloud.dataflow.pvalue import AsSingleton
+from google.cloud.dataflow.pvalue import EmptySideInput
+from google.cloud.dataflow.pvalue import SideOutputValue
+from google.cloud.dataflow.transforms import Create
+from google.cloud.dataflow.transforms import DoFn
+from google.cloud.dataflow.transforms import FlatMap
+from google.cloud.dataflow.transforms import GroupByKey
+from google.cloud.dataflow.transforms import Map
+from google.cloud.dataflow.transforms import ParDo
+from google.cloud.dataflow.transforms import WindowInto
+from google.cloud.dataflow.transforms.util import assert_that
+from google.cloud.dataflow.transforms.util import equal_to
+from google.cloud.dataflow.transforms.window import IntervalWindow
+from google.cloud.dataflow.transforms.window import WindowFn
+
+
+class DataflowTest(unittest.TestCase):
+  """Dataflow integration tests."""
+
+  SAMPLE_DATA = 'aa bb cc aa bb aa \n' * 10
+  SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
+
+  # TODO(silviuc): Figure out a nice way to specify labels for stages so that
+  # internal steps get prepended with surorunding stage names.
+  @staticmethod
+  def Count(pcoll):  # pylint: disable=invalid-name
+    """A Count transform: v, ... => (v, n), ..."""
+    return (pcoll
+            | Map('AddCount', lambda x: (x, 1))
+            | GroupByKey('GroupCounts')
+            | Map('AddCounts', lambda (x, ones): (x, sum(ones))))
+
+  def test_word_count(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
+    result = (
+        (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+        .apply('CountWords', DataflowTest.Count))
+    assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
+    pipeline.run()
+
+  def test_map(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    lines = pipeline | Create('input', ['a', 'b', 'c'])
+    result = (lines
+              | Map('upper', str.upper)
+              | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
+    assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
+    pipeline.run()
+
+  def test_word_count_using_get(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
+    result = (
+        (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+        .apply('CountWords', DataflowTest.Count))
+    assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
+    pipeline.run()
+
+  def test_par_do_with_side_input_as_arg(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    words_list = ['aa', 'bb', 'cc']
+    words = pipeline | Create('SomeWords', words_list)
+    prefix = pipeline | Create('SomeString', ['xyz'])  # side in
+    suffix = 'zyx'
+    result = words | FlatMap(
+        'DecorateWords',
+        lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
+        AsSingleton(prefix), suffix)
+    assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
+    pipeline.run()
+
+  def test_par_do_with_side_input_as_keyword_arg(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    words_list = ['aa', 'bb', 'cc']
+    words = pipeline | Create('SomeWords', words_list)
+    prefix = 'zyx'
+    suffix = pipeline | Create('SomeString', ['xyz'])  # side in
+    result = words | FlatMap(
+        'DecorateWords',
+        lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
+        prefix, sfx=AsSingleton(suffix))
+    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+    pipeline.run()
+
+  def test_par_do_with_do_fn_object(self):
+    class SomeDoFn(DoFn):
+      """A custom DoFn for a FlatMap transform."""
+
+      def process(self, context, prefix, suffix):
+        return ['%s-%s-%s' % (prefix, context.element, suffix)]
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    words_list = ['aa', 'bb', 'cc']
+    words = pipeline | Create('SomeWords', words_list)
+    prefix = 'zyx'
+    suffix = pipeline | Create('SomeString', ['xyz'])  # side in
+    result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
+                           suffix=AsSingleton(suffix))
+    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+    pipeline.run()
+
+  def test_par_do_with_multiple_outputs_and_using_yield(self):
+    class SomeDoFn(DoFn):
+      """A custom DoFn using yield."""
+
+      def process(self, context):
+        yield context.element
+        if context.element % 2 == 0:
+          yield SideOutputValue('even', context.element)
+        else:
+          yield SideOutputValue('odd', context.element)
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    results = nums | ParDo(
+        'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()
+
+  def test_par_do_with_multiple_outputs_and_using_return(self):
+    def some_fn(v):
+      if v % 2 == 0:
+        return [v, SideOutputValue('even', v)]
+      else:
+        return [v, SideOutputValue('odd', v)]
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    results = nums | FlatMap(
+        'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()
+
+  def test_empty_singleton_side_input(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcol = pipeline | Create('start', [1, 2])
+    side = pipeline | Create('side', [])  # Empty side input.
+
+    def my_fn(k, s):
+      v = ('empty' if isinstance(s, EmptySideInput) else 'full')
+      return [(k, v)]
+    result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
+    assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
+    pipeline.run()
+
+  def test_multi_valued_singleton_side_input(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcol = pipeline | Create('start', [1, 2])
+    side = pipeline | Create('side', [3, 4])  # 2 values in side input.
+    pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))
+    with self.assertRaises(ValueError) as e:
+      pipeline.run()
+
+  def test_default_value_singleton_side_input(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcol = pipeline | Create('start', [1, 2])
+    side = pipeline | Create('side', [])  # 0 values in side input.
+    result = (
+        pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
+    assert_that(result, equal_to([10, 20]))
+    pipeline.run()
+
+  def test_iterable_side_input(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcol = pipeline | Create('start', [1, 2])
+    side = pipeline | Create('side', [3, 4])  # 2 values in side input.
+    result = pcol | FlatMap('compute',
+                            lambda x, s: [x * y for y in s], AllOf(side))
+    assert_that(result, equal_to([3, 4, 6, 8]))
+    pipeline.run()
+
+  def test_undeclared_side_outputs(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    results = nums | FlatMap(
+        'ClassifyNumbers',
+        lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+    ).with_outputs()
+    # TODO(silviuc): Revisit this test to check for undeclared side outputs.
+    # This should work with .with_outputs() without any tags declared and
+    # the results[None] should work also.
+    assert_that(results[None], equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()
+
+  def test_empty_side_outputs(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    nums = pipeline | Create('Some Numbers', [1, 3, 5])
+    results = nums | FlatMap(
+        'ClassifyNumbers',
+        lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+    ).with_outputs()
+    assert_that(results[None], equal_to([1, 3, 5]))
+    assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
+    assert_that(results.even, equal_to([]), label='assert:even')
+    pipeline.run()
+
+  def test_as_list_and_as_dict_side_inputs(self):
+    a_list = [5, 1, 3, 2, 9]
+    some_pairs = [('crouton', 17), ('supreme', None)]
+    pipeline = Pipeline('DirectPipelineRunner')
+    main_input = pipeline | Create('main input', [1])
+    side_list = pipeline | Create('side list', a_list)
+    side_pairs = pipeline | Create('side pairs', some_pairs)
+    results = main_input | FlatMap(
+        'concatenate',
+        lambda x, the_list, the_dict: [[x, the_list, the_dict]],
+        AsList(side_list), AsDict(side_pairs))
+
+    def  matcher(expected_elem, expected_list, expected_pairs):
+      def match(actual):
+        [[actual_elem, actual_list, actual_dict]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list)
+        equal_to(expected_pairs)(actual_dict.iteritems())
+      return match
+
+    assert_that(results, matcher(1, a_list, some_pairs))
+    pipeline.run()
+
+  def test_as_singleton_without_unique_labels(self):
+    # This should succeed as calling AsSingleton on the same PCollection twice
+    # with the same defaults will return the same PCollectionView.
+    a_list = [2]
+    pipeline = Pipeline('DirectPipelineRunner')
+    main_input = pipeline | Create('main input', [1])
+    side_list = pipeline | Create('side list', a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, s1, s2: [[x, s1, s2]],
+        AsSingleton(side_list), AsSingleton(side_list))
+
+    def  matcher(expected_elem, expected_singleton):
+      def match(actual):
+        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to([expected_singleton])([actual_singleton1])
+        equal_to([expected_singleton])([actual_singleton2])
+      return match
+
+    assert_that(results, matcher(1, 2))
+    pipeline.run()
+
+  def test_as_singleton_with_different_defaults_without_unique_labels(self):
+    # This should fail as AsSingleton with distinct default values should create
+    # distinct PCollectionViews with the same full_label.
+    a_list = [2]
+    pipeline = Pipeline('DirectPipelineRunner')
+    main_input = pipeline | Create('main input', [1])
+    side_list = pipeline | Create('side list', a_list)
+
+    with self.assertRaises(RuntimeError) as e:
+      _ = main_input | FlatMap(
+          'test',
+          lambda x, s1, s2: [[x, s1, s2]],
+          AsSingleton(side_list), AsSingleton(side_list, default_value=3))
+    self.assertTrue(
+        e.exception.message.startswith(
+            'Transform "ViewAsSingleton(side list.None)" does not have a '
+            'stable unique label.'))
+
+  def test_as_singleton_with_different_defaults_with_unique_labels(self):
+    a_list = []
+    pipeline = Pipeline('DirectPipelineRunner')
+    main_input = pipeline | Create('main input', [1])
+    side_list = pipeline | Create('side list', a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, s1, s2: [[x, s1, s2]],
+        AsSingleton('si1', side_list, default_value=2),
+        AsSingleton('si2', side_list, default_value=3))
+
+    def  matcher(expected_elem, expected_singleton1, expected_singleton2):
+      def match(actual):
+        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to([expected_singleton1])([actual_singleton1])
+        equal_to([expected_singleton2])([actual_singleton2])
+      return match
+
+    assert_that(results, matcher(1, 2, 3))
+    pipeline.run()
+
+  def test_as_list_without_unique_labels(self):
+    # This should succeed as calling AsList on the same PCollection twice will
+    # return the same PCollectionView.
+    a_list = [1, 2, 3]
+    pipeline = Pipeline('DirectPipelineRunner')
+    main_input = pipeline | Create('main input', [1])
+    side_list = pipeline | Create('side list', a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, ls1, ls2: [[x, ls1, ls2]],
+        AsList(side_list), AsList(side_list))
+
+    def  matcher(expected_elem, expected_list):
+      def match(actual):
+        [[actual_elem, actual_list1, actual_list2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list1)
+        equal_to(expected_list)(actual_list2)
+      return match
+
+    assert_that(results, matcher(1, [1, 2, 3]))
+    pipeline.run()
+
+  def test_as_list_with_unique_labels(self):
+    a_list = [1, 2, 3]
+    pipeline = Pipeline('DirectPipelineRunner')
+    main_input = pipeline | Create('main input', [1])
+    side_list = pipeline | Create('side list', a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, ls1, ls2: [[x, ls1, ls2]],
+        AsList(side_list), AsList(side_list, label='label'))
+
+    def  matcher(expected_elem, expected_list):
+      def match(actual):
+        [[actual_elem, actual_list1, actual_list2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list1)
+        equal_to(expected_list)(actual_list2)
+      return match
+
+    assert_that(results, matcher(1, [1, 2, 3]))
+    pipeline.run()
+
+  def test_as_dict_with_unique_labels(self):
+    some_kvs = [('a', 1), ('b', 2)]
+    pipeline = Pipeline('DirectPipelineRunner')
+    main_input = pipeline | Create('main input', [1])
+    side_kvs = pipeline | Create('side kvs', some_kvs)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, dct1, dct2: [[x, dct1, dct2]],
+        AsDict(side_kvs), AsDict(side_kvs, label='label'))
+
+    def  matcher(expected_elem, expected_kvs):
+      def match(actual):
+        [[actual_elem, actual_dict1, actual_dict2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_kvs)(actual_dict1.iteritems())
+        equal_to(expected_kvs)(actual_dict2.iteritems())
+      return match
+
+    assert_that(results, matcher(1, some_kvs))
+    pipeline.run()
+
+  def test_window_transform(self):
+    class TestWindowFn(WindowFn):
+      """Windowing function adding two disjoint windows to each element."""
+
+      def assign(self, assign_context):
+        _ = assign_context
+        return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
+
+      def merge(self, existing_windows):
+        return existing_windows
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
+    result = (numbers
+              | WindowInto('W', windowfn=TestWindowFn())
+              | GroupByKey('G'))
+    assert_that(
+        result, equal_to([(1, [10]), (1, [10]), (2, [20]),
+                          (2, [20]), (3, [30]), (3, [30])]))
+    pipeline.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/error.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/error.py b/sdks/python/apache_beam/error.py
new file mode 100644
index 0000000..779c4d9
--- /dev/null
+++ b/sdks/python/apache_beam/error.py
@@ -0,0 +1,39 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Python Dataflow error classes."""
+
+
+class DataflowError(Exception):
+  """Base class for all Dataflow errors."""
+
+
+class PipelineError(DataflowError):
+  """An error in the pipeline object (e.g. a PValue not linked to it)."""
+
+
+class PValueError(DataflowError):
+  """An error related to a PValue object (e.g. value is not computed)."""
+
+
+class RunnerError(DataflowError):
+  """An error related to a Runner object (e.g. cannot find a runner to run)."""
+
+
+class SideInputError(DataflowError):
+  """An error related to a side input to a parallel Do operation."""
+
+
+class TransformError(DataflowError):
+  """An error related to a PTransform object."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/__init__.py b/sdks/python/apache_beam/examples/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
new file mode 100644
index 0000000..400863d
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -0,0 +1,79 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow emitting the top k most common words for each prefix."""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      required=True,
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  (p  # pylint: disable=expression-not-assigned
+   | df.io.Read('read', df.io.TextFileSource(known_args.input))
+   | df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+   | TopPerPrefix('TopPerPrefix', 5)
+   | df.Map('format',
+            lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+   | df.io.Write('write', df.io.TextFileSink(known_args.output)))
+  p.run()
+
+
+class TopPerPrefix(df.PTransform):
+
+  def __init__(self, label, count):
+    super(TopPerPrefix, self).__init__(label)
+    self._count = count
+
+  def apply(self, words):
+    """Compute the most common words for each possible prefixes.
+
+    Args:
+      words: a PCollection of strings
+
+    Returns:
+      A PCollection of most common words with each prefix, in the form
+          (prefix, [(count, word), (count, word), ...])
+    """
+    return (words
+            | df.combiners.Count.PerElement()
+            | df.FlatMap(extract_prefixes)
+            | df.combiners.Top.LargestPerKey(self._count))
+
+
+def extract_prefixes((word, count)):
+  for k in range(1, len(word) + 1):
+    prefix = word[:k]
+    yield prefix, (count, word)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
new file mode 100644
index 0000000..3c10483
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -0,0 +1,78 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the autocomplete example."""
+
+import collections
+import logging
+import re
+import tempfile
+import unittest
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.complete import autocomplete
+from google.cloud.dataflow.pvalue import AsIter
+from google.cloud.dataflow.utils import options
+
+# TODO(robertwb): Move to testing utilities.
+
+
+def assert_that(pcoll, matcher):
+  """Asserts that the give PCollection satisfies the constraints of the matcher
+  in a way that is runnable locally or on a remote service.
+  """
+  singleton = pcoll.pipeline | df.Create('create_singleton', [None])
+
+  def check_matcher(_, side_value):
+    assert matcher(side_value)
+    return []
+  singleton | df.FlatMap(check_matcher, AsIter(pcoll))
+
+
+def contains_in_any_order(expected):
+  def matcher(value):
+    vs = collections.Counter(value)
+    es = collections.Counter(expected)
+    if vs != es:
+      raise ValueError(
+          'extra: %s, missing: %s' % (vs - es, es - vs))
+    return True
+  return matcher
+
+
+class WordCountTest(unittest.TestCase):
+
+  WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
+
+  def test_top_prefixes(self):
+    p = df.Pipeline('DirectPipelineRunner')
+    words = p | df.Create('create', self.WORDS)
+    result = words | autocomplete.TopPerPrefix('test', 5)
+    # values must be hashable for now
+    result = result | df.Map(lambda (k, vs): (k, tuple(vs)))
+    assert_that(result, contains_in_any_order(
+        [
+            ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
+            ('to', ((3, 'to'), )),
+            ('th', ((2, 'this'), (1, 'that'))),
+            ('thi', ((2, 'this'), )),
+            ('this', ((2, 'this'), )),
+            ('tha', ((1, 'that'), )),
+            ('that', ((1, 'that'), )),
+        ]))
+    p.run()
+
+if __name__ == '__main__':
+  # Allow running this test module directly as a script.
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
new file mode 100644
index 0000000..0e52bad
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow that uses a simple Monte Carlo method to estimate π.
+
+The algorithm computes the fraction of points drawn uniformly within the unit
+square that also fall in the quadrant of the unit circle that overlaps the
+square. A simple area calculation shows that this fraction should be π/4, so
+we multiply our counts ratio by four to estimate π.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import json
+import logging
+import random
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.typehints import Any
+from google.cloud.dataflow.typehints import Iterable
+from google.cloud.dataflow.typehints import Tuple
+
+
+@df.typehints.with_output_types(Tuple[int, int, int])
+@df.typehints.with_input_types(int)
+def run_trials(runs):
+  """Run trials and return a 3-tuple representing the results.
+
+  Args:
+    runs: Number of trial runs to be executed.
+
+  Returns:
+    A 3-tuple (total trials, inside trials, 0).
+
+  The final zero is needed solely to make sure that the combine_results function
+  has same type for inputs and outputs (a requirement for combiner functions).
+  """
+  inside_runs = 0
+  for _ in xrange(runs):
+    x = random.uniform(0, 1)
+    y = random.uniform(0, 1)
+    inside_runs += 1 if x * x + y * y <= 1.0 else 0
+  return runs, inside_runs, 0
+
+
+@df.typehints.with_output_types(Tuple[int, int, float])
+@df.typehints.with_input_types(Iterable[Tuple[int, int, Any]])
+def combine_results(results):
+  """Combiner function to sum up trials and compute the estimate.
+
+  Args:
+    results: An iterable of 3-tuples (total trials, inside trials, ignored).
+
+  Returns:
+    A 3-tuple containing the sum of total trials, sum of inside trials, and
+    the probability computed from the two numbers.
+  """
+  # TODO(silviuc): Do we guarantee that argument can be iterated repeatedly?
+  # Should document one way or the other.
+  total, inside = sum(r[0] for r in results), sum(r[1] for r in results)
+  return total, inside, 4 * float(inside) / total
+
+
+class JsonCoder(object):
+  """A JSON coder used to format the final result."""
+
+  def encode(self, x):
+    return json.dumps(x)
+
+
+def run(argv=None):
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+  # A thousand work items of a million tries each.
+  (p  # pylint: disable=expression-not-assigned
+   | df.Create('Initialize', [100000] * 100).with_output_types(int)
+   | df.Map('Run trials', run_trials)
+   | df.CombineGlobally('Sum', combine_results).without_defaults()
+   | df.io.Write('Write',
+                 df.io.TextFileSink(known_args.output,
+                                    coder=JsonCoder())))
+
+  # Actually run the pipeline (all operations above are deferred).
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
new file mode 100644
index 0000000..0c5be30
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -0,0 +1,46 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the estimate_pi example."""
+
+import json
+import logging
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.complete import estimate_pi
+
+
+class EstimatePiTest(unittest.TestCase):
+
+  def create_temp_file(self, contents):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def test_basics(self):
+    temp_path = self.create_temp_file('result')
+    estimate_pi.run([
+        '--output=%s' % temp_path])
+    # Parse result file and compare.
+    with open(temp_path + '-00000-of-00001') as result_file:
+      estimated_pi = json.loads(result_file.readline())[2]
+      # Note: Probabilistically speaking this test can fail with a probability
+      # that is very small (VERY) given that we run at least 10 million trials.
+      self.assertTrue(estimated_pi > 3.13 and estimated_pi < 3.15)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/__init__.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
new file mode 100644
index 0000000..3546f03
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -0,0 +1,119 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A Julia set computing workflow: https://en.wikipedia.org/wiki/Julia_set.
+
+We use the quadratic polynomial f(z) = z*z + c, with c = -.62772 +.42193i
+"""
+
+from __future__ import absolute_import
+
+import argparse
+
+import google.cloud.dataflow as df
+
+
+def from_pixel(x, y, n):
+  """Converts a NxN pixel position to a (-1..1, -1..1) complex number."""
+  return complex(2.0 * x / n - 1.0, 2.0 * y / n - 1.0)
+
+
+def get_julia_set_point_color((x, y), c, n, max_iterations):
+  """Given a pixel, computes its escape iteration ("color") in the julia set.
+
+  Iterates z = z * z + c starting from the pixel's complex coordinate and stops
+  as soon as |z|^2 exceeds 2.0.
+
+  Args:
+    (x, y): pixel coordinates in an n-by-n grid.
+    c: complex constant of the quadratic map.
+    n: grid size, used to convert the pixel into a complex number.
+    max_iterations: iteration cap. It must be at least 1; otherwise `i` is
+      never bound and the return statement raises UnboundLocalError.
+
+  Returns:
+    (x, y, i), where i is the loop index at which the point escaped, or
+    max_iterations - 1 if it never escaped. A point escaping on the last
+    iteration and a point that never escapes therefore get the same value.
+  """
+  z = from_pixel(x, y, n)
+  for i in xrange(max_iterations):
+    # NOTE(review): the conventional bailout test is |z| > 2 (|z|^2 > 4); this
+    # compares |z|^2 against 2.0 -- confirm the threshold is intended.
+    if z.real * z.real + z.imag * z.imag > 2.0:
+      break
+    z = z * z + c
+  return x, y, i  # pylint: disable=undefined-loop-variable
+
+
+def generate_julia_set_colors(pipeline, c, n, max_iterations):
+  """Compute julia set coordinates for each point in our set."""
+  def point_set(n):
+    for x in range(n):
+      for y in range(n):
+        yield (x, y)
+
+  julia_set_colors = (pipeline
+                      | df.Create('add points', point_set(n))
+                      | df.Map(get_julia_set_point_color, c, n, max_iterations))
+
+  return julia_set_colors
+
+
+def generate_julia_set_visualization(data, n, max_iterations):
+  """Generate the pixel matrix for rendering the julia set as an image."""
+  import numpy as np  # pylint: disable=g-import-not-at-top
+  colors = []
+  for r in range(0, 256, 16):
+    for g in range(0, 256, 16):
+      for b in range(0, 256, 16):
+        colors.append((r, g, b))
+
+  xy = np.zeros((n, n, 3), dtype=np.uint8)
+  for x, y, iteration in data:
+    xy[x, y] = colors[iteration * len(colors) / max_iterations]
+
+  return xy
+
+
+def save_julia_set_visualization(out_file, image_array):
+  """Save the fractal image of our julia set as a png."""
+  from matplotlib import pyplot as plt  # pylint: disable=g-import-not-at-top
+  plt.imsave(out_file, image_array, format='png')
+
+
+def run(argv=None):
+  """Builds and runs the Julia set pipeline.
+
+  Computes an iteration count for every pixel of a grid_size x grid_size grid
+  (c = -.62772 + .42193i, at most 100 iterations per pixel). It writes the
+  (x, y, iteration) triples to --coordinate_output, one line per x value.
+
+  Args:
+    argv: Command-line arguments. --grid_size, --coordinate_output and
+      --image_output are consumed here; all remaining arguments are passed
+      through to the pipeline as its options.
+  """
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--grid_size',
+                      dest='grid_size',
+                      default=1000,
+                      help='Size of the NxN matrix')
+  parser.add_argument(
+      '--coordinate_output',
+      dest='coordinate_output',
+      required=True,
+      help='Output file to write the color coordinates of the image to.')
+  # NOTE(review): --image_output is parsed but not used yet (see the TODO at
+  # the end of this function).
+  parser.add_argument('--image_output',
+                      dest='image_output',
+                      default=None,
+                      help='Output file to write the resulting image to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+  # --grid_size declares no type, so a command-line value arrives as a string.
+  n = int(known_args.grid_size)
+
+  coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
+
+  # Group each coordinate triplet by its x value, then write the coordinates to
+  # the output file with an x-coordinate grouping per line.
+  # pylint: disable=expression-not-assigned
+  # pylint: disable=g-long-lambda
+  (coordinates | df.Map('x coord key', lambda (x, y, i): (x, (x, y, i)))
+   | df.GroupByKey('x coord') | df.Map(
+       'format',
+       lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
+   | df.io.Write('write', df.io.TextFileSink(known_args.coordinate_output)))
+  # pylint: enable=g-long-lambda
+  # pylint: enable=expression-not-assigned
+  p.run()
+
+  # Optionally render the image and save it to a file.
+  # TODO(silviuc): Add this functionality.
+  # NOTE(review): the sketch below reads p.options.image_output and an
+  # undefined file_with_coordinates. Enabling it presumably needs
+  # known_args.image_output and the coordinates read back from the output.
+  # if p.options.image_output is not None:
+  #  julia_set_image = generate_julia_set_visualization(
+  #      file_with_coordinates, n, 100)
+  #  save_julia_set_visualization(p.options.image_output, julia_set_image)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
new file mode 100644
index 0000000..33c434a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
@@ -0,0 +1,83 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the juliaset example."""
+
+import logging
+import os
+import re
+import tempfile
+import unittest
+
+
+from google.cloud.dataflow.examples.complete.juliaset.juliaset import juliaset
+
+
+class JuliaSetTest(unittest.TestCase):
+
+  def setUp(self):
+    self.test_files = {}
+    self.test_files['output_coord_file_name'] = self.generate_temp_file()
+    self.test_files['output_image_file_name'] = self.generate_temp_file()
+
+  def tearDown(self):
+    for test_file in self.test_files.values():
+      if os.path.exists(test_file):
+        os.remove(test_file)
+
+  def generate_temp_file(self):
+    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+      return temp_file.name
+
+  def run_example(self, grid_size, image_file_name=None):
+    args = [
+        '--coordinate_output=%s' % self.test_files['output_coord_file_name'],
+        '--grid_size=%s' % grid_size,
+    ]
+
+    if image_file_name is not None:
+      args.append('--image_output=%s' % image_file_name)
+
+    juliaset.run(args)
+
+  def test_output_file_format(self):
+    grid_size = 5
+    self.run_example(grid_size)
+
+    # Parse the results from the file, and ensure it was written in the proper
+    # format.
+    with open(self.test_files['output_coord_file_name'] +
+              '-00000-of-00001') as result_file:
+      output_lines = result_file.readlines()
+
+      # Should have a line for each x-coordinate.
+      self.assertEqual(grid_size, len(output_lines))
+      for line in output_lines:
+        coordinates = re.findall(r'(\(\d+, \d+, \d+\))', line)
+
+        # Should have 5 coordinates on each line.
+        self.assertTrue(coordinates)
+        self.assertEqual(grid_size, len(coordinates))
+
+  def test_generate_fractal_image(self):
+    temp_image_file = self.test_files['output_image_file_name']
+    self.run_example(10, image_file_name=temp_image_file)
+
+    # Ensure that the image was saved properly.
+    # TODO(silviuc): Reactivate the test when --image_output is supported.
+    # self.assertTrue(os.stat(temp_image_file).st_size > 0)
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
new file mode 100644
index 0000000..39a58d6
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
@@ -0,0 +1,55 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A Julia set computing workflow: https://en.wikipedia.org/wiki/Julia_set.
+
+This example keeps all the code needed to execute the workflow in the
+juliaset/ folder. It is organized this way so that it can be packaged as a
+Python package and later installed in the VM workers executing the job. The
+root directory for the example contains just a "driver" script to launch the
+job and the setup.py file needed to create a package.
+
+The advantage of organizing the code this way is that large projects will
+naturally evolve beyond just one module, and you will have to make sure the
+additional modules are present in the worker.
+
+In Python Dataflow, using the --setup_file option when submitting a job will
+trigger creating a source distribution (as if running python setup.py sdist)
+and then staging the resulting tarball in the staging area. The workers, upon
+startup, will install the tarball.
+
+Below is a complete command line for running the juliaset workflow remotely as
+an example:
+
+python juliaset_main.py \
+  --job_name juliaset-$USER \
+  --project YOUR-PROJECT \
+  --runner BlockingDataflowPipelineRunner \
+  --setup_file ./setup.py \
+  --staging_location gs://YOUR-BUCKET/juliaset/staging \
+  --temp_location gs://YOUR-BUCKET/juliaset/temp \
+  --coordinate_output gs://YOUR-BUCKET/juliaset/out \
+  --grid_size 20
+
+"""
+
+import logging
+
+
+from juliaset import juliaset
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  juliaset.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/setup.py b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
new file mode 100644
index 0000000..91d6588
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
@@ -0,0 +1,115 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Setup.py module for the workflow's worker utilities.
+
+All the workflow related code is gathered in a package that will be built as a
+source distribution, staged in the staging area for the workflow being run and
+then installed in the workers when they start running.
+
+This behavior is triggered by specifying the --setup_file command line option
+when running the workflow for remote execution.
+"""
+
+import subprocess
+
+import setuptools
+from setuptools.command.bdist_egg import bdist_egg as _bdist_egg
+
+
+class bdist_egg(_bdist_egg):  # pylint: disable=invalid-name
+  """A bdist_egg command class that will be invoked during package install.
+
+  The package built using the current setup.py will be staged and later
+  installed in the worker using `easy_install package`. This class will be
+  instantiated during install for this specific scenario and will trigger
+  running the custom commands specified.
+  """
+
+  def run(self):
+    # Run the registered 'CustomCommands' command first, then the standard
+    # bdist_egg behavior. NOTE(review): the base class is called explicitly,
+    # presumably because distutils commands are old-style classes under
+    # Python 2 (so super() would not work) -- confirm before changing.
+    self.run_command('CustomCommands')
+    _bdist_egg.run(self)
+
+
+# Some custom command to run during setup. The command is not essential for this
+# workflow. It is used here as an example. Each command will spawn a child
+# process. Typically, these commands will include steps to install non-Python
+# packages. For instance, to install a C++-based library libjpeg62 the following
+# two commands will have to be added:
+#
+#     ['apt-get', 'update'],
+#     ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
+#
+# First, note that there is no need to use the sudo command because the setup
+# script runs with appropriate access.
+# Second, if apt-get tool is used then the first command needs to be 'apt-get
+# update' so the tool refreshes itself and initializes links to download
+# repositories.  Without this initial step the other apt-get install commands
+# will fail with package not found errors. Note also --assume-yes option which
+# shortcuts the interactive confirmation.
+#
+# The output of custom commands (including failures) will be logged in the
+# worker-startup log.
+CUSTOM_COMMANDS = [
+    ['echo', 'Custom command worked!']]
+
+
+class CustomCommands(setuptools.Command):
+  """A setuptools Command class able to run arbitrary commands."""
+
+  def initialize_options(self):
+    pass
+
+  def finalize_options(self):
+    pass
+
+  def RunCustomCommand(self, command_list):
+    print 'Running command: %s' % command_list
+    p = subprocess.Popen(
+        command_list,
+        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    # Can use communicate(input='y\n'.encode()) if the command run requires
+    # some confirmation.
+    stdout_data, _ = p.communicate()
+    print 'Command output: %s' % stdout_data
+    if p.returncode != 0:
+      raise RuntimeError(
+          'Command %s failed: exit code: %s' % (command_list, p.returncode))
+
+  def run(self):
+    for command in CUSTOM_COMMANDS:
+      self.RunCustomCommand(command)
+
+
+# Configure the required packages and scripts to install.
+# Note that the Python Dataflow containers come with numpy already installed
+# so this dependency will not trigger anything to be installed unless a version
+# restriction is specified.
+REQUIRED_PACKAGES = [
+    'numpy',
+    ]
+
+
+setuptools.setup(
+    name='juliaset',
+    version='0.0.1',
+    description='Julia set workflow package.',
+    install_requires=REQUIRED_PACKAGES,
+    packages=setuptools.find_packages(),
+    cmdclass={
+        # Command class instantiated and run during easy_install scenarios.
+        'bdist_egg': bdist_egg,
+        'CustomCommands': CustomCommands,
+        }
+    )

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
new file mode 100644
index 0000000..fcdfac8
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -0,0 +1,196 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A TF-IDF workflow (term frequency - inverse document frequency).
+
+For an explanation of the TF-IDF algorithm see the following link:
+http://en.wikipedia.org/wiki/Tf-idf
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import glob
+import math
+import re
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.pvalue import AsSingleton
+
+
+def read_documents(pipeline, uris):
+  """Read the documents at the provided uris and returns (uri, line) pairs."""
+  pcolls = []
+  for uri in uris:
+    pcolls.append(
+        pipeline
+        | df.io.Read('read: %s' % uri, df.io.TextFileSource(uri))
+        | df.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
+  return pcolls | df.Flatten('flatten read pcolls')
+
+
+class TfIdf(df.PTransform):
+  """A transform containing a basic TF-IDF pipeline.
+
+  The input consists of KV objects where the key is the document's URI and
+  the value is a piece of the document's content.
+  The output is mapping from terms to scores for each document URI.
+  """
+
+  def apply(self, uri_to_content):
+
+    # Compute the total number of documents, and prepare a singleton
+    # PCollection to use as side input.
+    total_documents = (
+        uri_to_content
+        | df.Keys('get uris')
+        | df.RemoveDuplicates('get unique uris')
+        | df.combiners.Count.Globally(' count uris'))
+
+    # Create a collection of pairs mapping a URI to each of the words
+    # in the document associated with that URI.
+
+    def split_into_words((uri, line)):
+      return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]
+
+    uri_to_words = (
+        uri_to_content
+        | df.FlatMap('split words', split_into_words))
+
+    # Compute a mapping from each word to the total number of documents
+    # in which it appears.
+    word_to_doc_count = (
+        uri_to_words
+        | df.RemoveDuplicates('get unique words per doc')
+        | df.Values('get words')
+        | df.combiners.Count.PerElement('count docs per word'))
+
+    # Compute a mapping from each URI to the total number of words in the
+    # document associated with that URI.
+    uri_to_word_total = (
+        uri_to_words
+        | df.Keys(' get uris')
+        | df.combiners.Count.PerElement('count words in doc'))
+
+    # Count, for each (URI, word) pair, the number of occurrences of that word
+    # in the document associated with the URI.
+    uri_and_word_to_count = (
+        uri_to_words
+        | df.combiners.Count.PerElement('count word-doc pairs'))
+
+    # Adjust the above collection to a mapping from (URI, word) pairs to counts
+    # into an isomorphic mapping from URI to (word, count) pairs, to prepare
+    # for a join by the URI key.
+    uri_to_word_and_count = (
+        uri_and_word_to_count
+        | df.Map('shift keys',
+                 lambda ((uri, word), count): (uri, (word, count))))
+
+    # Perform a CoGroupByKey (a sort of pre-join) on the prepared
+    # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and
+    # 'word counts' strings. This yields a mapping from URI to a dictionary
+    # that maps the above mentioned tag strings to an iterable containing the
+    # word total for that URI and word and count respectively.
+    #
+    # A diagram (in which '[]' just means 'iterable'):
+    #
+    #   URI: {'word totals': [count],  # Total words within this URI's document.
+    #         'word counts': [(word, count),  # Counts of specific words
+    #                         (word, count),  # within this URI's document.
+    #                         ... ]}
+    uri_to_word_and_count_and_total = (
+        {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
+        | df.CoGroupByKey('cogroup by uri'))
+
+    # Compute a mapping from each word to a (URI, term frequency) pair for each
+    # URI. A word's term frequency for a document is simply the number of times
+    # that word occurs in the document divided by the total number of words in
+    # the document.
+
+    def compute_term_frequency((uri, count_and_total)):
+      word_and_count = count_and_total['word counts']
+      # We have an iterable for one element that we want extracted.
+      [word_total] = count_and_total['word totals']
+      for word, count in word_and_count:
+        yield word, (uri, float(count) / word_total)
+
+    word_to_uri_and_tf = (
+        uri_to_word_and_count_and_total
+        | df.FlatMap('compute term frequencies', compute_term_frequency))
+
+    # Compute a mapping from each word to its document frequency.
+    # A word's document frequency in a corpus is the number of
+    # documents in which the word appears divided by the total
+    # number of documents in the corpus.
+    #
+    # This calculation uses a side input, a Dataflow-computed auxiliary value
+    # presented to each invocation of our MapFn lambda. The second argument to
+    # the lambda (called total---note that we are unpacking the first argument)
+    # receives the value we listed after the lambda in Map(). Additional side
+    # inputs (and ordinary Python values, too) can be provided to MapFns and
+    # DoFns in this way.
+    word_to_df = (
+        word_to_doc_count
+        | df.Map('compute doc frequencies',
+                 lambda (word, count), total: (word, float(count) / total),
+                 AsSingleton(total_documents)))
+
+    # Join the term frequency and document frequency collections,
+    # each keyed on the word.
+    word_to_uri_and_tf_and_df = (
+        {'tf': word_to_uri_and_tf, 'df': word_to_df}
+        | df.CoGroupByKey('cogroup words by tf-df'))
+
+    # Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
+    # There are a variety of definitions of TF-IDF
+    # ("term frequency - inverse document frequency") score; here we use a
+    # basic version that is the term frequency multiplied by the log of the
+    # inverse document frequency.
+
+    def compute_tf_idf((word, tf_and_df)):
+      [docf] = tf_and_df['df']
+      for uri, tf in tf_and_df['tf']:
+        yield word, (uri, tf * math.log(1 / docf))
+
+    word_to_uri_and_tfidf = (
+        word_to_uri_and_tf_and_df
+        | df.FlatMap('compute tf-idf', compute_tf_idf))
+
+    return word_to_uri_and_tfidf
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the tfidf pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--uris',
+                      required=True,
+                      help='URIs to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+  # Read documents specified by the uris command line option.
+  pcoll = read_documents(p, glob.glob(known_args.uris))
+  # Compute TF-IDF information for each word.
+  output = pcoll | TfIdf()
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  output | df.io.Write('write', df.io.TextFileSink(known_args.output))
+  p.run()
+
+
+if __name__ == '__main__':
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
new file mode 100644
index 0000000..85b4964
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -0,0 +1,88 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the TF-IDF example."""
+
+import logging
+import os
+import re
+import shutil
+import tempfile
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.complete import tfidf
+
+
+EXPECTED_RESULTS = set([
+    ('ghi', '1.txt', 0.3662040962227032),
+    ('abc', '1.txt', 0.0),
+    ('abc', '3.txt', 0.0),
+    ('abc', '2.txt', 0.0),
+    ('def', '1.txt', 0.13515503603605478),
+    ('def', '2.txt', 0.2027325540540822)])
+
+
+EXPECTED_LINE_RE = r'\(u\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
+
+
+class TfIdfTest(unittest.TestCase):
+
+  def create_file(self, path, contents):
+    logging.info('Creating temp file: %s', path)
+    with open(path, 'w') as f:
+      f.write(contents)
+
+  def test_tfidf_transform(self):
+    p = df.Pipeline('DirectPipelineRunner')
+    uri_to_line = p | df.Create(
+        'create sample',
+        [('1.txt', 'abc def ghi'),
+         ('2.txt', 'abc def'),
+         ('3.txt', 'abc')])
+    result = (
+        uri_to_line
+        | tfidf.TfIdf()
+        | df.Map('flatten', lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+    df.assert_that(result, df.equal_to(EXPECTED_RESULTS))
+    # Run the pipeline. Note that the assert_that above adds to the pipeline
+    # a check that the result PCollection contains expected values. To actually
+    # trigger the check the pipeline must be run.
+    p.run()
+
+  def test_basics(self):
+    # Setup the files with expected content.
+    temp_folder = tempfile.mkdtemp()
+    self.create_file(os.path.join(temp_folder, '1.txt'), 'abc def ghi')
+    self.create_file(os.path.join(temp_folder, '2.txt'), 'abc def')
+    self.create_file(os.path.join(temp_folder, '3.txt'), 'abc')
+    tfidf.run([
+        '--uris=%s/*' % temp_folder,
+        '--output', os.path.join(temp_folder, 'result')])
+    # Parse result file and compare.
+    results = []
+    with open(os.path.join(temp_folder,
+                           'result-00000-of-00001')) as result_file:
+      for line in result_file:
+        match = re.search(EXPECTED_LINE_RE, line)
+        logging.info('Result line: %s', line)
+        if match is not None:
+          results.append(
+              (match.group(1), match.group(2), float(match.group(3))))
+    logging.info('Computed results: %s', set(results))
+    self.assertEqual(set(results), EXPECTED_RESULTS)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
new file mode 100644
index 0000000..d0935fe
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -0,0 +1,170 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""An example that reads Wikipedia edit data and computes strings of edits.
+
+An example that reads Wikipedia edit data from Cloud Storage and computes the
+ten longest user sessions (strings of edits separated by no more than an hour)
+within each 30 day period.
+
+To execute this pipeline locally using the DirectPipelineRunner, specify an
+output prefix on GCS:
+  --output gs://YOUR_OUTPUT_PREFIX
+
+To execute this pipeline using the Google Cloud Dataflow service, specify
+pipeline configuration in addition to the above:
+  --job_name NAME_FOR_YOUR_JOB
+  --project YOUR_PROJECT_ID
+  --staging_location gs://YOUR_STAGING_DIRECTORY
+  --temp_location gs://YOUR_TEMPORARY_DIRECTORY
+  --runner BlockingDataflowPipelineRunner
+
+The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be
+overridden with --input.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import json
+import logging
+import sys
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow import combiners
+from google.cloud.dataflow import window
+
+ONE_HOUR_IN_SECONDS = 3600
+THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS
+
+
+class ExtractUserAndTimestampDoFn(df.DoFn):
+  """Extracts user and timestamp representing a Wikipedia edit."""
+
+  def process(self, context):
+    table_row = json.loads(context.element)
+    if 'contributor_username' in table_row:
+      user_name = table_row['contributor_username']
+      timestamp = table_row['timestamp']
+      yield window.TimestampedValue(user_name, timestamp)
+
+
+class ComputeSessions(df.PTransform):
+  """Computes the number of edits in each user session.
+
+  A session is defined as a string of edits where each is separated from the
+  next by less than an hour.
+  """
+
+  def __init__(self):
+    super(ComputeSessions, self).__init__()
+
+  def apply(self, pcoll):
+    return (pcoll
+            | df.WindowInto('ComputeSessionsWindow',
+                            window.Sessions(gap_size=ONE_HOUR_IN_SECONDS))
+            | combiners.Count.PerElement())
+
+
+class TopPerMonth(df.PTransform):
+  """Computes the longest session ending in each month."""
+
+  def __init__(self):
+    super(TopPerMonth, self).__init__()
+
+  def apply(self, pcoll):
+    return (pcoll
+            | df.WindowInto('TopPerMonthWindow',
+                            window.FixedWindows(
+                                size=THIRTY_DAYS_IN_SECONDS))
+            | combiners.core.CombineGlobally(
+                'Top',
+                combiners.TopCombineFn(
+                    10, lambda first, second: first[1] < second[1]))
+            .without_defaults())
+
+
+class SessionsToStringsDoFn(df.DoFn):
+  """Adds the session information to be part of the key."""
+
+  def process(self, context):
+    yield (context.element[0] + ' : ' +
+           ', '.join([str(w) for w in context.windows]), context.element[1])
+
+
+class FormatOutputDoFn(df.DoFn):
+  """Formats a string containing the user, count, and session."""
+
+  def process(self, context):
+    for kv in context.element:
+      session = kv[0]
+      count = kv[1]
+      yield (session + ' : ' + str(count) + ' : '
+             + ', '.join([str(w) for w in context.windows]))
+
+
+class ComputeTopSessions(df.PTransform):
+  """Computes the top user sessions for each month."""
+
+  def __init__(self, sampling_threshold):
+    super(ComputeTopSessions, self).__init__()
+    self.sampling_threshold = sampling_threshold
+
+  def apply(self, pcoll):
+    return (pcoll
+            | df.ParDo('ExtractUserAndTimestamp', ExtractUserAndTimestampDoFn())
+            | df.Filter(
+                lambda x: abs(hash(x)) <= sys.maxint * self.sampling_threshold)
+            | ComputeSessions()
+            | df.ParDo('SessionsToStrings', SessionsToStringsDoFn())
+            | TopPerMonth()
+            | df.ParDo('FormatOutput', FormatOutputDoFn()))
+
+
+def run(argv=None):
+  """Runs the Wikipedia top edits pipeline.
+
+  Args:
+    argv: Pipeline options as a list of arguments.
+  """
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://dataflow-samples/wikipedia_edits/*.json',
+      help='Input specified as a GCS path containing a BigQuery table exported '
+      'as json.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  parser.add_argument('--sampling_threshold',
+                      type=float,
+                      default=0.1,
+                      help='Fraction of entries used for session tracking')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  (p  # pylint: disable=expression-not-assigned
+   | df.Read('read', df.io.TextFileSource(known_args.input))
+   | ComputeTopSessions(known_args.sampling_threshold)
+   | df.io.Write('write', df.io.TextFileSink(known_args.output)))
+
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
new file mode 100644
index 0000000..a4fdf8c
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -0,0 +1,58 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the top wikipedia sessions example."""
+
+import json
+import unittest
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.complete import top_wikipedia_sessions
+
+
+class ComputeTopSessionsTest(unittest.TestCase):
+
+  EDITS = [
+      json.dumps({'timestamp': 0.0, 'contributor_username': 'user1'}),
+      json.dumps({'timestamp': 0.001, 'contributor_username': 'user1'}),
+      json.dumps({'timestamp': 0.002, 'contributor_username': 'user1'}),
+      json.dumps({'timestamp': 0.0, 'contributor_username': 'user2'}),
+      json.dumps({'timestamp': 0.001, 'contributor_username': 'user2'}),
+      json.dumps({'timestamp': 3.601, 'contributor_username': 'user2'}),
+      json.dumps({'timestamp': 3.602, 'contributor_username': 'user2'}),
+      json.dumps(
+          {'timestamp': 2 * 3600.0, 'contributor_username': 'user2'}),
+      json.dumps(
+          {'timestamp': 35 * 24 * 3.600, 'contributor_username': 'user3'})
+  ]
+
+  EXPECTED = [
+      'user1 : [0.0, 3600.002) : 3 : [0.0, 2592000.0)',
+      'user2 : [0.0, 3603.602) : 4 : [0.0, 2592000.0)',
+      'user2 : [7200.0, 10800.0) : 1 : [0.0, 2592000.0)',
+      'user3 : [3024.0, 6624.0) : 1 : [0.0, 2592000.0)',
+  ]
+
+  def test_compute_top_sessions(self):
+    p = df.Pipeline('DirectPipelineRunner')
+    edits = p | df.Create('create', self.EDITS)
+    result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
+
+    df.assert_that(result, df.equal_to(self.EXPECTED))
+    p.run()
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
new file mode 100644
index 0000000..67616ec
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -0,0 +1,127 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow that writes to a BigQuery table with nested and repeated fields.
+
+Demonstrates how to build a bigquery.TableSchema object with nested and repeated
+fields. Also, shows how to generate data to be written to a BigQuery table with
+nested and repeated fields.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+  """Run the workflow."""
+  parser = argparse.ArgumentParser()
+
+  parser.add_argument(
+      '--output',
+      required=True,
+      help=
+      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
+       'or DATASET.TABLE.'))
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  from google.cloud.dataflow.internal.clients import bigquery  # pylint: disable=g-import-not-at-top
+
+  table_schema = bigquery.TableSchema()
+
+  # Fields that use standard types.
+  kind_schema = bigquery.TableFieldSchema()
+  kind_schema.name = 'kind'
+  kind_schema.type = 'string'
+  kind_schema.mode = 'nullable'
+  table_schema.fields.append(kind_schema)
+
+  full_name_schema = bigquery.TableFieldSchema()
+  full_name_schema.name = 'fullName'
+  full_name_schema.type = 'string'
+  full_name_schema.mode = 'required'
+  table_schema.fields.append(full_name_schema)
+
+  age_schema = bigquery.TableFieldSchema()
+  age_schema.name = 'age'
+  age_schema.type = 'integer'
+  age_schema.mode = 'nullable'
+  table_schema.fields.append(age_schema)
+
+  gender_schema = bigquery.TableFieldSchema()
+  gender_schema.name = 'gender'
+  gender_schema.type = 'string'
+  gender_schema.mode = 'nullable'
+  table_schema.fields.append(gender_schema)
+
+  # A nested field
+  phone_number_schema = bigquery.TableFieldSchema()
+  phone_number_schema.name = 'phoneNumber'
+  phone_number_schema.type = 'record'
+  phone_number_schema.mode = 'nullable'
+
+  area_code = bigquery.TableFieldSchema()
+  area_code.name = 'areaCode'
+  area_code.type = 'integer'
+  area_code.mode = 'nullable'
+  phone_number_schema.fields.append(area_code)
+
+  number = bigquery.TableFieldSchema()
+  number.name = 'number'
+  number.type = 'integer'
+  number.mode = 'nullable'
+  phone_number_schema.fields.append(number)
+  table_schema.fields.append(phone_number_schema)
+
+  # A repeated field.
+  children_schema = bigquery.TableFieldSchema()
+  children_schema.name = 'children'
+  children_schema.type = 'string'
+  children_schema.mode = 'repeated'
+  table_schema.fields.append(children_schema)
+
+  def create_random_record(record_id):
+    return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id,
+            'age': int(record_id) * 10, 'gender': 'male',
+            'phoneNumber': {
+                'areaCode': int(record_id) * 100,
+                'number': int(record_id) * 100000},
+            'children': ['child' + record_id + '1',
+                         'child' + record_id + '2',
+                         'child' + record_id + '3']
+           }
+
+  # pylint: disable=expression-not-assigned
+  record_ids = p | df.Create('CreateIDs', ['1', '2', '3', '4', '5'])
+  records = record_ids | df.Map('CreateRecords', create_random_record)
+  records | df.io.Write(
+      'write',
+      df.io.BigQuerySink(
+          known_args.output,
+          schema=table_schema,
+          create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
+          write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+  # Run the pipeline (all operations are deferred until run() is called).
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
new file mode 100644
index 0000000..20ef8d9
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -0,0 +1,114 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A Dataflow job that uses BigQuery sources as side inputs.
+
+Illustrates how to insert side-inputs into transforms in three different forms,
+as a singleton, as an iterator, and as a list.
+
+This workflow generates a set of tuples of the form (groupId, corpus, word) where
+groupId is a generated identifier for the group and corpus and word are randomly
+selected from corresponding rows in BQ dataset 'publicdata:samples.shakespeare'.
+Users should specify the number of groups to form and optionally a corpus and/or
+a word that should be ignored when forming groups.
+"""
+
+import argparse
+import logging
+from random import randrange
+
+import google.cloud.dataflow as df
+
+from google.cloud.dataflow.pvalue import AsIter
+from google.cloud.dataflow.pvalue import AsList
+from google.cloud.dataflow.pvalue import AsSingleton
+
+
+def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word):
+  """Generate groups given the input PCollections."""
+
+  def attach_corpus_fn(group, corpus, ignore):
+    selected = None
+    len_corpus = len(corpus)
+    while not selected:
+      c = corpus[randrange(0, len_corpus - 1)].values()[0]
+      if c != ignore:
+        selected = c
+
+    yield (group, selected)
+
+  def attach_word_fn(group, words, ignore):
+    selected = None
+    len_words = len(words)
+    while not selected:
+      c = words[randrange(0, len_words - 1)].values()[0]
+      if c != ignore:
+        selected = c
+
+    yield group + (selected,)
+
+  return (group_ids
+          | df.FlatMap(
+              'attach corpus',
+              attach_corpus_fn,
+              AsList(corpus),
+              AsSingleton(ignore_corpus))
+          | df.FlatMap(
+              'attach word',
+              attach_word_fn,
+              AsIter(word),
+              AsSingleton(ignore_word)))
+
+
+def run(argv=None):
+  """Run the workflow."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--output')
+  parser.add_argument('--ignore_corpus', default='')
+  parser.add_argument('--ignore_word', default='')
+  parser.add_argument('--num_groups')
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+  p = df.Pipeline(argv=pipeline_args)
+
+  group_ids = []
+  for i in xrange(0, int(known_args.num_groups)):
+    group_ids.append('id' + str(i))
+
+  query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare'
+  query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare'
+  ignore_corpus = known_args.ignore_corpus
+  ignore_word = known_args.ignore_word
+
+  pcoll_corpus = p | df.Read('read corpus',
+                             df.io.BigQuerySource(query=query_corpus))
+  pcoll_word = p | df.Read('read words',
+                           df.io.BigQuerySource(query=query_word))
+  pcoll_ignore_corpus = p | df.Create('create_ignore_corpus', [ignore_corpus])
+  pcoll_ignore_word = p | df.Create('create_ignore_word', [ignore_word])
+  pcoll_group_ids = p | df.Create('create groups', group_ids)
+
+  pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
+                               pcoll_ignore_corpus, pcoll_ignore_word)
+
+  # pylint:disable=expression-not-assigned
+  pcoll_groups | df.io.Write('WriteToText',
+                             df.io.TextFileSink(known_args.output))
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()



Mime
View raw message