beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] incubator-beam git commit: Adding display data to sink, sources, and parallel-do operations.
Date Wed, 09 Nov 2016 21:25:42 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk cf026bb55 -> ea642428f


Adding display data to sink, sources, and parallel-do operations.

- PubSub, BigQuery, NativeFile, FileBasedSource
- Write, Read, ParDo transforms
- DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3c1043ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3c1043ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3c1043ae

Branch: refs/heads/python-sdk
Commit: 3c1043aeadf7a8ce08c174b76fa975e0bf78bae8
Parents: cf026bb
Author: Pablo <pabloem@google.com>
Authored: Thu Oct 27 12:07:02 2016 -0700
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Wed Nov 9 11:59:47 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/bigquery.py          |  32 +++++
 sdks/python/apache_beam/io/bigquery_test.py     |  68 ++++++++-
 sdks/python/apache_beam/io/filebasedsource.py   |   6 +
 .../apache_beam/io/filebasedsource_test.py      |  44 ++++--
 sdks/python/apache_beam/io/fileio.py            |  20 +++
 sdks/python/apache_beam/io/fileio_test.py       |  62 ++++++++-
 sdks/python/apache_beam/io/iobase.py            |  12 +-
 sdks/python/apache_beam/io/pubsub.py            |  15 ++
 sdks/python/apache_beam/io/pubsub_test.py       |  62 +++++++++
 .../runners/dataflow/native_io/iobase.py        |   5 +-
 sdks/python/apache_beam/transforms/core.py      |  24 +++-
 sdks/python/apache_beam/transforms/display.py   |  72 +++++++---
 .../apache_beam/transforms/display_test.py      | 138 ++++++++++++++-----
 .../apache_beam/transforms/ptransform_test.py   |  58 ++++++++
 14 files changed, 548 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 9c1ee27..f0e88a6 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -118,6 +118,7 @@ from apache_beam.internal import auth
 from apache_beam.internal.json_value import from_json_value
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils import retry
 from apache_beam.utils.options import GoogleCloudOptions
 
@@ -342,6 +343,23 @@ class BigQuerySource(dataflow_io.NativeSource):
     self.validate = validate
     self.coder = coder or RowAsDictJsonCoder()
 
+  def display_data(self):
+    if self.query is not None:
+      res = {'query': DisplayDataItem(self.query, label='Query')}
+    else:
+      if self.table_reference.projectId is not None:
+        tableSpec = '{}:{}.{}'.format(self.table_reference.projectId,
+                                      self.table_reference.datasetId,
+                                      self.table_reference.tableId)
+      else:
+        tableSpec = '{}.{}'.format(self.table_reference.datasetId,
+                                   self.table_reference.tableId)
+      res = {'table': DisplayDataItem(tableSpec, label='Table')}
+
+    res['validation'] = DisplayDataItem(self.validate,
+                                        label='Validation Enabled')
+    return res
+
   @property
   def format(self):
     """Source format name required for remote execution."""
@@ -434,6 +452,20 @@ class BigQuerySink(dataflow_io.NativeSink):
     self.validate = validate
     self.coder = coder or RowAsDictJsonCoder()
 
