beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: [BEAM-5321] Port transforms package to Python 3 (#7104)
Date Fri, 07 Dec 2018 00:26:22 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9159d9b  [BEAM-5321] Port transforms package to Python 3 (#7104)
9159d9b is described below

commit 9159d9b8b072bf2e7b1b3a6fdd269ffb7081612f
Author: Robbe Sneyders <robbe.sneyders@gmail.com>
AuthorDate: Fri Dec 7 01:26:15 2018 +0100

    [BEAM-5321] Port transforms package to Python 3 (#7104)
    
    * Port transforms package to Python 3
---
 .../apache_beam/transforms/ptransform_test.py      | 114 +++++++++++++--------
 .../apache_beam/transforms/userstate_test.py       |  32 +++---
 sdks/python/setup.py                               |  13 ++-
 sdks/python/tox.ini                                |   4 +-
 4 files changed, 97 insertions(+), 66 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 95ea03f..ffb4331 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -668,7 +668,7 @@ class PTransformTest(unittest.TestCase):
   def test_apply_to_list(self):
     self.assertCountEqual(
         [1, 2, 3], [0, 1, 2] | 'AddOne' >> beam.Map(lambda x: x + 1))
-    self.assertItemsEqual([1],
+    self.assertCountEqual([1],
                           [0, 1, 2] | 'Odd' >> beam.Filter(lambda x: x % 2))
     self.assertCountEqual([1, 2, 100, 3],
                           ([1, 2, 3], [100]) | beam.Flatten())
@@ -947,7 +947,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
 
     self.assertEqual("Type hint violation for 'Upper': "
-                     "requires <type 'str'> but got <type 'int'> for element",
+                     "requires {} but got {} for element".format(str, int),
                      e.exception.args[0])
 
   def test_do_fn_pipeline_runtime_type_check_satisfied(self):
@@ -982,7 +982,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       self.p.run()
 
     self.assertEqual("Type hint violation for 'Add': "
-                     "requires <type 'int'> but got <type 'str'> for element",
+                     "requires {} but got {} for element".format(int, str),
                      e.exception.args[0])
 
   def test_pardo_does_not_type_check_using_type_hint_decorators(self):
@@ -999,7 +999,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'ToStr' >> beam.FlatMap(int_to_str))
 
     self.assertEqual("Type hint violation for 'ToStr': "
-                     "requires <type 'int'> but got <type 'str'> for a",
+                     "requires {} but got {} for a".format(int, str),
                      e.exception.args[0])
 
   def test_pardo_properly_type_checks_using_type_hint_decorators(self):
@@ -1031,7 +1031,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Type hint violation for 'Upper': "
-                     "requires <type 'str'> but got <type 'int'> for x",
+                     "requires {} but got {} for x".format(str, int),
                      e.exception.args[0])
 
   def test_pardo_properly_type_checks_using_type_hint_methods(self):
@@ -1056,7 +1056,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        .with_input_types(str).with_output_types(str))
 
     self.assertEqual("Type hint violation for 'Upper': "
-                     "requires <type 'str'> but got <type 'int'> for x",
+                     "requires {} but got {} for x".format(str, int),
                      e.exception.args[0])
 
   def test_map_properly_type_checks_using_type_hints_methods(self):
@@ -1082,7 +1082,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'Upper' >> beam.Map(upper))
 
     self.assertEqual("Type hint violation for 'Upper': "
-                     "requires <type 'str'> but got <type 'int'> for s",
+                     "requires {} but got {} for s".format(str, int),
                      e.exception.args[0])
 
   def test_map_properly_type_checks_using_type_hints_decorator(self):
@@ -1109,7 +1109,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
 
     self.assertEqual("Type hint violation for 'Below 3': "
-                     "requires <type 'int'> but got <type 'str'> for x",
+                     "requires {} but got {} for x".format(int, str),
                      e.exception.args[0])
 
   def test_filter_type_checks_using_type_hints_method(self):
@@ -1134,7 +1134,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'Half' >> beam.Filter(more_than_half))
 
     self.assertEqual("Type hint violation for 'Half': "
-                     "requires <type 'float'> but got <type 'int'> for a",
+                     "requires {} but got {} for a".format(float, int),
                      e.exception.args[0])
 
   def test_filter_type_checks_using_type_hints_decorator(self):
@@ -1183,7 +1183,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
+                     "got {}".format(int),
                      e.exception.args[0])
 
   def test_group_by_does_not_type_check(self):
@@ -1260,8 +1260,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         e.exception.args[0],
         "Runtime type violation detected within ParDo(ToStr): "
         "Type-hint for argument: 'x' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found some_string, an instance of <type 'str'>.")
