beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: [BEAM-1464] All tests should use TestPipeline instead of specifying DirectRunner
Date Sat, 11 Feb 2017 01:52:30 GMT
Repository: beam
Updated Branches:
  refs/heads/master a2826195b -> 3acdd956f


[BEAM-1464] All tests should use TestPipeline instead of specifying DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0f3a24ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0f3a24ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0f3a24ac

Branch: refs/heads/master
Commit: 0f3a24acf308452160ab07f444226c927012822f
Parents: a282619
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Fri Feb 10 16:21:35 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Feb 10 17:52:03 2017 -0800

----------------------------------------------------------------------
 .../cookbook/bigquery_tornadoes_test.py         |  3 +-
 sdks/python/apache_beam/io/avroio_test.py       | 11 ++++---
 sdks/python/apache_beam/io/textio_test.py       |  2 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   | 34 ++++++++++----------
 sdks/python/apache_beam/pipeline_test.py        |  2 +-
 sdks/python/apache_beam/pvalue_test.py          |  5 +--
 .../apache_beam/transforms/sideinputs_test.py   |  2 +-
 .../apache_beam/transforms/trigger_test.py      |  4 +--
 8 files changed, 33 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 0fabe3f..0c66d7e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -22,12 +22,13 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_tornadoes
+from apache_beam.test_pipeline import TestPipeline
 
 
 class BigQueryTornadoesTest(unittest.TestCase):
 
   def test_basics(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     rows = (p | 'create' >> beam.Create([
         {'month': 1, 'day': 1, 'tornado': False},
         {'month': 1, 'day': 2, 'tornado': True},

http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 76e723f..233ab69 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -25,6 +25,7 @@ import apache_beam as beam
 from apache_beam.io import avroio
 from apache_beam.io import filebasedsource
 from apache_beam.io import source_test_utils
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
@@ -316,16 +317,16 @@ class TestAvro(unittest.TestCase):
 
   def test_source_transform(self):
     path = self._write_data()
-    with beam.Pipeline('DirectRunner') as p:
+    with TestPipeline() as p:
       assert_that(p | avroio.ReadFromAvro(path), equal_to(self.RECORDS))
 
   def test_sink_transform(self):
     with tempfile.NamedTemporaryFile() as dst:
       path = dst.name
-      with beam.Pipeline('DirectRunner') as p:
+      with TestPipeline() as p:
         # pylint: disable=expression-not-assigned
         p | beam.Create(self.RECORDS) | avroio.WriteToAvro(path, self.SCHEMA)
-      with beam.Pipeline('DirectRunner') as p:
+      with TestPipeline() as p:
         # json used for stable sortability
         readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
         assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
@@ -334,11 +335,11 @@ class TestAvro(unittest.TestCase):
   def test_sink_transform_snappy(self):
     with tempfile.NamedTemporaryFile() as dst:
       path = dst.name
-      with beam.Pipeline('DirectRunner') as p:
+      with TestPipeline() as p:
         # pylint: disable=expression-not-assigned
         p | beam.Create(self.RECORDS) | avroio.WriteToAvro(
             path, self.SCHEMA, codec='snappy')
-      with beam.Pipeline('DirectRunner') as p:
+      with TestPipeline() as p:
         # json used for stable sortability
         readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
         assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))

http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 0dedc95..f3ce843 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -529,7 +529,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
     with gzip.GzipFile(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(
         file_name, 0, CompressionTypes.GZIP,
         True, coders.StrUtf8Coder(), skip_header_lines=2)

http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index ecd58f5..e5dcbdc 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -35,7 +35,7 @@ from apache_beam.io.tfrecordio import _TFRecordSource
 from apache_beam.io.tfrecordio import _TFRecordUtil
 from apache_beam.io.tfrecordio import ReadFromTFRecord
 from apache_beam.io.tfrecordio import WriteToTFRecord
-from apache_beam.runners import DirectRunner
+from apache_beam.test_pipeline import TestPipeline
 import crcmod
 
 
@@ -202,7 +202,7 @@ class TestWriteToTFRecord(TestTFRecordSink):
 
   def test_write_record_gzip(self):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       input_data = ['foo', 'bar']
       _ = p | beam.Create(input_data) | WriteToTFRecord(
           file_path_prefix, compression_type=fileio.CompressionTypes.GZIP)
@@ -217,7 +217,7 @@ class TestWriteToTFRecord(TestTFRecordSink):
 
   def test_write_record_auto(self):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       input_data = ['foo', 'bar']
       _ = p | beam.Create(input_data) | WriteToTFRecord(
           file_path_prefix, file_name_suffix='.gz')
@@ -246,7 +246,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
   def test_process_single(self):
     path = os.path.join(self._new_tempdir(), 'result')
     self._write_file(path, FOO_RECORD_BASE64)
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       result = (p
                 | beam.Read(
                     _TFRecordSource(
@@ -258,7 +258,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
   def test_process_multiple(self):
     path = os.path.join(self._new_tempdir(), 'result')
     self._write_file(path, FOO_BAR_RECORD_BASE64)
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       result = (p
                 | beam.Read(
                     _TFRecordSource(
@@ -270,7 +270,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
   def test_process_gzip(self):
     path = os.path.join(self._new_tempdir(), 'result')
     self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       result = (p
                 | beam.Read(
                     _TFRecordSource(
@@ -282,7 +282,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
   def test_process_auto(self):
     path = os.path.join(self._new_tempdir(), 'result.gz')
     self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       result = (p
                 | beam.Read(
                     _TFRecordSource(
@@ -297,7 +297,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
   def test_process_gzip(self):
     path = os.path.join(self._new_tempdir(), 'result')
     self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
                     path, compression_type=fileio.CompressionTypes.GZIP))
@@ -306,7 +306,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
   def test_process_gzip_auto(self):
     path = os.path.join(self._new_tempdir(), 'result.gz')
     self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
                     path, compression_type=fileio.CompressionTypes.AUTO))
@@ -326,12 +326,12 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
 
     # Generate a TFRecord file.
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       expected_data = [self.create_inputs() for _ in range(0, 10)]
       _ = p | beam.Create(expected_data) | WriteToTFRecord(file_path_prefix)
 
     # Read the file back and compare.
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
       beam.assert_that(actual_data, beam.equal_to(expected_data))
 
@@ -339,13 +339,13 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
 
     # Generate a TFRecord file.
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       expected_data = [self.create_inputs() for _ in range(0, 10)]
       _ = p | beam.Create(expected_data) | WriteToTFRecord(
           file_path_prefix, file_name_suffix='.gz')
 
     # Read the file back and compare.
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
       beam.assert_that(actual_data, beam.equal_to(expected_data))
 
@@ -353,13 +353,13 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
 
     # Generate a TFRecord file.
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       expected_data = [self.create_inputs() for _ in range(0, 10)]
       _ = p | beam.Create(expected_data) | WriteToTFRecord(
           file_path_prefix + '.gz', shard_name_template='')
 
     # Read the file back and compare.
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '.gz')
       beam.assert_that(actual_data, beam.equal_to(expected_data))
 
@@ -372,12 +372,12 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     example.features.feature['bytes'].bytes_list.value.extend(
         [b'foo', b'bar'])
 
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       _ = p | beam.Create([example]) | WriteToTFRecord(
           file_path_prefix, coder=beam.coders.ProtoCoder(example.__class__))
 
     # Read the file back and compare.
-    with beam.Pipeline(DirectRunner()) as p:
+    with TestPipeline() as p:
       actual_data = (p | ReadFromTFRecord(
           file_path_prefix + '-*',
           coder=beam.coders.ProtoCoder(example.__class__)))

http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 2f188aa..e02ebc3 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -229,7 +229,7 @@ class PipelineTest(unittest.TestCase):
     num_elements = 10
     num_maps = 100
 
-    pipeline = TestPipeline(runner='DirectRunner')
+    pipeline = TestPipeline()
 
     # Consumed memory should not be proportional to the number of maps.
     memory_threshold = (

http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index 6b4b663..86f1987 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -25,6 +25,7 @@ from apache_beam.pvalue import AsIter
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
 from apache_beam.pvalue import PValue
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import Create
 
 
@@ -39,12 +40,12 @@ class FakePipeline(Pipeline):
 class PValueTest(unittest.TestCase):
 
   def test_pvalue_expected_arguments(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     value = PValue(pipeline)
     self.assertEqual(pipeline, value.pipeline)
 
   def test_pcollectionview_not_recreated(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     value = pipeline | 'create1' >> Create([1, 2, 3])
     value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
     value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])

http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 4672709..7df870f 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -31,7 +31,7 @@ from apache_beam.transforms.util import assert_that, equal_to
 class SideInputsTest(unittest.TestCase):
 
   def create_pipeline(self):
-    return TestPipeline('DirectRunner')
+    return TestPipeline()
 
   def run_windowed_side_inputs(self, elements, main_window_fn,
                                side_window_fn=None,

http://git-wip-us.apache.org/repos/asf/beam/blob/0f3a24ac/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index fd34f4c..72bab2e 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -25,7 +25,7 @@ import unittest
 import yaml
 
 import apache_beam as beam
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.trigger import AccumulationMode
@@ -383,7 +383,7 @@ class TriggerTest(unittest.TestCase):
 class TriggerPipelineTest(unittest.TestCase):
 
   def test_after_count(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (p
               | beam.Create([1, 2, 3, 4, 5, 10, 11])
               | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])


Mime
View raw message