+  def display_data(self):
+    res = {}
+    if self.table_reference is not None:
+      tableSpec = '{}.{}'.format(self.table_reference.datasetId,
+                                 self.table_reference.tableId)
+      if self.table_reference.projectId is not None:
+        tableSpec = '{}:{}'.format(self.table_reference.projectId,
+                                   tableSpec)
+      res['table'] = DisplayDataItem(tableSpec, label='Table')
+
+    res['validation'] = DisplayDataItem(self.validate,
+                                        label="Validation Enabled")
+    return res
+
   def schema_as_json(self):
     """Returns the TableSchema associated with the sink as a JSON string."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index 86b599c..b0c3bbe 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -24,6 +24,7 @@ import datetime
 import unittest
 
 from apitools.base.py.exceptions import HttpError
+import hamcrest as hc
 import mock
 
 import apache_beam as beam
@@ -31,6 +32,8 @@ from apache_beam.internal.clients import bigquery
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.io.bigquery import RowAsDictJsonCoder
 from apache_beam.io.bigquery import TableRowJsonCoder
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils.options import PipelineOptions
 
 
@@ -112,6 +115,39 @@ class TestTableRowJsonCoder(unittest.TestCase):
 
 class TestBigQuerySource(unittest.TestCase):
 
+  def test_display_data_item_on_validate_true(self):
+    source = beam.io.BigQuerySource('dataset.table', validate=True)
+
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('validation', True),
+        DisplayDataItemMatcher('table', 'dataset.table')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_table_reference_display_data(self):
+    source = beam.io.BigQuerySource('dataset.table')
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('validation', False),
+        DisplayDataItemMatcher('table', 'dataset.table')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+    source = beam.io.BigQuerySource('project:dataset.table')
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('validation', False),
+        DisplayDataItemMatcher('table', 'project:dataset.table')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+    source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('validation',
+                               False),
+        DisplayDataItemMatcher('table',
+                               'xyz.com:project:dataset.table')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_parse_table_reference(self):
     source = beam.io.BigQuerySource('dataset.table')
     self.assertEqual(source.table_reference.datasetId, 'dataset')
@@ -127,20 +163,40 @@ class TestBigQuerySource(unittest.TestCase):
     self.assertEqual(source.table_reference.datasetId, 'dataset')
     self.assertEqual(source.table_reference.tableId, 'table')
 
-  def test_specify_query_without_table(self):
     source = beam.io.BigQuerySource(query='my_query')
     self.assertEqual(source.query, 'my_query')
     self.assertIsNone(source.table_reference)
     self.assertTrue(source.use_legacy_sql)
 
+  def test_query_only_display_data(self):
+    source = beam.io.BigQuerySource(query='my_query')
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('validation', False),
+        DisplayDataItemMatcher('query', 'my_query')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_specify_query_sql_format(self):
     source = beam.io.BigQuerySource(query='my_query', use_legacy_sql=False)
     self.assertEqual(source.query, 'my_query')
     self.assertFalse(source.use_legacy_sql)
 
+  def test_specify_query_without_table(self):
+    source = beam.io.BigQuerySource(query='my_query')
+    self.assertEqual(source.query, 'my_query')
+    self.assertIsNone(source.table_reference)
+
 
 class TestBigQuerySink(unittest.TestCase):
 
+  def test_table_spec_display_data(self):
+    sink = beam.io.BigQuerySink('dataset.table')
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher('table', 'dataset.table'),
+        DisplayDataItemMatcher('validation', False)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_parse_schema_descriptor(self):
     sink = beam.io.BigQuerySink(
         'dataset.table', schema='s:STRING, n:INTEGER')
@@ -150,9 +206,17 @@ class TestBigQuerySink(unittest.TestCase):
         field.name: field.type for field in sink.table_schema.fields}
     self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema)
 
+  def test_project_table_display_data(self):
+    sinkq = beam.io.BigQuerySink('PROJECT:dataset.table')
+    dd = DisplayData.create_from(sinkq)
+    expected_items = [
+        DisplayDataItemMatcher('table', 'PROJECT:dataset.table'),
+        DisplayDataItemMatcher('validation', False)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_simple_schema_as_json(self):
     sink = beam.io.BigQuerySink(
-        'dataset.table', schema='s:STRING, n:INTEGER')
+        'PROJECT:dataset.table', schema='s:STRING, n:INTEGER')
     self.assertEqual(
         json.dumps({'fields': [
             {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index d9b907c..58ad118 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -35,6 +35,7 @@ from apache_beam.io import concat_source
 from apache_beam.io import fileio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
+from apache_beam.transforms.display import DisplayDataItem
 
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
@@ -91,6 +92,11 @@ class FileBasedSource(iobase.BoundedSource):
       # We can't split compressed files efficiently so turn off splitting.
       self._splittable = False
 
+  def display_data(self):
+    return {'filePattern': DisplayDataItem(self._pattern, label="File Pattern"),
+            'compression': DisplayDataItem(str(self._compression_type),
+                                           label='Compression Type')}
+
   def _get_concat_source(self):
     if self._concat_source is None:
       single_file_sources = []

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index d07c1df..7bc31fd 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -25,6 +25,8 @@ import os
 import tempfile
 import unittest
 
+import hamcrest as hc
+
 import apache_beam as beam
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
@@ -36,6 +38,8 @@ from apache_beam.io.concat_source import ConcatSource
 from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
 
 from apache_beam.io.filebasedsource import FileBasedSource
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -224,6 +228,16 @@ class TestFileBasedSource(unittest.TestCase):
     read_data = [record for record in fbs.read(range_tracker)]
     self.assertItemsEqual(expected_data, read_data)
 
+  def test_single_file_display_data(self):
+    file_name, _ = write_data(10)
+    fbs = LineSource(file_name)
+    dd = DisplayData.create_from(fbs)
+    expected_items = [
+        DisplayDataItemMatcher('filePattern', file_name),
+        DisplayDataItemMatcher('compression', 'auto')]
+    hc.assert_that(dd.items,
+                   hc.contains_inanyorder(*expected_items))
+
   def test_fully_read_file_pattern(self):
     pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4])
     assert len(expected_data) == 40
@@ -510,8 +524,8 @@ class TestSingleFileSource(unittest.TestCase):
   def test_source_creation_fails_for_non_number_offsets(self):
     start_not_a_number_error = 'start_offset must be a number*'
     stop_not_a_number_error = 'stop_offset must be a number*'
-
-    fbs = LineSource('dymmy_pattern')
+    file_name = 'dummy_pattern'
+    fbs = LineSource(file_name)
 
     with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
       SingleFileSource(
@@ -529,11 +543,21 @@ class TestSingleFileSource(unittest.TestCase):
       SingleFileSource(
           fbs, file_name='dummy_file', start_offset=None, stop_offset=100)
 
+  def test_source_creation_display_data(self):
+    file_name = 'dummy_pattern'
+    fbs = LineSource(file_name)
+    dd = DisplayData.create_from(fbs)
+    expected_items = [
+        DisplayDataItemMatcher('compression', 'auto'),
+        DisplayDataItemMatcher('filePattern', file_name)]
+    hc.assert_that(dd.items,
+                   hc.contains_inanyorder(*expected_items))
+
   def test_source_creation_fails_if_start_lg_stop(self):
     start_larger_than_stop_error = (
         'start_offset must be smaller than stop_offset*')
 
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
     SingleFileSource(
         fbs, file_name='dummy_file', start_offset=99, stop_offset=100)
     with self.assertRaisesRegexp(ValueError, start_larger_than_stop_error):
@@ -544,7 +568,7 @@ class TestSingleFileSource(unittest.TestCase):
           fbs, file_name='dummy_file', start_offset=100, stop_offset=100)
 
   def test_estimates_size(self):
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
 
     # Should simply return stop_offset - start_offset
     source = SingleFileSource(
@@ -556,7 +580,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertEquals(90, source.estimate_size())
 
   def test_read_range_at_beginning(self):
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -567,7 +591,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data[:4], read_data)
 
   def test_read_range_at_end(self):
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -578,7 +602,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data[-3:], read_data)
 
   def test_read_range_at_middle(self):
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -589,7 +613,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data[4:7], read_data)
 
   def test_produces_splits_desiredsize_large_than_size(self):
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -605,7 +629,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data, read_data)
 
   def test_produces_splits_desiredsize_smaller_than_size(self):
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -623,7 +647,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data, read_data)
 
   def test_produce_split_with_start_and_end_positions(self):
-    fbs = LineSource('dymmy_pattern')
+    fbs = LineSource('dummy_pattern')
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index f74ac9c..669bfc9 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -34,6 +34,8 @@ from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms.display import DisplayDataItem
+
 
 __all__ = ['TextFileSource', 'TextFileSink']
 
@@ -56,6 +58,9 @@ class _CompressionType(object):
   def __ne__(self, other):
     return not self.__eq__(other)
 
+  def __str__(self):
+    return self.identifier
+
   def __repr__(self):
     return '_CompressionType(%s)' % self.identifier
 
@@ -161,6 +166,10 @@ class NativeFileSource(dataflow_io.NativeSource):
     self.coder = coder
     self.mime_type = mime_type
 
+  def display_data(self):
+    return {'filePattern': DisplayDataItem(self.file_path,
+                                           label="File Pattern")}
+
   def __eq__(self, other):
     return (self.file_path == other.file_path and
             self.start_offset == other.start_offset and
@@ -1005,6 +1014,17 @@ class NativeFileSink(dataflow_io.NativeSink):
     self.mime_type = mime_type
     self.compression_type = compression_type
 
+  def display_data(self):
+    file_name_pattern = '{}{}{}'.format(self.file_name_prefix,
+                                        self.shard_name_template,
+                                        self.file_name_suffix)
+    return {'filePattern':
+            DisplayDataItem(file_name_pattern,
+                            label='File Name Pattern'),
+            'compression':
+            DisplayDataItem(str(self.compression_type),
+                            label='Compression Type')}
+
   @property
   def path(self):
     return self.file_path

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 15daf04..7e6e60b 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -27,10 +27,14 @@ import tempfile
 import unittest
 import zlib
 
+import hamcrest as hc
+
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 # TODO: Add tests for file patterns (ie not just individual files) for both
 # uncompressed
@@ -52,8 +56,9 @@ class TestTextFileSource(unittest.TestCase):
                         output_lines,
                         start_offset=None,
                         end_offset=None):
+    file_name = self.create_temp_file('\n'.join(input_lines))
     source = fileio.TextFileSource(
-        file_path=self.create_temp_file('\n'.join(input_lines)),
+        file_path=file_name,
         start_offset=start_offset,
         end_offset=end_offset)
     read_lines = []
@@ -61,6 +66,11 @@ class TestTextFileSource(unittest.TestCase):
       for line in reader:
         read_lines.append(line)
     self.assertEqual(read_lines, output_lines)
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('filePattern', file_name)]
+    hc.assert_that(dd.items,
+                   hc.contains_inanyorder(*expected_items))
 
   def progress_with_offsets(self,
                             input_lines,
@@ -592,6 +602,30 @@ class TestNativeTextFileSink(unittest.TestCase):
     with open(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  def test_text_file_display_data(self):
+    sink = fileio.NativeTextFileSink(self.path)
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'filePattern',
+            '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
+        DisplayDataItemMatcher(
+            'compression',
+            'auto')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_text_file_display_data_suffix(self):
+    sink = fileio.NativeTextFileSink(self.path, file_name_suffix='.pdf')
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'filePattern',
+            '{}{}{}'.format(self.path, '-SSSSS-of-NNNNN', '.pdf')),
+        DisplayDataItemMatcher(
+            'compression',
+            'auto')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_write_text_file_empty(self):
     sink = fileio.NativeTextFileSink(self.path)
     self._write_lines(sink, [])
@@ -607,6 +641,19 @@ class TestNativeTextFileSink(unittest.TestCase):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  def test_display_data_gzip_file(self):
+    sink = fileio.NativeTextFileSink(
+        self.path, compression_type=fileio.CompressionTypes.GZIP)
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'filePattern',
+            '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
+        DisplayDataItemMatcher(
+            'compression',
+            'gzip')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_write_text_gzip_file_auto(self):
     self.path = tempfile.NamedTemporaryFile(suffix='.gz').name
     sink = fileio.NativeTextFileSink(self.path)
@@ -631,6 +678,19 @@ class TestNativeTextFileSink(unittest.TestCase):
     with bz2.BZ2File(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  def test_display_data_bzip2_file(self):
+    sink = fileio.NativeTextFileSink(
+        self.path, compression_type=fileio.CompressionTypes.BZIP2)
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'filePattern',
+            '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
+        DisplayDataItemMatcher(
+            'compression',
+            'bzip2')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
   def test_write_text_bzip2_file_auto(self):
     self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
     sink = fileio.NativeTextFileSink(self.path)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index fd46dd6..a0de131 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -42,6 +42,7 @@ from apache_beam.pvalue import AsSingleton
 from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
+from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
 
 
 # Encapsulates information about a bundle of a source generated when method
@@ -65,7 +66,7 @@ SourceBundle = namedtuple(
     'weight source start_position stop_position')
 
 
-class BoundedSource(object):
+class BoundedSource(HasDisplayData):
   """A source that reads a finite amount of input records.
 
   This class defines following operations which can be used to read the source
