beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] beam git commit: Move assert_that, equal_to, is_empty to apache_beam.testing.util
Date Thu, 11 May 2017 22:30:37 GMT
Repository: beam
Updated Branches:
  refs/heads/master 4a62fabdb -> 1b31167fb


Move assert_that, equal_to, is_empty to apache_beam.testing.util


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b7356e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b7356e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b7356e3

Branch: refs/heads/master
Commit: 0b7356e329f060c3824991cbe3b43bce89042763
Parents: 4a62fab
Author: Charles Chen <ccy@google.com>
Authored: Thu May 11 15:07:30 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Thu May 11 15:27:57 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete_test.py      |   4 +-
 .../examples/complete/estimate_pi_test.py       |   4 +-
 .../complete/game/hourly_team_score_test.py     |   4 +-
 .../examples/complete/game/user_score_test.py   |   4 +-
 .../apache_beam/examples/complete/tfidf_test.py |   4 +-
 .../complete/top_wikipedia_sessions_test.py     |   4 +-
 .../cookbook/bigquery_side_input_test.py        |   4 +-
 .../cookbook/bigquery_tornadoes_test.py         |   6 +-
 .../examples/cookbook/coders_test.py            |   4 +-
 .../examples/cookbook/combiners_test.py         |   6 +-
 .../examples/cookbook/custom_ptransform_test.py |   4 +-
 .../examples/cookbook/filters_test.py           |  12 ++-
 .../examples/cookbook/mergecontacts.py          |  14 +--
 .../apache_beam/examples/snippets/snippets.py   |  17 +--
 .../examples/snippets/snippets_test.py          |  30 +++---
 .../apache_beam/examples/wordcount_debugging.py |   6 +-
 sdks/python/apache_beam/io/avroio_test.py       |   4 +-
 .../python/apache_beam/io/concat_source_test.py |   4 +-
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/sources_test.py      |   4 +-
 sdks/python/apache_beam/io/textio_test.py       |   5 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |  24 +++--
 sdks/python/apache_beam/pipeline_test.py        |   4 +-
 .../portability/maptask_executor_runner_test.py |   6 +-
 sdks/python/apache_beam/runners/runner_test.py  |   4 +-
 sdks/python/apache_beam/testing/util.py         | 107 +++++++++++++++++++
 sdks/python/apache_beam/testing/util_test.py    |  50 +++++++++
 .../apache_beam/transforms/combiners_test.py    |   2 +-
 .../apache_beam/transforms/create_test.py       |   3 +-
 .../apache_beam/transforms/ptransform_test.py   |   2 +-
 .../apache_beam/transforms/sideinputs_test.py   |   2 +-
 .../apache_beam/transforms/trigger_test.py      |   2 +-
 sdks/python/apache_beam/transforms/util.py      |  79 --------------
 sdks/python/apache_beam/transforms/util_test.py |  50 ---------
 .../apache_beam/transforms/window_test.py       |   2 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |   2 +-
 37 files changed, 271 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 438633a..378d222 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -22,8 +22,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class AutocompleteTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 12d8379..fd51309 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -22,8 +22,8 @@ import unittest
 
 from apache_beam.examples.complete import estimate_pi
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
 
 
 def in_between(lower, upper):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
index bd0abca..9c30127 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete.game import hourly_team_score
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class HourlyTeamScoreTest(unittest.TestCase):
@@ -44,7 +46,7 @@ class HourlyTeamScoreTest(unittest.TestCase):
                     start_min='2015-11-16-15-20',
                     stop_min='2015-11-16-17-20',
                     window_duration=60))