+        "Expected an instance of {}, "
+        "instead found some_string, an instance of {}.".format(int, str))
 
   def test_run_time_type_checking_enabled_types_satisfied(self):
     self.p._options.view_as(TypeOptions).pipeline_type_check = False
@@ -1351,8 +1351,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         e.exception.args[0],
         "Runtime type violation detected within ParDo(ToInt): "
         "Type-hint for argument: 'x' violated. "
-        "Expected an instance of <type 'str'>, "
-        "instead found 1, an instance of <type 'int'>.")
+        "Expected an instance of {}, "
+        "instead found 1, an instance of {}.".format(str, int))
 
   def test_pipeline_runtime_checking_violation_composite_type_input(self):
     self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1398,8 +1398,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "Runtime type violation detected within "
         "ParDo(ToInt): "
         "According to type-hint expected output should be "
-        "of type <type 'int'>. Instead, received '1.0', "
-        "an instance of type <type 'float'>.")
+        "of type {}. Instead, received '1.0', "
+        "an instance of type {}.".format(int, float))
 
   def test_pipeline_runtime_checking_violation_composite_type_output(self):
     self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1441,8 +1441,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         e.exception.args[0],
         "Runtime type violation detected within ParDo(Add 1): "
         "Type-hint for argument: 'b' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found 1.0, an instance of <type 'float'>.")
+        "Expected an instance of {}, "
+        "instead found 1.0, an instance of {}.".format(int, float))
 
   def test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self):  # pylint: disable=line-too-long
     self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1460,8 +1460,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         e.exception.args[0],
         "Runtime type violation detected within ParDo(Add 1): "
         "Type-hint for argument: 'one' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found 1.0, an instance of <type 'float'>.")
+        "Expected an instance of {}, "
+        "instead found 1.0, an instance of {}.".format(int, float))
 
   def test_combine_properly_pipeline_type_checks_using_decorator(self):
     @with_output_types(int)
@@ -1491,7 +1491,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertEqual(
         "All functions for a Combine PTransform must accept a "
         "single argument compatible with: Iterable[Any]. "
-        "Instead a function with input type: <type 'int'> was received.",
+        "Instead a function with input type: {} was received.".format(int),
         e.exception.args[0])
 
   def test_combine_pipeline_type_propagation_using_decorators(self):
@@ -1550,7 +1550,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "Runtime type violation detected within "
         "Mul/CombinePerKey: "
         "Type-hint for return type violated. "
-        "Expected an instance of <type 'int'>, instead found")
+        "Expected an instance of {}, instead found".format(int))
 
   def test_combine_pipeline_type_check_using_methods(self):
     d = (self.p
@@ -1585,7 +1585,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Input type hint violation at SortJoin: "
-                     "expected <type 'str'>, got <type 'int'>",
+                     "expected {}, got {}".format(str, int),
                      e.exception.args[0])
 
   def test_combine_runtime_type_check_violation_using_methods(self):
@@ -1604,8 +1604,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "Runtime type violation detected within "
         "ParDo(SortJoin/KeyWithVoid): "
         "Type-hint for argument: 'v' violated. "
-        "Expected an instance of <type 'str'>, "
-        "instead found 0, an instance of <type 'int'>.")
+        "Expected an instance of {}, "
+        "instead found 0, an instance of {}.".format(str, int))
 
   def test_combine_insufficient_type_hint_information(self):
     self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
@@ -1638,11 +1638,18 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'C' >> beam.Create(['test']).with_output_types(str)
        | 'Mean' >> combine.Mean.Globally())
 
-    self.assertEqual(
-        "Type hint violation for 'CombinePerKey': "
-        "requires Tuple[TypeVariable[K], Union[float, int, long]] "
-        "but got Tuple[None, str] for element",
-        e.exception.args[0])
+    if sys.version_info[0] >= 3:
+      expected_msg = \
+        "Type hint violation for 'CombinePerKey': " \
+        "requires Tuple[TypeVariable[K], Union[float, int]] " \
+        "but got Tuple[None, str] for element"
+    else:
+      expected_msg = \
+        "Type hint violation for 'CombinePerKey': " \
+        "requires Tuple[TypeVariable[K], Union[float, int, long]] " \
+        "but got Tuple[None, str] for element"
+
+    self.assertEqual(expected_msg, e.exception.args[0])
 
   def test_mean_globally_runtime_checking_satisfied(self):
     self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1695,11 +1702,18 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'EvenMean' >> combine.Mean.PerKey())
       self.p.run()
 