@@ -670,6 +671,11 @@ class Read(ptransform.PTransform):
     else:
       return self.source.coder
 
+  def display_data(self):
+    return {'source': DisplayDataItem(self.source.__class__,
+                                      label='Read Source'),
+            'source_dd': self.source}
+
 
 class Write(ptransform.PTransform):
   """A ``PTransform`` that writes to a sink.
@@ -712,6 +718,10 @@ class Write(ptransform.PTransform):
     super(Write, self).__init__(label)
     self.sink = sink
 
+  def display_data(self):
+    return {'sink': self.sink.__class__,
+            'sink_dd': self.sink}
+
   def apply(self, pcoll):
     from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
     if isinstance(self.sink, dataflow_io.NativeSink):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/pubsub.py b/sdks/python/apache_beam/io/pubsub.py
index da81742..40bd368 100644
--- a/sdks/python/apache_beam/io/pubsub.py
+++ b/sdks/python/apache_beam/io/pubsub.py
@@ -24,6 +24,7 @@ from __future__ import absolute_import
 
 from apache_beam import coders
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms.display import DisplayDataItem
 
 
 class PubSubSource(dataflow_io.NativeSource):
@@ -54,6 +55,17 @@ class PubSubSource(dataflow_io.NativeSource):
     """Source format name required for remote execution."""
     return 'pubsub'
 
+  def display_data(self):
+    return {'idLabel':
+            DisplayDataItem(self.id_label,
+                            label='ID Label Attribute').drop_if_none(),
+            'topic':
+            DisplayDataItem(self.topic,
+                            label='Pubsub Topic'),
+            'subscription':
+            DisplayDataItem(self.subscription,
+                            label='Pubsub Subscription').drop_if_none()}
+
   def reader(self):
     raise NotImplementedError(
         'PubSubSource is not supported in local execution.')
@@ -71,6 +83,9 @@ class PubSubSink(dataflow_io.NativeSink):
     """Sink format name required for remote execution."""
     return 'pubsub'
 
+  def display_data(self):
+    return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')}
+
   def writer(self):
     raise NotImplementedError(
         'PubSubSink is not supported in local execution.')

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/pubsub_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/pubsub_test.py b/sdks/python/apache_beam/io/pubsub_test.py
new file mode 100644
index 0000000..828d233
--- /dev/null
+++ b/sdks/python/apache_beam/io/pubsub_test.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for PubSub sources and sinks."""
+
+import logging
+import unittest
+
+import hamcrest as hc
+
+from apache_beam.io.pubsub import PubSubSource, PubSubSink
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
+
+
+class TestPubSubSource(unittest.TestCase):
+
+  def test_display_data(self):
+    source = PubSubSource('a_topic', 'a_subscription', 'a_label')
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('topic', 'a_topic'),
+        DisplayDataItemMatcher('subscription', 'a_subscription'),
+        DisplayDataItemMatcher('idLabel', 'a_label')]
+
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_display_data_no_subscription(self):
+    source = PubSubSource('a_topic')
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('topic', 'a_topic')]
+
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+
+class TestPubSubSink(unittest.TestCase):
+  def test_display_data(self):
+    sink = PubSubSink('a_topic')
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher('topic', 'a_topic')]
+
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index 9621f4c..32da3a2 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -22,6 +22,7 @@ import logging
 
 from apache_beam import pvalue
 from apache_beam.transforms import ptransform