-      beam.assert_that(result, beam.equal_to([
+      assert_that(result, equal_to([
           ('team1', 18), ('team2', 2), ('team3', 13)]))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
index 2db53bd..59903d9 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete.game import user_score
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class UserScoreTest(unittest.TestCase):
@@ -40,7 +42,7 @@ class UserScoreTest(unittest.TestCase):
     with TestPipeline() as p:
       result = (
           p | beam.Create(UserScoreTest.SAMPLE_DATA) | user_score.UserScore())
-      beam.assert_that(result, beam.equal_to([
+      assert_that(result, equal_to([
           ('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8),
           ('user4_team3', 5)]))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 0e30254..f177dfc 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -26,6 +26,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete import tfidf
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 EXPECTED_RESULTS = set([
@@ -57,7 +59,7 @@ class TfIdfTest(unittest.TestCase):
         uri_to_line
         | tfidf.TfIdf()
         | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
-    beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
+    assert_that(result, equal_to(EXPECTED_RESULTS))
     # Run the pipeline. Note that the assert_that above adds to the pipeline
     # a check that the result PCollection contains expected values. To actually
     # trigger the check the pipeline must be run.

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 4850c04..5fb6276 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -24,6 +24,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete import top_wikipedia_sessions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class ComputeTopSessionsTest(unittest.TestCase):
@@ -54,7 +56,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
     edits = p | beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 
-    beam.assert_that(result, beam.equal_to(self.EXPECTED))
+    assert_that(result, equal_to(self.EXPECTED))
     p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 1ca25c9..b11dc47 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_side_input
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class BigQuerySideInputTest(unittest.TestCase):
@@ -42,7 +44,7 @@ class BigQuerySideInputTest(unittest.TestCase):
                                                words_pcoll, ignore_corpus_pcoll,
                                                ignore_word_pcoll)
 
-    beam.assert_that(groups, beam.equal_to(
+    assert_that(groups, equal_to(
         [('A', 'corpus2', 'word2'),
          ('B', 'corpus2', 'word2'),
          ('C', 'corpus2', 'word2')]))

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index ca7ca9e..c926df8 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_tornadoes
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class BigQueryTornadoesTest(unittest.TestCase):
@@ -35,8 +37,8 @@ class BigQueryTornadoesTest(unittest.TestCase):
         {'month': 1, 'day': 3, 'tornado': True},
         {'month': 2, 'day': 1, 'tornado': True}]))
     results = bigquery_tornadoes.count_tornadoes(rows)
-    beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2},
-                                             {'month': 2, 'tornado_count': 1}]))
+    assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
+                                   {'month': 2, 'tornado_count': 1}]))
     p.run().wait_until_finish()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 35cf252..f71dad8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -23,8 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import coders
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class CodersTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 45c779f..ee1fb77 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -28,6 +28,8 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class CombinersTest(unittest.TestCase):
@@ -49,7 +51,7 @@ class CombinersTest(unittest.TestCase):
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(sum))
 
-    beam.assert_that(result, beam.equal_to([('a', 6), ('b', 30), ('c', 100)]))
+    assert_that(result, equal_to([('a', 6), ('b', 30), ('c', 100)]))
     result.pipeline.run()
 
   def test_combine_per_key_with_custom_callable(self):
@@ -65,7 +67,7 @@ class CombinersTest(unittest.TestCase):
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(multiply))
 
-    beam.assert_that(result, beam.equal_to([('a', 6), ('b', 200), ('c', 100)]))
+    assert_that(result, equal_to([('a', 6), ('b', 200), ('c', 100)]))
     result.pipeline.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 2d35d8d..c7c6dba 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -23,8 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import custom_ptransform
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class CustomCountTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 44a352f..fd49f93 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import filters
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class FiltersTest(unittest.TestCase):
@@ -45,22 +47,22 @@ class FiltersTest(unittest.TestCase):
   def test_basic(self):
     """Test that the correct result is returned for a simple dataset."""
     results = self._get_result_for_month(1)