-    self.assertEqual(
-        "Type hint violation for 'CombinePerKey(MeanCombineFn)': "
-        "requires Tuple[TypeVariable[K], Union[float, int, long]] "
-        "but got Tuple[str, str] for element",
-        e.exception.args[0])
+    if sys.version_info[0] >= 3:
+      expected_msg = \
+        "Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
+        "requires Tuple[TypeVariable[K], Union[float, int]] " \
+        "but got Tuple[str, str] for element"
+    else:
+      expected_msg = \
+        "Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
+        "requires Tuple[TypeVariable[K], Union[float, int, long]] " \
+        "but got Tuple[str, str] for element"
+
+    self.assertEqual(expected_msg, e.exception.args[0])
 
   def test_mean_per_key_runtime_checking_satisfied(self):
     self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1726,14 +1740,24 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'OddMean' >> combine.Mean.PerKey())
       self.p.run()
 
-    self.assertStartswith(
-        e.exception.args[0],
-        "Runtime type violation detected within "
-        "OddMean/CombinePerKey(MeanCombineFn): "
-        "Type-hint for argument: 'element' violated: "
-        "Union[float, int, long] type-constraint violated. "
-        "Expected an instance of one of: ('float', 'int', 'long'), "
-        "received str instead")
+    if sys.version_info[0] >= 3:
+      expected_msg = \
+        "Runtime type violation detected within " \
+        "OddMean/CombinePerKey(MeanCombineFn): " \
+        "Type-hint for argument: 'element' violated: " \
+        "Union[float, int] type-constraint violated. " \
+        "Expected an instance of one of: ('float', 'int'), " \
+        "received str instead"
+    else:
+      expected_msg = \
+        "Runtime type violation detected within " \
+        "OddMean/CombinePerKey(MeanCombineFn): " \
+        "Type-hint for argument: 'element' violated: " \
+        "Union[float, int, long] type-constraint violated. " \
+        "Expected an instance of one of: ('float', 'int', 'long'), " \
+        "received str instead"
+
+    self.assertStartswith(e.exception.args[0], expected_msg)
 
   def test_count_globally_pipeline_type_checking_satisfied(self):
     d = (self.p
@@ -1775,7 +1799,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertEqual(
         "Type hint violation for 'CombinePerKey(CountCombineFn)': "
         "requires Tuple[TypeVariable[K], Any] "
-        "but got <type 'int'> for element",
+        "but got {} for element".format(int),
         e.exception.args[0])
 
   def test_count_perkey_runtime_type_checking_satisfied(self):
@@ -1856,7 +1880,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertEqual(
         "Type hint violation for 'CombinePerKey(TopCombineFn)': "
         "requires Tuple[TypeVariable[K], TypeVariable[T]] "
-        "but got <type 'int'> for element",
+        "but got {} for element".format(int),
         e.exception.args[0])
 
   def test_per_key_pipeline_checking_satisfied(self):
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index d3d592f..cb91cc0 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -29,6 +29,7 @@ from apache_beam.coders import VarIntCoder
 from apache_beam.runners.common import DoFnSignature
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms import userstate
 from apache_beam.transforms.combiners import ToListCombineFn
 from apache_beam.transforms.combiners import TopCombineFn
@@ -337,13 +338,13 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
       def process(self, element, buffer=DoFn.StateParam(BUFFER_STATE),
                   timer1=DoFn.TimerParam(EXPIRY_TIMER)):
         unused_key, value = element
-        buffer.add('A' + str(value))
+        buffer.add(b'A' + str(value).encode('latin1'))
         timer1.set(20)
 
       @on_timer(EXPIRY_TIMER)
       def expiry_callback(self, buffer=DoFn.StateParam(BUFFER_STATE),
                           timer=DoFn.TimerParam(EXPIRY_TIMER)):
-        yield ''.join(sorted(buffer.read()))
+        yield b''.join(sorted(buffer.read()))
 
     with TestPipeline() as p:
       test_stream = (TestStream()
@@ -362,7 +363,7 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
     # fire after the watermark passes time 20, and another time after element
     # 4, since the timer issued at that point should fire immediately.
     self.assertEqual(
-        ['A1A2A3', 'A1A2A3A4'],
+        [b'A1A2A3', b'A1A2A3A4'],
         StatefulDoFnOnDirectRunnerTest.all_records)
 
   def test_stateful_dofn_nonkeyed_input(self):
@@ -495,7 +496,7 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
           state.add(value)
           timer.set(100)
         else:
-          yield 'Record<%s,%s,%s>' % (key, existing_values[0], value)
+          yield b'Record<%s,%s,%s>' % (key, existing_values[0], value)
           state.clear()
           timer.clear()
 
@@ -504,29 +505,28 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
         buffered = list(state.read())
         assert len(buffered) == 1, buffered
         state.clear()
-        yield 'Unmatched<%s>' % (buffered[0],)
+        yield b'Unmatched<%s>' % (buffered[0],)
 
     with TestPipeline() as p:
       test_stream = (TestStream()
                      .advance_watermark_to(10)
-                     .add_elements([('A', 'a'), ('B', 'b')])
-                     .add_elements([('A', 'aa'), ('C', 'c')])
+                     .add_elements([(b'A', b'a'), (b'B', b'b')])
+                     .add_elements([(b'A', b'aa'), (b'C', b'c')])
                      .advance_watermark_to(25)
-                     .add_elements([('A', 'aaa'), ('B', 'bb')])
-                     .add_elements([('D', 'd'), ('D', 'dd'), ('D', 'ddd'),
-                                    ('D', 'dddd')])
+                     .add_elements([(b'A', b'aaa'), (b'B', b'bb')])
+                     .add_elements([(b'D', b'd'), (b'D', b'dd'), (b'D', b'ddd'),
+                                    (b'D', b'dddd')])
                      .advance_watermark_to(125)
-                     .add_elements([('C', 'cc')]))
+                     .add_elements([(b'C', b'cc')]))
       (p
        | test_stream
        | beam.ParDo(HashJoinStatefulDoFn())
        | beam.ParDo(self.record_dofn()))
 
-    self.assertEqual(
-        ['Record<A,a,aa>', 'Record<B,b,bb>', 'Record<D,d,dd>',
-         'Record<D,ddd,dddd>', 'Unmatched<aaa>', 'Unmatched<c>',
-         'Unmatched<cc>'],
-        sorted(StatefulDoFnOnDirectRunnerTest.all_records))
+    equal_to(StatefulDoFnOnDirectRunnerTest.all_records)(
+        [b'Record<A,a,aa>', b'Record<B,b,bb>', b'Record<D,d,dd>',
+         b'Record<D,ddd,dddd>', b'Unmatched<aaa>', b'Unmatched<c>',
+         b'Unmatched<cc>'])
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a4aec51..9eb6c30 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -102,16 +102,17 @@ else:
     cythonize = lambda *args, **kwargs: []
 
 REQUIRED_PACKAGES_PY2_ONLY = [
-    'avro>=1.8.1,<2.0.0'
+    'avro>=1.8.1,<2.0.0',
+    'dill>=0.2.6,<=0.2.8.2',
 ]
 
 REQUIRED_PACKAGES_PY3_ONLY = [
-    'avro-python3>=1.8.1,<2.0.0'
+    'avro-python3>=1.8.1,<2.0.0',
+    'dill==0.2.9.dev0'
 ]
 
 REQUIRED_PACKAGES = [
     'crcmod>=1.7,<2.0',
-    'dill>=0.2.6,<=0.2.8.2',
     'fastavro>=0.21.4,<0.22',
     'grpcio>=1.8,<2',
     'hdfs>=2.1.0,<3.0.0',
@@ -148,8 +149,13 @@ GCP_REQUIREMENTS = [
 
 if sys.version_info[0] == 2:
   REQUIRED_PACKAGES = REQUIRED_PACKAGES + REQUIRED_PACKAGES_PY2_ONLY
+  DEPENDENCY_LINKS = []
 elif sys.version_info[0] >= 3:
   REQUIRED_PACKAGES = REQUIRED_PACKAGES + REQUIRED_PACKAGES_PY3_ONLY
+  # TODO(BEAM-6135): Revert when new dill version released
+  DEPENDENCY_LINKS = ['git+https://github.com/uqfoundation/dill.git'
+                      '@7a73fbe3d6aa445f93f58f266687b7315d14a3ac'
+                      '#egg=dill-0.2.9.dev0']
 
 
 # We must generate protos after setup_requires are installed.
@@ -198,6 +204,7 @@ setuptools.setup(
         'apache_beam/utils/windowed_value.py',
     ]),
     install_requires=REQUIRED_PACKAGES,
+    dependency_links=DEPENDENCY_LINKS,
     python_requires=python_requires,
     test_suite='nose.collector',
     tests_require=REQUIRED_TEST_PACKAGES,
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 09c794b..a357a2f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -41,7 +41,7 @@ deps =
 # Otherwise we get "OSError: [Errno 2] No such file or directory" errors.
 # Source:
 # https://github.com/tox-dev/tox/issues/123#issuecomment-284714629
-install_command = {envbindir}/python {envbindir}/pip install {opts} {packages}
+install_command = {envbindir}/python {envbindir}/pip install --process-dependency-links {opts} {packages}
 list_dependencies_command = {envbindir}/python {envbindir}/pip freeze
 
 [testenv:py27]
@@ -58,7 +58,7 @@ setenv =
   BEAM_EXPERIMENTAL_PY3=1
   RUN_SKIPPED_PY3_TESTS=0
 modules =
-  apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.
[...]
+  apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.
[...]
 commands =
   python --version
   pip --version


Mime
View raw message