+from apache_beam.transforms.display import HasDisplayData
 
 
 def _dict_printable_fields(dict_object, skip_fields):
@@ -38,7 +39,7 @@ _minor_fields = ['coder', 'key_coder', 'value_coder',
                  'compression_type']
 
 
-class NativeSource(object):
+class NativeSource(HasDisplayData):
   """A source implemented by Dataflow service.
 
   This class is to be only inherited by sources natively implemented by Cloud
@@ -244,7 +245,7 @@ class DynamicSplitResultWithPosition(DynamicSplitResult):
     self.stop_position = stop_position
 
 
-class NativeSink(object):
+class NativeSink(HasDisplayData):
   """A sink implemented by Dataflow service.
 
   This class is to be only inherited by sinks natively implemented by Cloud

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3b5816a..3189de7 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -29,7 +29,7 @@ from apache_beam.coders import typecoders
 from apache_beam.internal import util
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
-from apache_beam.transforms.display import HasDisplayData
+from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import PTransformWithSideInputs
 from apache_beam.transforms.window import MIN_TIMESTAMP
@@ -235,6 +235,16 @@ class CallableWrapperDoFn(DoFn):
 
     super(CallableWrapperDoFn, self).__init__()
 
+  def display_data(self):
+    # If the callable has a name, then it's likely a function, and
+    # we show its name.
+    # Otherwise, it might be an instance of a callable class. We
+    # show its class.
+    display_data_value = (self._fn.__name__ if hasattr(self._fn, '__name__')
+                          else self._fn.__class__)
+    return {'fn': DisplayDataItem(display_data_value,
+                                  label='Transform Function')}
+
   def __repr__(self):
     return 'CallableWrapperDoFn(%s)' % self._fn
 