-    beam.assert_that(
+    assert_that(
         results,
-        beam.equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
-                       {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
+        equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
+                  {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
     results.pipeline.run()
 
   def test_basic_empty(self):
     """Test that the correct empty result is returned for a simple dataset."""
     results = self._get_result_for_month(3)
-    beam.assert_that(results, beam.equal_to([]))
+    assert_that(results, equal_to([]))
     results.pipeline.run()
 
   def test_basic_empty_missing(self):
     """Test that the correct empty result is returned for a missing month."""
     results = self._get_result_for_month(4)
-    beam.assert_that(results, beam.equal_to([]))
+    assert_that(results, equal_to([]))
     results.pipeline.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 5aaba10..4f53c61 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -40,6 +40,8 @@ from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 def run(argv=None, assert_results=None):
@@ -118,12 +120,12 @@ def run(argv=None, assert_results=None):
   # TODO(silviuc): Move the assert_results logic to the unit test.
   if assert_results is not None:
     expected_luddites, expected_writers, expected_nomads = assert_results
-    beam.assert_that(num_luddites, beam.equal_to([expected_luddites]),
-                     label='assert:luddites')
-    beam.assert_that(num_writers, beam.equal_to([expected_writers]),
-                     label='assert:writers')
-    beam.assert_that(num_nomads, beam.equal_to([expected_nomads]),
-                     label='assert:nomads')
+    assert_that(num_luddites, equal_to([expected_luddites]),
+                label='assert:luddites')
+    assert_that(num_writers, equal_to([expected_writers]),
+                label='assert:writers')
+    assert_that(num_nomads, equal_to([expected_nomads]),
+                label='assert:nomads')
   # Execute pipeline.
   return p.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 1bdb9a3..7259572 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -33,6 +33,8 @@ string. The tags can contain only letters, digits and _.
 import apache_beam as beam
 from apache_beam.metrics import Metrics
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 # Quiet some pylint warnings that happen because of the somewhat special
 # format for the code snippets.
@@ -566,8 +568,9 @@ def examples_wordcount_debugging(renames):
       | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
   # [START example_wordcount_debugging_assert]
-  beam.assert_that(
-      filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+  beam.testing.util.assert_that(
+      filtered_words, beam.testing.util.equal_to(
+          [('Flourish', 3), ('stomach', 1)]))
   # [END example_wordcount_debugging_assert]
 
   output = (filtered_words
@@ -661,8 +664,8 @@ def model_custom_source(count):
   # [END model_custom_source_use_new_source]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
-  beam.assert_that(
-      lines, beam.equal_to(
+  assert_that(
+      lines, equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
   p.run().wait_until_finish()
@@ -691,8 +694,8 @@ def model_custom_source(count):
   # [END model_custom_source_use_ptransform]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
-  beam.assert_that(
-      lines, beam.equal_to(
+  assert_that(
+      lines, equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
   # Don't test runner api due to pickling errors.
@@ -872,7 +875,7 @@ def model_textio_compressed(renames, expected):
       compression_type=beam.io.filesystem.CompressionTypes.GZIP)
   # [END model_textio_write_compressed]
 
-  beam.assert_that(lines, beam.equal_to(expected))
+  assert_that(lines, equal_to(expected))
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run().wait_until_finish()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 37cd470..f7b51a7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -30,10 +30,10 @@ from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.coders.coders import ToStringCoder
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.examples.snippets import snippets
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.utils.windowed_value import WindowedValue
 
 # pylint: disable=expression-not-assigned
@@ -158,11 +158,11 @@ class ParDoTest(unittest.TestCase):
                                                     avg_word_len))
     # [END model_pardo_side_input]
 
-    beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc']))
-    beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']),
-                     label='larger_than_average')
-    beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']),
-                     label='small_but_not_trivial')
+    assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
+    assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
+                label='larger_than_average')
+    assert_that(small_but_nontrivial, equal_to(['bb']),
+                label='small_but_not_trivial')
     p.run()
 
   def test_pardo_side_input_dofn(self):
@@ -816,7 +816,7 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed, beam.equal_to([110, 215, 120]))
+    assert_that(unkeyed, equal_to([110, 215, 120]))
     p.run()
 
   def test_setting_sliding_windows(self):
@@ -834,8 +834,8 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed,
-                     beam.equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
+    assert_that(unkeyed,
+                equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
     p.run()
 
   def test_setting_session_windows(self):
@@ -853,8 +853,8 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed,
-                     beam.equal_to([29, 27]))
+    assert_that(unkeyed,
+                equal_to([29, 27]))
     p.run()
 
   def test_setting_global_window(self):
@@ -872,7 +872,7 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed, beam.equal_to([56]))
+    assert_that(unkeyed, equal_to([56]))
     p.run()
 
   def test_setting_timestamp(self):
@@ -903,7 +903,7 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed, beam.equal_to([42, 187]))
+    assert_that(unkeyed, equal_to([42, 187]))
     p.run()
 
 
@@ -921,7 +921,7 @@ class PTransformTest(unittest.TestCase):
 
     p = TestPipeline()
     lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
-    beam.assert_that(lengths, beam.equal_to([1, 2, 3]))
+    assert_that(lengths, equal_to([1, 2, 3]))
     p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 98acde4..ca9f7b6 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -51,6 +51,8 @@ from apache_beam.io import WriteToText
 from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class FilterTextFn(beam.DoFn):
@@ -133,8 +135,8 @@ def run(argv=None):
   # of the Pipeline implies that the expectations were  met. Learn more at
   # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
   # test your pipeline.
-  beam.assert_that(
-      filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+  assert_that(
+      filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
 
   # Format the counts into a PCollection of strings and write the output using a
   # "Write" transform that has side effects.

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 4a21839..6dcf121 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -27,10 +27,10 @@ from apache_beam.io import avroio
 from apache_beam.io import filebasedsource
 from apache_beam.io import source_test_utils
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 
 # Importing following private class for testing purposes.
 from apache_beam.io.avroio import _AvroSource as AvroSource

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index a02f9ad..4a8f519 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -27,8 +27,8 @@ from apache_beam.io import range_trackers
 from apache_beam.io import source_test_utils
 from apache_beam.io.concat_source import ConcatSource
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class RangeSource(iobase.BoundedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index e17a004..afb340d 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -41,10 +41,10 @@ from apache_beam.io.filebasedsource import FileBasedSource
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 
 
 class LineSource(FileBasedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index c0b8ad6..10d401b 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -28,8 +28,8 @@ from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class LineSource(iobase.BoundedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index d00afef..9a4ec47 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -44,9 +44,8 @@ from apache_beam.io.filebasedsource_test import write_pattern
 from apache_beam.io.filesystem import CompressionTypes
 
 from apache_beam.testing.test_pipeline import TestPipeline
-
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 # TODO: Refactor code so all io tests are using same library

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index b7e370d..3c70ade 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -36,6 +36,8 @@ from apache_beam.io.tfrecordio import _TFRecordUtil
 from apache_beam.io.tfrecordio import ReadFromTFRecord
 from apache_beam.io.tfrecordio import WriteToTFRecord
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 import crcmod
 
 
@@ -254,7 +256,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.AUTO,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo']))
+      assert_that(result, equal_to(['foo']))
 
   def test_process_multiple(self):
     path = os.path.join(self._new_tempdir(), 'result')
@@ -267,7 +269,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.AUTO,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
   def test_process_gzip(self):
     path = os.path.join(self._new_tempdir(), 'result')
@@ -280,7 +282,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.GZIP,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
   def test_process_auto(self):
     path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -293,7 +295,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.AUTO,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
 
 class TestReadFromTFRecordSource(TestTFRecordSource):
@@ -305,7 +307,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
       result = (p
                 | ReadFromTFRecord(
                     path, compression_type=CompressionTypes.GZIP))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
   def test_process_gzip_auto(self):
     path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -314,7 +316,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
       result = (p
                 | ReadFromTFRecord(
                     path, compression_type=CompressionTypes.AUTO))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
 
 class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
@@ -337,7 +339,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
   def test_end2end_auto_compression(self):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -351,7 +353,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
   def test_end2end_auto_compression_unsharded(self):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -365,7 +367,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '.gz')
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
   @unittest.skipIf(tf is None, 'tensorflow not installed.')
   def test_end2end_example_proto(self):
@@ -385,7 +387,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
       actual_data = (p | ReadFromTFRecord(
           file_path_prefix + '-*',
           coder=beam.coders.ProtoCoder(example.__class__)))
-      beam.assert_that(actual_data, beam.equal_to([example]))
+      assert_that(actual_data, equal_to([example]))
 
   def test_end2end_read_write_read(self):
     path = os.path.join(self._new_tempdir(), 'result')
@@ -400,7 +402,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(path+'-*', validate=True)
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8aa8a8a..e0775d1 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -33,6 +33,8 @@ from apache_beam.pipeline import PipelineVisitor
 from apache_beam.pvalue import AsSingleton
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
@@ -41,8 +43,6 @@ from apache_beam.transforms import DoFn
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 from apache_beam.transforms.window import SlidingWindows
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils.timestamp import MIN_TIMESTAMP

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 062e6f9..b7ba15a 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -28,9 +28,9 @@ from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.metrics.metricbase import MetricName
 
 from apache_beam.pvalue import AsList
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.runners.portability import maptask_executor_runner
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index c61c49f..fa80b1c 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -36,8 +36,8 @@ from apache_beam.metrics.metricbase import MetricName
 from apache_beam.pipeline import Pipeline
 from apache_beam.runners import DirectRunner
 from apache_beam.runners import create_runner
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.options.pipeline_options import PipelineOptions
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/testing/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
new file mode 100644
index 0000000..60a6b21
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for testing Beam pipelines."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+from apache_beam.transforms import window
+from apache_beam.transforms.core import Create
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import WindowInto
+from apache_beam.transforms.util import CoGroupByKey
+from apache_beam.transforms.ptransform import PTransform
+
+
+__all__ = [
+    'assert_that',
+    'equal_to',
+    'is_empty',
+    ]
+
+
+class BeamAssertException(Exception):
+  """Exception raised by matcher classes used by assert_that transform."""
+
+  pass
+
+
+# Note that equal_to always sorts the expected and actual since what we
+# compare are PCollections for which there is no guaranteed order.
+# However the sorting does not go beyond top level therefore [1,2] and [2,1]
+# are considered equal and [[1,2]] and [[2,1]] are not.
+# TODO(silviuc): Add contains_in_any_order-style matchers.
+def equal_to(expected):
+  expected = list(expected)
+
+  def _equal(actual):
+    sorted_expected = sorted(expected)
+    sorted_actual = sorted(actual)
+    if sorted_expected != sorted_actual:
+      raise BeamAssertException(
+          'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
+  return _equal
+
+
+def is_empty():
+  def _empty(actual):
+    actual = list(actual)
+    if actual:
+      raise BeamAssertException(
+          'Failed assert: [] == %r' % actual)
+  return _empty
+
+
+def assert_that(actual, matcher, label='assert_that'):
+  """A PTransform that checks a PCollection has an expected value.
+
+  Note that assert_that should be used only for testing pipelines since the
+  check relies on materializing the entire PCollection being checked.
+
+  Args:
+    actual: A PCollection.
+    matcher: A matcher function taking as argument the actual value of a
+      materialized PCollection. The matcher validates this actual value against
+      expectations and raises BeamAssertException if they are not met.
+    label: Optional string label. This is needed in case several assert_that
+      transforms are introduced in the same pipeline.
+
+  Returns:
+    Ignored.
+  """
+  assert isinstance(actual, pvalue.PCollection)
+
+  class AssertThat(PTransform):
+
+    def expand(self, pcoll):
+      # We must have at least a single element to ensure the matcher
+      # code gets run even if the input pcollection is empty.
+      keyed_singleton = pcoll.pipeline | Create([(None, None)])
+      keyed_actual = (
+          pcoll
+          | WindowInto(window.GlobalWindows())
+          | "ToVoidKey" >> Map(lambda v: (None, v)))
+      _ = ((keyed_singleton, keyed_actual)
+           | "Group" >> CoGroupByKey()
+           | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
+           | "Match" >> Map(matcher))
+
+    def default_label(self):
+      return label
+
+  actual | AssertThat()  # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/testing/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util_test.py b/sdks/python/apache_beam/testing/util_test.py
new file mode 100644
index 0000000..1acebb6
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util_test.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for testing utilities."""
+
+import unittest
+
+from apache_beam import Create
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to, is_empty
+
+
+class UtilTest(unittest.TestCase):
+
+  def test_assert_that_passes(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails_on_empty_input(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        assert_that(p | Create([]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails_on_empty_expected(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        assert_that(p | Create([1, 2, 3]), is_empty())
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 1822c19..946a60a 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -24,13 +24,13 @@ import hamcrest as hc
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
 import apache_beam.transforms.combiners as combine
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms.core import CombineGlobally
 from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import Map
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
 
 
 class CombineTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index 9ede4c7..55ad7f3 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -20,9 +20,10 @@ import unittest
 
 from apache_beam.io import source_test_utils
 
-from apache_beam import Create, assert_that, equal_to
+from apache_beam import Create
 from apache_beam.coders import FastPrimitivesCoder
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 
 
 class CreateTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3320d79..f790660 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -33,12 +33,12 @@ from apache_beam.io.iobase import Read
 from apache_beam.options.pipeline_options import TypeOptions
 import apache_beam.pvalue as pvalue
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import window
 from apache_beam.transforms.core import GroupByKeyOnly
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
 import apache_beam.typehints as typehints
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 0bc9107..6500681 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -24,8 +24,8 @@ from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import window
-from apache_beam.transforms.util import assert_that, equal_to
 
 
 class SideInputsTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index da7f43b..4a5affb 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -27,6 +27,7 @@ import yaml
 import apache_beam as beam
 from apache_beam.runners import pipeline_context
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.trigger import AccumulationMode
@@ -40,7 +41,6 @@ from apache_beam.transforms.trigger import GeneralTriggerDriver
 from apache_beam.transforms.trigger import InMemoryUnmergedState
 from apache_beam.transforms.trigger import Repeatedly
 from apache_beam.transforms.trigger import TriggerFn
-from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import MIN_TIMESTAMP

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index a6ecf0a..a7484ac 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,14 +20,10 @@
 
 from __future__ import absolute_import
 
-from apache_beam import pvalue
-from apache_beam.transforms import window
 from apache_beam.transforms.core import CombinePerKey
-from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import Flatten
 from apache_beam.transforms.core import GroupByKey
 from apache_beam.transforms.core import Map
-from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import ptransform_fn
 
@@ -38,9 +34,6 @@ __all__ = [
     'KvSwap',
     'RemoveDuplicates',
     'Values',
-    'assert_that',
-    'equal_to',
-    'is_empty',
     ]
 
 
@@ -169,75 +162,3 @@ def RemoveDuplicates(pcoll):  # pylint: disable=invalid-name
           | 'ToPairs' >> Map(lambda v: (v, None))
           | 'Group' >> CombinePerKey(lambda vs: None)
           | 'RemoveDuplicates' >> Keys())
-
-
-class BeamAssertException(Exception):
-  """Exception raised by matcher classes used by assert_that transform."""
-
-  pass
-
-
-# Note that equal_to always sorts the expected and actual since what we
-# compare are PCollections for which there is no guaranteed order.
-# However the sorting does not go beyond top level therefore [1,2] and [2,1]
-# are considered equal and [[1,2]] and [[2,1]] are not.
-# TODO(silviuc): Add contains_in_any_order-style matchers.
-def equal_to(expected):
-  expected = list(expected)
-
-  def _equal(actual):
-    sorted_expected = sorted(expected)
-    sorted_actual = sorted(actual)
-    if sorted_expected != sorted_actual:
-      raise BeamAssertException(
-          'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
-  return _equal
-
-
-def is_empty():
-  def _empty(actual):
-    actual = list(actual)
-    if actual:
-      raise BeamAssertException(
-          'Failed assert: [] == %r' % actual)
-  return _empty
-
-
-def assert_that(actual, matcher, label='assert_that'):
-  """A PTransform that checks a PCollection has an expected value.
-
-  Note that assert_that should be used only for testing pipelines since the
-  check relies on materializing the entire PCollection being checked.
-
-  Args:
-    actual: A PCollection.
-    matcher: A matcher function taking as argument the actual value of a
-      materialized PCollection. The matcher validates this actual value against
-      expectations and raises BeamAssertException if they are not met.
-    label: Optional string label. This is needed in case several assert_that
-      transforms are introduced in the same pipeline.
-
-  Returns:
-    Ignored.
-  """
-  assert isinstance(actual, pvalue.PCollection)
-
-  class AssertThat(PTransform):
-
-    def expand(self, pcoll):
-      # We must have at least a single element to ensure the matcher
-      # code gets run even if the input pcollection is empty.
-      keyed_singleton = pcoll.pipeline | Create([(None, None)])
-      keyed_actual = (
-          pcoll
-          | WindowInto(window.GlobalWindows())
-          | "ToVoidKey" >> Map(lambda v: (None, v)))
-      _ = ((keyed_singleton, keyed_actual)
-           | "Group" >> CoGroupByKey()
-           | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
-           | "Match" >> Map(matcher))
-
-    def default_label(self):
-      return label
-
-  actual | AssertThat()  # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
deleted file mode 100644
index 7fdef70..0000000
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the util transforms."""
-
-import unittest
-
-from apache_beam import Create
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to, is_empty
-
-
-class UtilTest(unittest.TestCase):
-
-  def test_assert_that_passes(self):
-    with TestPipeline() as p:
-      assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
-
-  def test_assert_that_fails(self):
-    with self.assertRaises(Exception):
-      with TestPipeline() as p:
-        assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
-
-  def test_assert_that_fails_on_empty_input(self):
-    with self.assertRaises(Exception):
-      with TestPipeline() as p:
-        assert_that(p | Create([]), equal_to([1, 2, 3]))
-
-  def test_assert_that_fails_on_empty_expected(self):
-    with self.assertRaises(Exception):
-      with TestPipeline() as p:
-        assert_that(p | Create([1, 2, 3]), is_empty())
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index a7797dd..fd1bb9d 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -21,6 +21,7 @@ import unittest
 
 from apache_beam.runners import pipeline_context
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
 from apache_beam.transforms import core
@@ -31,7 +32,6 @@ from apache_beam.transforms import WindowInto
 from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.trigger import AccumulationMode
 from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import GlobalWindows

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 27e7caa..e31b9cc 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -23,8 +23,8 @@ import apache_beam as beam
 
 from apache_beam.io import iobase
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, is_empty
 from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, is_empty
 
 
 class _TestSink(iobase.Sink):

http://git-wip-us.apache.org/repos/asf/beam/blob/0b7356e3/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 3494cfe..589dc0e 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -25,7 +25,7 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.options.pipeline_options import OptionsContext
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.typehints import WithTypeHints
 
 # These test often construct a pipeline as value | PTransform to test side


Mime
View raw message