beam-commits mailing list archives

From da...@apache.org
Subject [47/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
Date Tue, 14 Jun 2016 23:13:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
new file mode 100644
index 0000000..c601801
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -0,0 +1,59 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the BigQuery side input example."""
+
+import logging
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.cookbook import bigquery_side_input
+
+
+class BigQuerySideInputTest(unittest.TestCase):
+
+  def test_create_groups(self):
+    p = df.Pipeline('DirectPipelineRunner')
+
+    group_ids_pcoll = p | df.Create('create_group_ids', ['A', 'B', 'C'])
+    corpus_pcoll = p | df.Create('create_corpus',
+                                 [{'f': 'corpus1'},
+                                  {'f': 'corpus2'},
+                                  {'f': 'corpus3'}])
+    words_pcoll = p | df.Create('create_words', [{'f': 'word1'},
+                                                 {'f': 'word2'},
+                                                 {'f': 'word3'}])
+    ignore_corpus_pcoll = p | df.Create('create_ignore_corpus', ['corpus1'])
+    ignore_word_pcoll = p | df.Create('create_ignore_word', ['word1'])
+
+    groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
+                                               words_pcoll, ignore_corpus_pcoll,
+                                               ignore_word_pcoll)
+
+    def group_matcher(actual):
+      self.assertEqual(len(actual), 3)
+      for group in actual:
+        self.assertEqual(len(group), 3)
+        self.assertTrue(group[1].startswith('corpus'))
+        self.assertNotEqual(group[1], 'corpus1')
+        self.assertTrue(group[2].startswith('word'))
+        self.assertNotEqual(group[2], 'word1')
+
+    df.assert_that(groups, group_matcher)
+    p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
new file mode 100644
index 0000000..ba3a41d
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -0,0 +1,96 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow using BigQuery sources and sinks.
+
+The workflow will read from a table that has the 'month' and 'tornado' fields as
+part of the table schema (other additional fields are ignored). The 'month'
+field is a number represented as a string (e.g., '23') and the 'tornado' field
+is a boolean field.
+
+The workflow will compute the number of tornadoes in each month and output
+the results to a table (created if needed) with the following schema:
+  - month: number
+  - tornado_count: number
+
+This example uses the default behavior for BigQuery sources and sinks, which
+represents table rows as plain Python dictionaries.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import google.cloud.dataflow as df
+
+
+def count_tornadoes(input_data):
+  """Workflow computing the number of tornadoes for each month that had one.
+
+  Args:
+    input_data: a PCollection of dictionaries representing table rows. Each
+      dictionary will have a 'month' and a 'tornado' key as described in the
+      module comment.
+
+  Returns:
+    A PCollection of dictionaries containing 'month' and 'tornado_count' keys.
+    Months without tornadoes are skipped.
+  """
+
+  return (input_data
+          | df.FlatMap(
+              'months with tornadoes',
+              lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
+          | df.CombinePerKey('monthly count', sum)
+          | df.Map('format', lambda (k, v): {'month': k, 'tornado_count': v}))
+
+
+def run(argv=None):
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      default='clouddataflow-readonly:samples.weather_stations',
+                      help=('Input BigQuery table to process specified as: '
+                            'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
+  parser.add_argument(
+      '--output',
+      required=True,
+      help=
+      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
+       'or DATASET.TABLE.'))
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read the table rows into a PCollection.
+  rows = p | df.io.Read('read', df.io.BigQuerySource(known_args.input))
+  counts = count_tornadoes(rows)
+
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  counts | df.io.Write(
+      'write',
+      df.io.BigQuerySink(
+          known_args.output,
+          schema='month:INTEGER, tornado_count:INTEGER',
+          create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
+          write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+  # Run the pipeline (all operations are deferred until run() is called).
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
new file mode 100644
index 0000000..c37cbee
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -0,0 +1,41 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the BigQuery tornadoes example."""
+
+import logging
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.cookbook import bigquery_tornadoes
+
+
+class BigQueryTornadoesTest(unittest.TestCase):
+
+  def test_basics(self):
+    p = df.Pipeline('DirectPipelineRunner')
+    rows = (p | df.Create('create', [
+        {'month': 1, 'day': 1, 'tornado': False},
+        {'month': 1, 'day': 2, 'tornado': True},
+        {'month': 1, 'day': 3, 'tornado': True},
+        {'month': 2, 'day': 1, 'tornado': True}]))
+    results = bigquery_tornadoes.count_tornadoes(rows)
+    df.assert_that(results, df.equal_to([{'month': 1, 'tornado_count': 2},
+                                         {'month': 2, 'tornado_count': 1}]))
+    p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
new file mode 100644
index 0000000..c0a4d00
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -0,0 +1,84 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A BigShuffle workflow."""
+
+from __future__ import absolute_import
+
+import argparse
+import binascii
+import logging
+
+
+import google.cloud.dataflow as df
+
+
+def crc32line(line):
+  return binascii.crc32(line) & 0xffffffff
+
+
+def run(argv=None):
+  # pylint: disable=expression-not-assigned
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      required=True,
+                      help='Input file pattern to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file pattern to write results to.')
+  parser.add_argument('--checksum_output',
+                      required=True,
+                      help='Checksum output file pattern.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read the text file[pattern] into a PCollection.
+  lines = p | df.io.Read('read', df.io.TextFileSource(known_args.input))
+
+  # Shuffle the lines: key on the first 10 characters, group by key, re-emit.
+  output = (lines
+            | df.Map('split', lambda x: (x[:10], x[10:99]))
+            | df.GroupByKey('group')
+            | df.FlatMap(
+                'format',
+                lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
+
+  input_csum = (lines
+                | df.Map('input-csum', crc32line)
+                | df.CombineGlobally('combine-input-csum', sum)
+                | df.Map('hex-format', lambda x: '%x' % x))
+  input_csum | df.io.Write(
+      'write-input-csum',
+      df.io.TextFileSink(known_args.checksum_output + '-input'))
+
+  # Write the output using a "Write" transform that has side effects.
+  output | df.io.Write('write', df.io.TextFileSink(known_args.output))
+  # Write the output checksum
+  output_csum = (output
+                 | df.Map('output-csum', crc32line)
+                 | df.CombineGlobally('combine-output-csum', sum)
+                 | df.Map('hex-format-output', lambda x: '%x' % x))
+  output_csum | df.io.Write(
+      'write-output-csum',
+      df.io.TextFileSink(known_args.checksum_output + '-output'))
+
+  # Actually run the pipeline (all operations above are deferred).
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

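The per-line CRC32 values written above are summed before being hex-formatted, so the resulting checksum does not depend on line order; that is why the checksum of the shuffled output can be compared directly against the checksum of the input (as bigshuffle_test.py does). A minimal sketch of that property, using the sample lines from the test:

import binascii

def crc32line(line):
  return binascii.crc32(line) & 0xffffffff

lines = ['a b c a b a', 'aa bb cc aa bb aa']  # sample lines from bigshuffle_test.py
forward = sum(crc32line(line) for line in lines)
shuffled = sum(crc32line(line) for line in reversed(lines))
assert forward == shuffled   # addition is commutative, so reordering lines leaves the sum unchanged
print('%x' % forward)        # same hex formatting as the 'hex-format' step in the pipeline
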
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
new file mode 100644
index 0000000..5697a26
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
@@ -0,0 +1,61 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the bigshuffle example."""
+
+import logging
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.cookbook import bigshuffle
+
+
+# TODO(dataflow-python): use gensort to generate input files.
+class BigShuffleTest(unittest.TestCase):
+
+  SAMPLE_TEXT = 'a b c a b a\naa bb cc aa bb aa'
+
+  def create_temp_file(self, contents):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def test_basics(self):
+    temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+    bigshuffle.run([
+        '--input=%s*' % temp_path,
+        '--output=%s.result' % temp_path,
+        '--checksum_output=%s.checksum' % temp_path])
+    # Parse result file and compare.
+    results = []
+    with open(temp_path + '.result-00000-of-00001') as result_file:
+      for line in result_file:
+        results.append(line.strip())
+    expected = self.SAMPLE_TEXT.split('\n')
+    self.assertEqual(sorted(results), sorted(expected))
+    # Check the checksums
+    input_csum = ''
+    with open(temp_path + '.checksum-input-00000-of-00001') as input_csum_file:
+      input_csum = input_csum_file.read().strip()
+    output_csum = ''
+    with open(temp_path +
+              '.checksum-output-00000-of-00001') as output_csum_file:
+      output_csum = output_csum_file.read().strip()
+    expected_csum = 'd629c1f6'
+    self.assertEqual(input_csum, expected_csum)
+    self.assertEqual(input_csum, output_csum)
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
new file mode 100644
index 0000000..b9bf66d
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -0,0 +1,92 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow using custom JSON-based coders for text sources and sinks.
+
+The input file contains a JSON string on each line describing a match
+record using the following schema:
+
+  {'guest': [TEAM_NAME, GOALS], 'host': [TEAM_NAME, GOALS]}
+
+The output file will contain the computed points for each team with one team
+per line in the following format:
+
+  [TEAM_NAME, POINTS]
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import json
+import logging
+
+import google.cloud.dataflow as df
+
+
+class JsonCoder(object):
+  """A JSON coder interpreting each line as a JSON string."""
+
+  def encode(self, x):
+    return json.dumps(x)
+
+  def decode(self, x):
+    return json.loads(x)
+
+
+def compute_points(record):
+  """Compute points based on the record containing the match result.
+
+  The function assigns 3 points for a win, 1 point for a draw, and 0 points for
+  a loss (see http://en.wikipedia.org/wiki/Three_points_for_a_win).
+  """
+  host_name, host_goals = record['host']
+  guest_name, guest_goals = record['guest']
+  if host_goals == guest_goals:
+    yield host_name, 1
+    yield guest_name, 1
+  elif host_goals > guest_goals:
+    yield host_name, 3
+    yield guest_name, 0
+  else:
+    yield host_name, 0
+    yield guest_name, 3
+
+
+def run(argv=None):
+  """Runs the workflow computing total points from a collection of matches."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      required=True,
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+  (p  # pylint: disable=expression-not-assigned
+   | df.io.Read('read',
+                df.io.TextFileSource(known_args.input,
+                                     coder=JsonCoder()))
+   | df.FlatMap('points', compute_points) | df.CombinePerKey(sum) | df.io.Write(
+       'write',
+       df.io.TextFileSink(known_args.output,
+                          coder=JsonCoder())))
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

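Taken on its own, compute_points is an ordinary generator, so it can be sanity-checked without building a pipeline. A small sketch, assuming the function is importable from the module path used by coders_test.py:

from google.cloud.dataflow.examples.cookbook.coders import compute_points

record = {'host': ['Germany', 1], 'guest': ['Brasil', 3]}  # a record from coders_test.py
print(list(compute_points(record)))
# -> [('Germany', 0), ('Brasil', 3)]: the guest won, so the host gets 0 points and the guest 3.
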
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
new file mode 100644
index 0000000..33fe64d
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -0,0 +1,56 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the coders example."""
+
+import json
+import logging
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.cookbook import coders
+
+
+class CodersTest(unittest.TestCase):
+
+  SAMPLE_RECORDS = [
+      {'host': ['Germany', 1], 'guest': ['Italy', 0]},
+      {'host': ['Germany', 1], 'guest': ['Brasil', 3]},
+      {'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
+
+  def create_temp_file(self, records):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      for record in records:
+        f.write('%s\n' % json.dumps(record))
+      return f.name
+
+  def test_basics(self):
+    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
+    coders.run([
+        '--input=%s*' % temp_path,
+        '--output=%s.result' % temp_path])
+    # Parse result file and compare.
+    results = []
+    with open(temp_path + '.result-00000-of-00001') as result_file:
+      for line in result_file:
+        results.append(json.loads(line))
+      logging.info('result: %s', results)
+    self.assertEqual(
+        sorted(results),
+        sorted([['Italy', 0], ['Brasil', 6], ['Germany', 3]]))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
new file mode 100644
index 0000000..ecab671
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -0,0 +1,73 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Simple tests to showcase combiners.
+
+The tests are meant to be "copy/paste" code snippets for the topic they address
+(combiners in this case). Most examples use neither sources nor sinks.
+The input data is generated simply with a Create transform and the output is
+checked directly on the last PCollection produced.
+"""
+
+import logging
+import unittest
+
+import google.cloud.dataflow as df
+
+
+class CombinersTest(unittest.TestCase):
+  """Tests showcasing Dataflow combiners."""
+
+  SAMPLE_DATA = [
+      ('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20), ('c', 100)]
+
+  def test_combine_per_key_with_callable(self):
+    """CombinePerKey using a standard callable reducing iterables.
+
+    A common case for Dataflow combiners is to sum (or max or min) over the
+    values of each key. Such standard functions can be used directly as combiner
+    functions. In fact, any function "reducing" an iterable to a single value
+    can be used.
+    """
+    result = (
+        df.Pipeline(runner=df.runners.DirectPipelineRunner())
+        | df.Create(CombinersTest.SAMPLE_DATA)
+        | df.CombinePerKey(sum))
+
+    df.assert_that(result, df.equal_to([('a', 6), ('b', 30), ('c', 100)]))
+    result.pipeline.run()
+
+  def test_combine_per_key_with_custom_callable(self):
+    """CombinePerKey using a custom function reducing iterables."""
+    def multiply(values):
+      result = 1
+      for v in values:
+        result *= v
+      return result
+
+    result = (
+        df.Pipeline(runner=df.runners.DirectPipelineRunner())
+        | df.Create(CombinersTest.SAMPLE_DATA)
+        | df.CombinePerKey(multiply))
+
+    df.assert_that(result, df.equal_to([('a', 6), ('b', 200), ('c', 100)]))
+    result.pipeline.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
new file mode 100644
index 0000000..011adc5
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -0,0 +1,132 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Various implementations of a Count custom PTransform.
+
+These examples show the different ways you can write custom PTransforms.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import google.cloud.dataflow as df
+
+from google.cloud.dataflow.utils.options import PipelineOptions
+
+
+# pylint doesn't understand our pipeline syntax:
+# pylint:disable=expression-not-assigned
+
+
+def run_count1(known_args, options):
+  """Runs the first example pipeline."""
+
+  class Count(df.PTransform):
+    """Count as a subclass of PTransform, with an apply method."""
+
+    def apply(self, pcoll):
+      return (
+          pcoll
+          | df.Map('Init', lambda v: (v, 1))
+          | df.CombinePerKey(sum))
+
+  logging.info('Running first pipeline')
+  p = df.Pipeline(options=options)
+  (p | df.io.Read(df.io.TextFileSource(known_args.input)) | Count()
+   | df.io.Write(df.io.TextFileSink(known_args.output)))
+  p.run()
+
+
+def run_count2(known_args, options):
+  """Runs the second example pipeline."""
+
+  @df.ptransform_fn
+  def Count(label, pcoll):      # pylint: disable=invalid-name,unused-argument
+    """Count as a decorated function."""
+    return (
+        pcoll
+        | df.Map('Init', lambda v: (v, 1))
+        | df.CombinePerKey(sum))
+
+  logging.info('Running second pipeline')
+  p = df.Pipeline(options=options)
+  (p | df.io.Read(df.io.TextFileSource(known_args.input))
+   | Count()  # pylint: disable=no-value-for-parameter
+   | df.io.Write(df.io.TextFileSink(known_args.output)))
+  p.run()
+
+
+def run_count3(known_args, options):
+  """Runs the third example pipeline."""
+
+  @df.ptransform_fn
+  # pylint: disable=invalid-name,unused-argument
+  def Count(label, pcoll, factor=1):
+    """Count as a decorated function with a side input.
+
+    Args:
+      label: optional label for this transform
+      pcoll: the PCollection passed in from the previous transform
+      factor: the amount by which to count
+
+    Returns:
+      A PCollection counting the number of times each unique element occurs.
+    """
+    return (
+        pcoll
+        | df.Map('Init', lambda v: (v, factor))
+        | df.CombinePerKey(sum))
+
+  logging.info('Running third pipeline')
+  p = df.Pipeline(options=options)
+  (p | df.io.Read(df.io.TextFileSource(known_args.input))
+   | Count(2)  # pylint: disable=no-value-for-parameter
+   | df.io.Write(df.io.TextFileSink(known_args.output)))
+  p.run()
+
+
+def get_args(argv):
+  """Determines user specified arguments from the given list of arguments.
+
+  Args:
+    argv: all arguments.
+
+  Returns:
+    A pair of argument lists containing known and remaining arguments.
+  """
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      required=True,
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None):
+  known_args, pipeline_args = get_args(argv)
+  options = PipelineOptions(pipeline_args)
+
+  run_count1(known_args, options)
+  run_count2(known_args, options)
+  run_count3(known_args, options)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

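A minimal sketch of applying such a Count transform with the direct runner; the Count subclass from run_count1 is repeated at module level here purely for illustration, and the input values match custom_ptransform_test.py:

import google.cloud.dataflow as df

class Count(df.PTransform):
  """The same Count subclass as in run_count1, lifted out for illustration."""

  def apply(self, pcoll):
    return (pcoll
            | df.Map('Init', lambda v: (v, 1))
            | df.CombinePerKey(sum))

p = df.Pipeline('DirectPipelineRunner')
counted = p | df.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) | Count()
df.assert_that(counted, df.equal_to([('CAT', 3), ('DOG', 2)]))
p.run()
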
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
new file mode 100644
index 0000000..a7da666
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -0,0 +1,64 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the various custom Count implementation examples."""
+
+import logging
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.cookbook import custom_ptransform
+from google.cloud.dataflow.utils.options import PipelineOptions
+
+
+class CustomCountTest(unittest.TestCase):
+
+  def test_count1(self):
+    self.run_pipeline(custom_ptransform.run_count1)
+
+  def test_count2(self):
+    self.run_pipeline(custom_ptransform.run_count2)
+
+  def test_count3(self):
+    self.run_pipeline(custom_ptransform.run_count3, factor=2)
+
+  def run_pipeline(self, count_implementation, factor=1):
+    input_path = self.create_temp_file('CAT\nDOG\nCAT\nCAT\nDOG\n')
+    output_path = input_path + '.result'
+
+    known_args, pipeline_args = custom_ptransform.get_args([
+        '--input=%s*' % input_path, '--output=%s' % output_path
+    ])
+
+    count_implementation(known_args, PipelineOptions(pipeline_args))
+    self.assertEqual(["(u'CAT', %d)" % (3 * factor),
+                      "(u'DOG', %d)" % (2 * factor)],
+                     self.get_output(output_path + '-00000-of-00001'))
+
+  def create_temp_file(self, contents=''):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def get_output(self, path):
+    logging.info('Reading output from "%s"', path)
+    lines = []
+    with open(path) as f:
+      lines = f.readlines()
+    return sorted(s.rstrip('\n') for s in lines)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
new file mode 100644
index 0000000..f576fb6
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -0,0 +1,104 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""An example workflow that demonstrates filters and other features.
+
+  - Reading and writing data from BigQuery.
+  - Manipulating BigQuery rows (as Python dicts) in memory.
+  - Global aggregates.
+  - Filtering PCollections using both user-specified parameters
+    as well as global aggregates computed during pipeline execution.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.pvalue import AsSingleton
+
+
+def filter_cold_days(input_data, month_filter):
+  """Workflow computing rows in a specific month with low temperatures.
+
+  Args:
+    input_data: a PCollection of dictionaries representing table rows. Each
+      dictionary must have the keys ['year', 'month', 'day', and 'mean_temp'].
+    month_filter: an int representing the month for which colder-than-average
+      days should be returned.
+
+  Returns:
+    A PCollection of dictionaries with the same keys described above. Each
+      row represents a day in the specified month where temperatures were
+      colder than the global mean temperature in the entire dataset.
+  """
+
+  # Project to only the desired fields from a complete input row.
+  # E.g., SELECT f1, f2, f3, ... FROM InputTable.
+  projection_fields = ['year', 'month', 'day', 'mean_temp']
+  fields_of_interest = (
+      input_data
+      | df.Map('projected',
+               lambda row: {f: row[f] for f in projection_fields}))
+
+  # Compute the global mean temperature.
+  global_mean = AsSingleton(
+      fields_of_interest
+      | df.Map('extract mean', lambda row: row['mean_temp'])
+      | df.combiners.Mean.Globally('global mean'))
+
+  # Filter to the rows representing days in the month of interest
+  # in which the mean daily temperature is below the global mean.
+  return (
+      fields_of_interest
+      | df.Filter('desired month', lambda row: row['month'] == month_filter)
+      | df.Filter('below mean',
+                  lambda row, mean: row['mean_temp'] < mean, global_mean))
+
+
+def run(argv=None):
+  """Constructs and runs the example filtering pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      help='BigQuery table to read from.',
+                      default='clouddataflow-readonly:samples.weather_stations')
+  parser.add_argument('--output',
+                      required=True,
+                      help='BigQuery table to write to.')
+  parser.add_argument('--month_filter',
+                      default=7,
+                      help='Numeric value of month to filter on.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  input_data = p | df.Read('input', df.io.BigQuerySource(known_args.input))
+
+  # pylint: disable=expression-not-assigned
+  (filter_cold_days(input_data, known_args.month_filter)
+   | df.io.Write('save to BQ', df.io.BigQuerySink(
+       known_args.output,
+       schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
+       create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
+       write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE)))
+
+  # Actually run the pipeline (all operations above are deferred).
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

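The below-mean filter relies on a singleton side input: AsSingleton turns the one-element PCollection holding the global mean into an extra argument for the Filter callable. A minimal sketch of the same pattern on plain numbers, with the direct runner and hypothetical sample values:

import google.cloud.dataflow as df
from google.cloud.dataflow.pvalue import AsSingleton

p = df.Pipeline('DirectPipelineRunner')
temps = p | df.Create('create', [3, 5, 3, 3, 5])                 # mean is 3.8
mean = AsSingleton(temps | df.combiners.Mean.Globally('mean'))
below = temps | df.Filter('below mean', lambda t, m: t < m, mean)
df.assert_that(below, df.equal_to([3, 3, 3]))
p.run()
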
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
new file mode 100644
index 0000000..3e88327
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -0,0 +1,65 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the filters example."""
+
+import logging
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.cookbook import filters
+
+
+class FiltersTest(unittest.TestCase):
+  # Note that 'removed' should be projected away by the pipeline
+  input_data = [
+      {'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3, 'removed': 'a'},
+      {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3, 'removed': 'a'},
+      {'year': 2011, 'month': 1, 'day': 3, 'mean_temp': 5, 'removed': 'a'},
+      {'year': 2013, 'month': 2, 'day': 1, 'mean_temp': 3, 'removed': 'a'},
+      {'year': 2011, 'month': 3, 'day': 3, 'mean_temp': 5, 'removed': 'a'},
+      ]
+
+  def _get_result_for_month(self, month):
+    p = df.Pipeline('DirectPipelineRunner')
+    rows = (p | df.Create('create', self.input_data))
+
+    results = filters.filter_cold_days(rows, month)
+    return results
+
+  def test_basic(self):
+    """Test that the correct result is returned for a simple dataset."""
+    results = self._get_result_for_month(1)
+    df.assert_that(
+        results,
+        df.equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
+                     {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
+    results.pipeline.run()
+
+  def test_basic_empty(self):
+    """Test that the correct empty result is returned for a simple dataset."""
+    results = self._get_result_for_month(3)
+    df.assert_that(results, df.equal_to([]))
+    results.pipeline.run()
+
+  def test_basic_empty_missing(self):
+    """Test that the correct empty result is returned for a missing month."""
+    results = self._get_result_for_month(4)
+    df.assert_that(results, df.equal_to([]))
+    results.pipeline.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
new file mode 100644
index 0000000..c5f79fb
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -0,0 +1,111 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""An example of using custom classes and coder for grouping operations.
+
+This workflow demonstrates registration and usage of a custom coder for a user-
+defined class. A deterministic custom coder is needed to use a class as a key in
+a combine or group operation.
+
+This example assumes an input file with, on each line, a comma-separated name
+and score.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow import coders
+from google.cloud.dataflow.typehints import typehints
+from google.cloud.dataflow.typehints.decorators import with_output_types
+
+
+class Player(object):
+  """A custom class used as a key in combine/group transforms."""
+
+  def __init__(self, name):
+    self.name = name
+
+
+class PlayerCoder(coders.Coder):
+  """A custom coder for the Player class."""
+
+  def encode(self, o):
+    """Encode to bytes with a trace that coder was used."""
+    # Our encoding prepends an 'x:' prefix.
+    return 'x:%s' % str(o.name)
+
+  def decode(self, s):
+    # To decode, we strip off the prepended 'x:' prefix.
+    assert s[0:2] == 'x:'
+    return Player(s[2:])
+
+  def is_deterministic(self):
+    # Since coded Player objects are used as keys below with
+    # df.CombinePerKey(sum), we require that this coder is deterministic
+    # (i.e., two equivalent instances of the classes are encoded into the same
+    # byte string) in order to guarantee consistent results.
+    return True
+
+
+# Annotate the get_players function so that the typehint system knows that the
+# input to the CombinePerKey operation is a key-value pair of a Player object
+# and an integer.
+@with_output_types(typehints.KV[Player, int])
+def get_players(descriptor):
+  name, points = descriptor.split(',')
+  return Player(name), int(points)
+
+
+def run(argv=sys.argv[1:]):
+  """Runs the workflow computing total points from a collection of matches."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      required=True,
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Register the custom coder for the Player class, so that it will be used in
+  # the computation.
+  coders.registry.register_coder(Player, PlayerCoder)
+
+  (p  # pylint: disable=expression-not-assigned
+   | df.io.Read('read', df.io.TextFileSource(known_args.input))
+   # The get_players function is annotated with a type hint above, so the type
+   # system knows the output type of the following operation is a key-value pair
+   # of a Player and an int. Please see the documentation for details on
+   # types that are inferred automatically as well as other ways to specify
+   # type hints.
+   | df.Map('get players', get_players)
+   # The output type hint of the previous step is used to infer that the key
+   # type of the following operation is the Player type. Since a custom coder
+   # is registered for the Player class above, a PlayerCoder will be used to
+   # encode Player objects as keys for this combine operation.
+   | df.CombinePerKey(sum) | df.Map(lambda (k, v): '%s,%d' % (k.name, v))
+   | df.io.Write('write', df.io.TextFileSink(known_args.output)))
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

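The encode/decode pair above round-trips a Player through a plain byte string, which is what makes the class usable as a key once the coder is registered with coders.registry. A minimal sketch of the round trip, independent of any pipeline and assuming the classes are importable from the module above:

from google.cloud.dataflow.examples.cookbook.group_with_coder import Player, PlayerCoder

coder = PlayerCoder()
encoded = coder.encode(Player('joe'))   # 'x:joe'
decoded = coder.decode(encoded)         # the 'x:' prefix is stripped again
assert isinstance(decoded, Player) and decoded.name == 'joe'
assert coder.is_deterministic()         # required for keys used with CombinePerKey(sum)
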
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
new file mode 100644
index 0000000..f7e500b
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -0,0 +1,87 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the custom coders example."""
+
+import logging
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.cookbook import group_with_coder
+
+
+# Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was
+# used, we do not strip the prepended 'x:' string when decoding a Player object.
+group_with_coder.PlayerCoder.decode = lambda self, s: group_with_coder.Player(s)
+
+
+class GroupWithCoderTest(unittest.TestCase):
+
+  SAMPLE_RECORDS = [
+      'joe,10', 'fred,3', 'mary,7',
+      'joe,20', 'fred,6', 'ann,5',
+      'joe,30', 'ann,10', 'mary,1']
+
+  def create_temp_file(self, records):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      for record in records:
+        f.write('%s\n' % record)
+      return f.name
+
+  def test_basics_with_type_check(self):
+    # Run the workflow with --pipeline_type_check option. This will make sure
+    # the typehints associated with all transforms will have non-default values
+    # and therefore any custom coders will be used. In our case we want to make
+    # sure the coder for the Player class will be used.
+    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
+    group_with_coder.run([
+        '--pipeline_type_check',
+        '--input=%s*' % temp_path,
+        '--output=%s.result' % temp_path])
+    # Parse result file and compare.
+    results = []
+    with open(temp_path + '.result-00000-of-00001') as result_file:
+      for line in result_file:
+        name, points = line.split(',')
+        results.append((name, int(points)))
+      logging.info('result: %s', results)
+    self.assertEqual(
+        sorted(results),
+        sorted([('x:ann', 15), ('x:fred', 9), ('x:joe', 60), ('x:mary', 8)]))
+
+  def test_basics_without_type_check(self):
+    # Run the workflow without --pipeline_type_check option. This will make sure
+    # the typehints associated with all transforms will have default values and
+    # therefore any custom coders will not be used. The default coder (pickler)
+    # will be used instead.
+    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
+    group_with_coder.run([
+        '--no_pipeline_type_check',
+        '--input=%s*' % temp_path,
+        '--output=%s.result' % temp_path])
+    # Parse result file and compare.
+    results = []
+    with open(temp_path + '.result-00000-of-00001') as result_file:
+      for line in result_file:
+        name, points = line.split(',')
+        results.append((name, int(points)))
+      logging.info('result: %s', results)
+    self.assertEqual(
+        sorted(results),
+        sorted([('ann', 15), ('fred', 9), ('joe', 60), ('mary', 8)]))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
new file mode 100644
index 0000000..9a09050
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -0,0 +1,126 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Merge phone, email, and mailing address information.
+
+A Dataflow pipeline that merges phone, email, and address information associated
+with the same names. Each input "database" is a tab-delimited text file pairing
+names with one phone number/email address/mailing address; multiple entries
+associated with the same name are allowed. Outputs are a tab-delimited text file
+with the merged information and another file containing some simple statistics.
+See mergecontacts_test.py for example inputs and outputs.
+
+A demonstration of:
+  CoGroupByKey
+  Non-linear pipelines (i.e., pipelines with branches)
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None, assert_results=None):
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input_email',
+      required=True,
+      help='Email database, with each line formatted as "name<TAB>email".')
+  parser.add_argument(
+      '--input_phone',
+      required=True,
+      help='Phonebook, with each line formatted as "name<TAB>phone number".')
+  parser.add_argument(
+      '--input_snailmail',
+      required=True,
+      help='Address database, with each line formatted as "name<TAB>address".')
+  parser.add_argument('--output_tsv',
+                      required=True,
+                      help='Tab-delimited output file.')
+  parser.add_argument('--output_stats',
+                      required=True,
+                      help='Output file for statistics about the input.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Helper: read a tab-separated key-value mapping from a text file, escape all
+  # quotes/backslashes, and convert it to a PCollection of (key, value) pairs.
+  def read_kv_textfile(label, textfile):
+    return (p
+            | df.io.Read('read_%s' % label, textfile)
+            | df.Map('backslash_%s' % label,
+                     lambda x: re.sub(r'\\', r'\\\\', x))
+            | df.Map('escape_quotes_%s' % label,
+                     lambda x: re.sub(r'"', r'\"', x))
+            | df.Map('split_%s' % label, lambda x: re.split(r'\t+', x, 1)))
+
+  # Read input databases.
+  email = read_kv_textfile('email',
+                           df.io.TextFileSource(known_args.input_email))
+  phone = read_kv_textfile('phone',
+                           df.io.TextFileSource(known_args.input_phone))
+  snailmail = read_kv_textfile('snailmail',
+                               df.io.TextFileSource(known_args.input_snailmail))
+
+  # Group together all entries under the same name.
+  grouped = (email, phone, snailmail) | df.CoGroupByKey('group_by_name')
+
+  # Prepare tab-delimited output; something like this:
+  # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
+  tsv_lines = grouped | df.Map(
+      lambda (name, (email, phone, snailmail)): '\t'.join(
+          ['"%s"' % name,
+           '"%s"' % ','.join(email),
+           '"%s"' % ','.join(phone),
+           '"%s"' % next(iter(snailmail), '')]))
+
+  # Compute some stats about our database of people.
+  luddites = grouped | df.Filter(  # People without email.
+      lambda (name, (email, phone, snailmail)): not next(iter(email), None))
+  writers = grouped | df.Filter(   # People without phones.
+      lambda (name, (email, phone, snailmail)): not next(iter(phone), None))
+  nomads = grouped | df.Filter(    # People without addresses.
+      lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
+
+  num_luddites = luddites | df.combiners.Count.Globally('luddites')
+  num_writers = writers | df.combiners.Count.Globally('writers')
+  num_nomads = nomads | df.combiners.Count.Globally('nomads')
+
+  # Write tab-delimited output.
+  # pylint: disable=expression-not-assigned
+  tsv_lines | df.io.Write('write_tsv',
+                          df.io.TextFileSink(known_args.output_tsv))
+
+  # TODO(silviuc): Move the assert_results logic to the unit test.
+  if assert_results is not None:
+    expected_luddites, expected_writers, expected_nomads = assert_results
+    df.assert_that(num_luddites, df.equal_to([expected_luddites]),
+                   label='assert:luddites')
+    df.assert_that(num_writers, df.equal_to([expected_writers]),
+                   label='assert:writers')
+    df.assert_that(num_nomads, df.equal_to([expected_nomads]),
+                   label='assert:nomads')
+  # Execute pipeline.
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

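CoGroupByKey on the tuple (email, phone, snailmail) produces, for each name, a tuple of three iterables in the same order as the inputs, which is what the lambdas above unpack. A minimal sketch of that shape, using a couple of entries borrowed from mergecontacts_test.py:

import google.cloud.dataflow as df

p = df.Pipeline('DirectPipelineRunner')
email = p | df.Create('create_email', [('Tom S', 'tom@example.com')])
phone = p | df.Create('create_phone', [('Tom S', '570-368-3420'),
                                       ('Tom S', '814-793-9655')])
snailmail = p | df.Create('create_snailmail',
                          [('Wally Writer', '101 Ridge Rd, Chalkyitsik, AK 99788')])
grouped = (email, phone, snailmail) | df.CoGroupByKey('group_by_name')
# Conceptually, grouped contains one element per name, e.g.:
#   ('Tom S', (['tom@example.com'], ['570-368-3420', '814-793-9655'], []))
#   ('Wally Writer', ([], [], ['101 Ridge Rd, Chalkyitsik, AK 99788']))
p.run()
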
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
new file mode 100644
index 0000000..a06d087
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -0,0 +1,121 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the mergecontacts example."""
+
+import logging
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.cookbook import mergecontacts
+
+
+class MergeContactsTest(unittest.TestCase):
+
+  CONTACTS_EMAIL = '\n'.join(['Nathan Nomad\tnathan@example.com',
+                              'Nicky Nomad\tnicky@example.com',
+                              'Noreen Nomad\tnoreen@example.com',
+                              'Noreen Nomad\tnomad@example.com',
+                              'Robert B\trobert@example.com',
+                              'Silviu C\tsilviu@example.com',
+                              'Tom S\ttom@example.com',
+                              'Wally Writer\twally@example.com',
+                              ''])
+
+  CONTACTS_PHONE = '\n'.join(['Larry Luddite\t724-228-3529',
+                              'Lisa Luddite\t304-277-3504',
+                              'Nathan Nomad\t412-466-8968',
+                              'Nicky Nomad\t724-379-5815',
+                              'Noreen Nomad\t412-472-0145',
+                              'Robert B\t814-865-8799',
+                              'Silviu C\t724-537-0671',
+                              'Tom S\t570-368-3420',
+                              'Tom S\t814-793-9655',
+                              ''])
+
+  CONTACTS_SNAILMAIL = '\n'.join(
+      ['Larry Luddite\t1949 Westcott St, Detroit, MI 48222',
+       'Lisa Luddite\t1949 Westcott St, Detroit, MI 48222',
+       'Robert B\t601 N 34th St, Seattle, WA 98103',
+       'Silviu C\t1600 Amphitheatre Pkwy, Mountain View, CA 94043',
+       'Tom S\t6425 Penn Ave Ste 700, Pittsburgh, PA 15206',
+       'Wally Writer\t101 Ridge Rd, Chalkyitsik, AK 99788',
+       ''])
+
+  EXPECTED_TSV = '\n'.join(
+      ['\t'.join(['"Larry Luddite"', '""', '"724-228-3529"',
+                  '"1949 Westcott St, Detroit, MI 48222"']),
+       '\t'.join(['"Lisa Luddite"', '""', '"304-277-3504"',
+                  '"1949 Westcott St, Detroit, MI 48222"']),
+       '\t'.join(['"Nathan Nomad"', '"nathan@example.com"', '"412-466-8968"',
+                  '""']),
+       '\t'.join(['"Nicky Nomad"', '"nicky@example.com"', '"724-379-5815"',
+                  '""']),
+       '\t'.join(['"Noreen Nomad"', '"nomad@example.com,noreen@example.com"',
+                  '"412-472-0145"', '""']),
+       '\t'.join(['"Robert B"', '"robert@example.com"', '"814-865-8799"',
+                  '"601 N 34th St, Seattle, WA 98103"']),
+       '\t'.join(['"Silviu C"', '"silviu@example.com"', '"724-537-0671"',
+                  '"1600 Amphitheatre Pkwy, Mountain View, CA 94043"']),
+       '\t'.join(['"Tom S"', '"tom@example.com"', '"570-368-3420,814-793-9655"',
+                  '"6425 Penn Ave Ste 700, Pittsburgh, PA 15206"']),
+       '\t'.join(['"Wally Writer"', '"wally@example.com"', '""',
+                  '"101 Ridge Rd, Chalkyitsik, AK 99788"']),
+       ''])
+
+  EXPECTED_STATS = '\n'.join(['2 luddites',
+                              '1 writers',
+                              '3 nomads',
+                              ''])
+
+  def create_temp_file(self, contents):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def normalize_tsv_results(self, tsv_data):
+    """Sort .tsv file data so we can compare it with expected output."""
+    lines_in = tsv_data.strip().split('\n')
+    lines_out = []
+    for line in lines_in:
+      name, email, phone, snailmail = line.split('\t')
+      lines_out.append('\t'.join(
+          [name,
+           '"%s"' % ','.join(sorted(email.strip('"').split(','))),
+           '"%s"' % ','.join(sorted(phone.strip('"').split(','))),
+           snailmail]))
+    return '\n'.join(sorted(lines_out)) + '\n'
+
+  def test_mergecontacts(self):
+    path_email = self.create_temp_file(self.CONTACTS_EMAIL)
+    path_phone = self.create_temp_file(self.CONTACTS_PHONE)
+    path_snailmail = self.create_temp_file(self.CONTACTS_SNAILMAIL)
+
+    result_prefix = self.create_temp_file('')
+
+    mergecontacts.run([
+        '--input_email=%s' % path_email,
+        '--input_phone=%s' % path_phone,
+        '--input_snailmail=%s' % path_snailmail,
+        '--output_tsv=%s.tsv' % result_prefix,
+        '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))
+
+    with open('%s.tsv-00000-of-00001' % result_prefix) as f:
+      contents = f.read()
+      self.assertEqual(self.EXPECTED_TSV, self.normalize_tsv_results(contents))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
new file mode 100644
index 0000000..bbc72bc
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -0,0 +1,171 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""A workflow demonstrating a DoFn with multiple outputs.
+
+DoFns may produce a main output and additional side outputs. A side output is
+marked with a tag at output time, and the same tag is later used to retrieve
+the corresponding result (a PCollection) for that side output.
+
+This is a slightly modified version of the basic wordcount example. In this
+example, words are divided into two buckets: short words (3 characters or
+fewer) and words (all other words). There will be 3 output files:
+
+  [OUTPUT]-chars        :   Character count for the input.
+  [OUTPUT]-short-words  :   Word count for short words only.
+  [OUTPUT]-words        :   Word count for all other words.
+
+To execute this pipeline locally, specify a local output file or output prefix
+on GCS:
+  --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+
+To execute this pipeline using the Google Cloud Dataflow service, specify
+pipeline configuration:
+  --project YOUR_PROJECT_ID
+  --staging_location gs://YOUR_STAGING_DIRECTORY
+  --temp_location gs://YOUR_TEMP_DIRECTORY
+  --job_name YOUR_JOB_NAME
+  --runner BlockingDataflowPipelineRunner
+
+and an output prefix on GCS:
+  --output gs://YOUR_OUTPUT_PREFIX
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow import pvalue
+
+
+class SplitLinesToWordsFn(df.DoFn):
+  """A transform to split a line of text into individual words.
+
+  This transform will have 3 outputs:
+    - main output: all words that are longer than 3 characters.
+    - short words side output: all other words.
+    - character count side output: number of characters in each processed line.
+  """
+
+  # These tags will be used to tag the side outputs of this DoFn.
+  SIDE_OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
+  SIDE_OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
+
+  def process(self, context):
+    """Receives a single element (a line) and produces words and side outputs.
+
+    Important things to note here:
+      - A single input element may produce multiple main outputs: the words
+        of a single line.
+      - That same input may also produce multiple side outputs, alongside the
+        multiple main outputs.
+      - A side output may have a different type than the main output (the
+        character count) or share its type (the short words).
+
+    Args:
+      context: processing context.
+
+    Yields:
+      words as main output, short words as side output, line character count as
+      side output.
+    """
+    # yield a count (integer) to the SIDE_OUTPUT_TAG_CHARACTER_COUNT tagged
+    # collection.
+    yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_CHARACTER_COUNT,
+                                 len(context.element))
+
+    words = re.findall(r'[A-Za-z\']+', context.element)
+    for word in words:
+      if len(word) <= 3:
+        # yield word as a side output to the SIDE_OUTPUT_TAG_SHORT_WORDS tagged
+        # collection.
+        yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_SHORT_WORDS, word)
+      else:
+        # yield word to add it to the main collection.
+        yield word
+
+
+class CountWords(df.PTransform):
+  """A transform to count the occurrences of each word.
+
+  A PTransform that converts a PCollection containing words into a PCollection
+  of "word: count" strings.
+  """
+
+  def apply(self, pcoll):
+    return (pcoll
+            | df.Map('pair_with_one', lambda x: (x, 1))
+            | df.GroupByKey('group')
+            | df.Map('count', lambda (word, ones): (word, sum(ones)))
+            | df.Map('format', lambda (word, c): '%s: %s' % (word, c)))
+
+
+def run(argv=None):
+  """Runs the workflow counting the long words and short words separately."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output prefix for files to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  lines = p | df.Read('read', df.io.TextFileSource(known_args.input))
+
+  # with_outputs allows accessing the side outputs of a DoFn.
+  split_lines_result = (lines
+                        | df.ParDo(SplitLinesToWordsFn()).with_outputs(
+                            SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS,
+                            SplitLinesToWordsFn.SIDE_OUTPUT_TAG_CHARACTER_COUNT,
+                            main='words'))
+
+  # split_lines_result is an object of type DoOutputsTuple. It supports
+  # accessing the results in several alternative ways, as shown below.
+  words, _, _ = split_lines_result
+  short_words = split_lines_result[
+      SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS]
+  character_count = split_lines_result.tag_character_count
+
+  # pylint: disable=expression-not-assigned
+  (character_count
+   | df.Map('pair_with_key', lambda x: ('chars_temp_key', x))
+   | df.GroupByKey()
+   | df.Map('count chars', lambda (_, counts): sum(counts))
+   | df.Write('write chars', df.io.TextFileSink(known_args.output + '-chars')))
+
+  # pylint: disable=expression-not-assigned
+  (short_words
+   | CountWords('count short words')
+   | df.Write('write short words',
+              df.io.TextFileSink(known_args.output + '-short-words')))
+
+  # pylint: disable=expression-not-assigned
+  (words
+   | CountWords('count words')
+   | df.Write('write words', df.io.TextFileSink(known_args.output + '-words')))
+
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
new file mode 100644
index 0000000..9cbf066
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -0,0 +1,69 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the multiple_output_pardo example."""
+
+import logging
+import re
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.cookbook import multiple_output_pardo
+
+
+class MultipleOutputParDo(unittest.TestCase):
+
+  SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view'
+  EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)]
+  EXPECTED_WORDS = [
+      ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)]
+
+  def create_temp_file(self, contents):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def get_wordcount_results(self, temp_path):
+    results = []
+    with open(temp_path) as result_file:
+      for line in result_file:
+        match = re.search(r'([A-Za-z]+): ([0-9]+)', line)
+        if match is not None:
+          results.append((match.group(1), int(match.group(2))))
+    return results
+
+  def test_multiple_output_pardo(self):
+    temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+    result_prefix = temp_path + '.result'
+
+    multiple_output_pardo.run([
+        '--input=%s*' % temp_path,
+        '--output=%s' % result_prefix])
+
+    expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n')))
+    with open(result_prefix + '-chars-00000-of-00001') as f:
+      contents = f.read()
+      self.assertEqual(expected_char_count, int(contents))
+
+    short_words = self.get_wordcount_results(
+        result_prefix + '-short-words-00000-of-00001')
+    self.assertEqual(sorted(short_words), sorted(self.EXPECTED_SHORT_WORDS))
+
+    words = self.get_wordcount_results(result_prefix + '-words-00000-of-00001')
+    self.assertEqual(sorted(words), sorted(self.EXPECTED_WORDS))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