@@ -580,6 +590,11 @@ class ParDo(PTransformWithSideInputs):
   def process_argspec_fn(self):
     return self.fn.process_argspec_fn()
 
+  def display_data(self):
+    return {'fn': DisplayDataItem(self.fn.__class__,
+                                  label='Transform Function'),
+            'fn_dd': self.fn}
+
   def apply(self, pcoll):
     self.side_output_tags = set()
     # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
@@ -696,6 +711,10 @@ def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
   else:
     wrapper = lambda x: [fn(x)]
 
+  # TODO: What about callable classes?
+  if hasattr(fn, '__name__'):
+    wrapper.__name__ = fn.__name__
+
   # Proxy the type-hint information from the original function to this new
   # wrapped function.
   get_type_hints(wrapper).input_types = get_type_hints(fn).input_types
@@ -739,6 +758,9 @@ def Filter(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
         % (fn, 'first' if label is None else 'second'))
   wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else []
 
+  # TODO: What about callable classes?
+  if hasattr(fn, '__name__'):
+    wrapper.__name__ = fn.__name__
   # Proxy the type-hint information from the function being wrapped, setting the
   # output type to be the same as the input type.
   get_type_hints(wrapper).input_types = get_type_hints(fn).input_types

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index e93d560..365abaf 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -93,6 +93,8 @@ class DisplayData(object):
         continue
 
       if isinstance(element, DisplayDataItem):
+        if element.should_drop():
+          continue
         element.key = key
         element.namespace = self.namespace
         self.items.append(element)
@@ -132,6 +134,7 @@ class DisplayDataItem(object):
   typeDict = {str:'STRING',
               int:'INTEGER',
               float:'FLOAT',
+              bool: 'BOOLEAN',
               timedelta:'DURATION',
               datetime:'TIMESTAMP'}
 
@@ -145,6 +148,40 @@ class DisplayDataItem(object):
     self.value = value
     self.url = url
     self.label = label
+    self._drop_if_none = False
+    self._drop_if_default = False
+
+  def drop_if_none(self):
+    """ The item should be dropped if its value is None.
+
+    Returns:
+      Returns self.
+    """
+    self._drop_if_none = True
+    return self
+
+  def drop_if_default(self, default):
+    """ The item should be dropped if its value equals the given default.
+
+    Returns:
+      Returns self.
+    """
+    self._default = default
+    self._drop_if_default = True
+    return self
+
+  def should_drop(self):
+    """ Return True if the item should be dropped, or False if it should
+    not be dropped. This depends on prior drop_if_none and drop_if_default calls.
+
+    Returns:
+      True or False, depending on whether the item should be dropped or kept.
+    """
+    if self._drop_if_none and self.value is None:
+      return True
+    if self._drop_if_default and self.value == self._default:
+      return True
+    return False
 
   def is_valid(self):
     """ Checks that all the necessary fields of the DisplayDataItem are
@@ -164,24 +201,12 @@ class DisplayDataItem(object):
           'Invalid DisplayDataItem. Value {} is of an unsupported type.'
           .format(self.value))
 
-  def get_dict(self):
-    """ Returns the internal-API dictionary representing the DisplayDataItem.
-
-    Returns:
-      A dictionary. The internal-API dictionary representing the
-      DisplayDataItem
-
-    Raises:
-     ValueError: if the item is not valid.
-    """
-    self.is_valid()
-
+  def _get_dict(self):
     res = {'key': self.key,
            'namespace': self.namespace,
            'type': self.type if self.type != 'CLASS' else 'STRING'}
     # TODO: Python Class types should not be special-cased once
     # the Fn API is in.
-
     if self.url is not None:
       res['url'] = self.url
     if self.shortValue is not None:
@@ -191,19 +216,32 @@ class DisplayDataItem(object):
     res['value'] = self._format_value(self.value, self.type)
     return res
 
+  def get_dict(self):
+    """ Returns the internal-API dictionary representing the DisplayDataItem.
+
+    Returns:
+      A dictionary. The internal-API dictionary representing the
+      DisplayDataItem
+
+    Raises:
+      ValueError: if the item is not valid.
+    """
+    self.is_valid()
+    return self._get_dict()
+
   def __repr__(self):
-    return 'DisplayDataItem({})'.format(json.dumps(self.get_dict()))
+    return 'DisplayDataItem({})'.format(json.dumps(self._get_dict()))
 
   def __eq__(self, other):
     if isinstance(other, self.__class__):
-      return self.get_dict() == other.get_dict()
+      return self._get_dict() == other._get_dict()
     return False
 
   def __ne__(self, other):
     return not self == other
 
   def __hash__(self):
-    return hash(tuple(sorted(self.get_dict().items())))
+    return hash(tuple(sorted(self._get_dict().items())))
 
   @classmethod
   def _format_value(cls, value, type_):
@@ -259,4 +297,6 @@ class DisplayDataItem(object):
     type_ = cls.typeDict.get(type(value))
     if type_ is None:
       type_ = 'CLASS' if inspect.isclass(value) else None
+    if type_ is None and value is None:
+      type_ = 'STRING'
     return type_

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 87d0920..d514065 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -22,14 +22,72 @@ from __future__ import absolute_import
 from datetime import datetime
 import unittest
 
+import hamcrest as hc
+from hamcrest.core.base_matcher import BaseMatcher
+
 import apache_beam as beam
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display import DisplayDataItem
 
 
+class DisplayDataItemMatcher(BaseMatcher):
+  """ Matcher class for DisplayDataItems in unit tests.
+  """
+  IGNORED = object()
+
+  def __init__(self, key=IGNORED, value=IGNORED,
+               namespace=IGNORED, label=IGNORED, shortValue=IGNORED):
+    if all(member == DisplayDataItemMatcher.IGNORED for member in
+           [key, value, namespace, label, shortValue]):
+      raise ValueError('Must receive at least one item attribute to match')
+
+    self.key = key
+    self.value = value
+    self.namespace = namespace
+    self.label = label
+    self.shortValue = shortValue
+
+  def _matches(self, item):
+    if self.key != DisplayDataItemMatcher.IGNORED and item.key != self.key:
+      return False
+    if (self.namespace != DisplayDataItemMatcher.IGNORED and
+        item.namespace != self.namespace):
+      return False
+    if (self.value != DisplayDataItemMatcher.IGNORED and
+        item.value != self.value):
+      return False
+    if (self.label != DisplayDataItemMatcher.IGNORED and
+        item.label != self.label):
+      return False
+    if (self.shortValue != DisplayDataItemMatcher.IGNORED and
+        item.shortValue != self.shortValue):
+      return False
+    return True
+
+  def describe_to(self, description):
+    descriptors = []
+    if self.key != DisplayDataItemMatcher.IGNORED:
+      descriptors.append('key is {}'.format(self.key))
+    if self.value != DisplayDataItemMatcher.IGNORED:
+      descriptors.append('value is {}'.format(self.value))
+    if self.namespace != DisplayDataItemMatcher.IGNORED:
+      descriptors.append('namespace is {}'.format(self.namespace))
+    if self.label != DisplayDataItemMatcher.IGNORED:
+      descriptors.append('label is {}'.format(self.label))
+    if self.shortValue != DisplayDataItemMatcher.IGNORED:
+      descriptors.append('shortValue is {}'.format(self.shortValue))
+
+    item_description = '{}'.format(' and '.join(descriptors))
+    description.append(item_description)
+
+
 class DisplayDataTest(unittest.TestCase):
 
+  def test_display_data_item_matcher(self):
+    with self.assertRaises(ValueError):
+      DisplayDataItemMatcher()
+
   def test_inheritance_ptransform(self):
     class MyTransform(beam.PTransform):
       pass
@@ -70,52 +128,58 @@ class DisplayDataTest(unittest.TestCase):
     now = datetime.now()
     fn = MyDoFn(my_display_data=now)
     dd = DisplayData.create_from(fn)
-
     nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__)
-    expected_items = set([
-        DisplayDataItem(namespace=nspace,
-                        key='complex_url',
-                        value='github.com',
-                        label='The URL',
-                        url='http://github.com'),
-        DisplayDataItem(namespace=nspace,
-                        key='my_dd',
-                        value=now),
-        DisplayDataItem(namespace=nspace,
-                        key='python_class',
-                        shortValue='HasDisplayData',
-                        value='apache_beam.transforms.display.HasDisplayData'),
-        DisplayDataItem(namespace=nspace,
-                        key='static_integer',
-                        value=120),
-        DisplayDataItem(namespace=nspace,
-                        key='static_string',
-                        value='static me!'),
-    ])
-
-    self.assertEqual(set(dd.items), expected_items)
-
-  def test_subcomponent(self):
-    class SpecialParDo(beam.PTransform):
-      def __init__(self, fn):
-        self.fn = fn
-
+    expected_items = [
+        DisplayDataItemMatcher(key='complex_url',
+                               value='github.com',
+                               namespace=nspace,
+                               label='The URL'),
+        DisplayDataItemMatcher(key='my_dd',
+                               value=now,
+                               namespace=nspace),
+        DisplayDataItemMatcher(key='python_class',
+                               value=HasDisplayData,
+                               namespace=nspace,
+                               shortValue='HasDisplayData'),
+        DisplayDataItemMatcher(key='static_integer',
+                               value=120,
+                               namespace=nspace),
+        DisplayDataItemMatcher(key='static_string',
+                               value='static me!',
+                               namespace=nspace)]
+
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_drop_if_none(self):
+    class MyDoFn(beam.DoFn):
       def display_data(self):
-        return {'asubcomponent': self.fn}
+        return {'some_val': DisplayDataItem('something').drop_if_none(),
+                'non_val': DisplayDataItem(None).drop_if_none(),
+                'def_val': DisplayDataItem(True).drop_if_default(True),
+                'nodef_val': DisplayDataItem(True).drop_if_default(False)}
+
+    dd = DisplayData.create_from(MyDoFn())
+    expected_items = [DisplayDataItemMatcher('some_val',
+                                             'something'),
+                      DisplayDataItemMatcher('nodef_val',
+                                             True)]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
+  def test_subcomponent(self):
     class SpecialDoFn(beam.DoFn):
       def display_data(self):
         return {'dofn_value': 42}
 
     dofn = SpecialDoFn()
-    pardo = SpecialParDo(dofn)
+    pardo = beam.ParDo(dofn)
     dd = DisplayData.create_from(pardo)
-    nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__)
-    self.assertEqual(dd.items[0].get_dict(),
-                     {"type": "INTEGER",
-                      "namespace": nspace,
-                      "value": 42,
-                      "key": "dofn_value"})
+    dofn_nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__)
+    pardo_nspace = '{}.{}'.format(pardo.__module__, pardo.__class__.__name__)
+    expected_items = [
+        DisplayDataItemMatcher('dofn_value', 42, dofn_nspace),
+        DisplayDataItemMatcher('fn', SpecialDoFn, pardo_nspace)]
+
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
 
 # TODO: Test __repr__ function

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index b99cd26..e3b1026 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -23,11 +23,13 @@ import operator
 import re
 import unittest
 
+import hamcrest as hc
 
 import apache_beam as beam
 from apache_beam.pipeline import Pipeline
 import apache_beam.pvalue as pvalue
 import apache_beam.transforms.combiners as combine
+from apache_beam.transforms.display import DisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import assert_that, equal_to
 import apache_beam.typehints as typehints
@@ -663,6 +665,62 @@ class PTransformLabelsTest(unittest.TestCase):
     self.check_label(beam.ParDo(MyDoFn()), r'ParDo(MyDoFn)')
 
 
+class PTransformTestDisplayData(unittest.TestCase):
+  def test_map_named_function(self):
+    tr = beam.Map(len)
+    dd = DisplayData.create_from(tr)
+    nspace = 'apache_beam.transforms.core.CallableWrapperDoFn'
+    expected_item = DisplayDataItem('len', key='fn',
+                                    label='Transform Function',
+                                    namespace=nspace)
+    hc.assert_that(dd.items, hc.has_item(expected_item))
+
+  def test_map_anonymous_function(self):
+    tr = beam.Map(lambda x: x)
+    dd = DisplayData.create_from(tr)
+    nspace = 'apache_beam.transforms.core.CallableWrapperDoFn'
+    expected_item = DisplayDataItem('<lambda>', key='fn',
+                                    label='Transform Function',
+                                    namespace=nspace)
+    hc.assert_that(dd.items, hc.has_item(expected_item))
+
+  def test_flatmap_named_function(self):
+    tr = beam.FlatMap(list)
+    dd = DisplayData.create_from(tr)
+    nspace = 'apache_beam.transforms.core.CallableWrapperDoFn'
+    expected_item = DisplayDataItem('list', key='fn',
+                                    label='Transform Function',
+                                    namespace=nspace)
+    hc.assert_that(dd.items, hc.has_item(expected_item))
+
+  def test_flatmap_anonymous_function(self):
+    tr = beam.FlatMap(lambda x: [x])
+    dd = DisplayData.create_from(tr)
+    nspace = 'apache_beam.transforms.core.CallableWrapperDoFn'
+    expected_item = DisplayDataItem('<lambda>', key='fn',
+                                    label='Transform Function',
+                                    namespace=nspace)
+    hc.assert_that(dd.items, hc.has_item(expected_item))
+
+  def test_filter_named_function(self):
+    tr = beam.Filter(sum)
+    dd = DisplayData.create_from(tr)
+    nspace = 'apache_beam.transforms.core.CallableWrapperDoFn'
+    expected_item = DisplayDataItem('sum', key='fn',
+                                    label='Transform Function',
+                                    namespace=nspace)
+    hc.assert_that(dd.items, hc.has_item(expected_item))
+
+  def test_filter_anonymous_function(self):
+    tr = beam.Filter(lambda x: x // 30)
+    dd = DisplayData.create_from(tr)
+    nspace = 'apache_beam.transforms.core.CallableWrapperDoFn'
+    expected_item = DisplayDataItem('<lambda>', key='fn',
+                                    label='Transform Function',
+                                    namespace=nspace)
+    hc.assert_that(dd.items, hc.has_item(expected_item))
+
+
 class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def assertStartswith(self, msg, prefix):



Mime
View raw message