beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [39/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
Date Tue, 14 Jun 2016 23:13:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
new file mode 100644
index 0000000..00f7fc7
--- /dev/null
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -0,0 +1,205 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pickler for values, functions, and classes.
+
+Pickles created by the pickling library contain non-ASCII characters, so
+we base64-encode the results so that we can put them in JSON objects.
+The pickler is used to embed FlatMap callable objects into the workflow JSON
+description.
+
+The pickler module should be used to pickle functions and modules; for values,
+the coders.*PickleCoder classes should be used instead.
+"""
+
+import base64
+import logging
+import sys
+import traceback
+import types
+
+import dill
+
+
+def is_nested_class(cls):
+  """Returns true if argument is a class object that appears to be nested."""
+  return (isinstance(cls, type)
+          and cls.__module__ != '__builtin__'
+          and cls.__name__ not in sys.modules[cls.__module__].__dict__)
+
+
+def find_containing_class(nested_class):
+  """Finds containing class of a nested class passed as argument."""
+
+  def find_containing_class_inner(outer):
+    for k, v in outer.__dict__.items():
+      if v is nested_class:
+        return outer, k
+      elif isinstance(v, (type, types.ClassType)) and hasattr(v, '__dict__'):
+        res = find_containing_class_inner(v)
+        if res: return res
+
+  return find_containing_class_inner(sys.modules[nested_class.__module__])
+
+
+def _nested_type_wrapper(fun):
+  """A wrapper for the standard pickler handler for class objects.
+
+  Args:
+    fun: Original pickler handler for type objects.
+
+  Returns:
+    A wrapper for type objects that handles nested classes.
+
+  The wrapper detects if an object being pickled is a nested class object.
+  For nested class objects only, it will save the containing class object so
+  the nested structure is recreated during unpickling.
+  """
+
+  def wrapper(pickler, obj):
+    # When the nested class is defined in the __main__ module we do not have to
+    # do anything special because the pickler itself will save the constituent
+    # parts of the type (i.e., name, base classes, dictionary) and then
+    # recreate it during unpickling.
+    if is_nested_class(obj) and obj.__module__ != '__main__':
+      containing_class_and_name = find_containing_class(obj)
+      if containing_class_and_name is not None:
+        return pickler.save_reduce(
+            getattr, containing_class_and_name, obj=obj)
+    try:
+      return fun(pickler, obj)
+    except dill.dill.PicklingError:
+      # pylint: disable=protected-access
+      return pickler.save_reduce(
+          dill.dill._create_type,
+          (type(obj), obj.__name__, obj.__bases__,
+           dill.dill._dict_from_dictproxy(obj.__dict__)),
+          obj=obj)
+      # pylint: enable=protected-access
+
+  return wrapper
+
+# Monkey patch the standard pickler dispatch table entry for type objects.
+# Dill, for certain types, defers to the standard pickler (including type
+# objects). We wrap the standard handler using _nested_type_wrapper() because
+# for nested classes we want to pickle the actual enclosing class object so we
+# can recreate it during unpickling.
+# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project.
+dill.dill.Pickler.dispatch[type] = _nested_type_wrapper(
+    dill.dill.Pickler.dispatch[type])
+
+
+# Dill pickles generator objects without complaint, but unpickling produces
+# TypeError: object.__new__(generator) is not safe, use generator.__new__()
+# on some versions of Python.
+def reject_generators(unused_pickler, unused_obj):
+  raise TypeError("can't (safely) pickle generator objects")
+dill.dill.Pickler.dispatch[types.GeneratorType] = reject_generators
+
+
+# This if guards against dill not being fully initialized when generating docs.
+if 'save_module' in dir(dill.dill):
+
+  # Always pickle non-main modules by name.
+  old_save_module = dill.dill.save_module
+
+  @dill.dill.register(dill.dill.ModuleType)
+  def save_module(pickler, obj):
+    if dill.dill.is_dill(pickler) and obj is pickler._main:
+      return old_save_module(pickler, obj)
+    else:
+      dill.dill.log.info('M2: %s' % obj)
+      # pylint: disable=protected-access
+      pickler.save_reduce(dill.dill._import_module, (obj.__name__,), obj=obj)
+      # pylint: enable=protected-access
+      dill.dill.log.info('# M2')
+
+  # Pickle module dictionaries (commonly found in lambda's globals)
+  # by referencing their module.
+  old_save_module_dict = dill.dill.save_module_dict
+  known_module_dicts = {}
+
+  @dill.dill.register(dict)
+  def new_save_module_dict(pickler, obj):
+    obj_id = id(obj)
+    if not known_module_dicts or '__file__' in obj or '__package__' in obj:
+      if obj_id not in known_module_dicts:
+        for m in sys.modules.values():
+          try:
+            if m and m.__name__ != '__main__':
+              d = m.__dict__
+              known_module_dicts[id(d)] = m, d
+          except AttributeError:
+            # Skip modules that do not have the __name__ attribute.
+            pass
+    if obj_id in known_module_dicts and dill.dill.is_dill(pickler):
+      m = known_module_dicts[obj_id][0]
+      try:
+        # pylint: disable=protected-access
+        dill.dill._import_module(m.__name__)
+        return pickler.save_reduce(
+            getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj)
+      except (ImportError, AttributeError):
+        return old_save_module_dict(pickler, obj)
+    else:
+      return old_save_module_dict(pickler, obj)
+  dill.dill.save_module_dict = new_save_module_dict
+
+
+  def _nest_dill_logging():
+    """Prefix all dill logging with its depth in the callstack.
+
+    Useful for debugging pickling of deeply nested structures.
+    """
+    old_log_info = dill.dill.log.info
+    def new_log_info(msg, *args, **kwargs):
+      old_log_info(
+          ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg,
+          *args, **kwargs)
+    dill.dill.log.info = new_log_info
+
+
+# Turn off verbose logging from the dill pickler.
+logging.getLogger('dill').setLevel(logging.WARN)
+
+
+# TODO(ccy): Currently, there are still instances of pickler.dumps() and
+# pickler.loads() being used for data, which results in an unnecessary base64
+# encoding.  This should be cleaned up.
+def dumps(o):
+  try:
+    return base64.b64encode(dill.dumps(o))
+  except Exception:          # pylint: disable=broad-except
+    dill.dill._trace(True)   # pylint: disable=protected-access
+    return base64.b64encode(dill.dumps(o))
+  finally:
+    dill.dill._trace(False)  # pylint: disable=protected-access
+
+
+def loads(s):
+  try:
+    return dill.loads(base64.b64decode(s))
+  except Exception:          # pylint: disable=broad-except
+    dill.dill._trace(True)   # pylint: disable=protected-access
+    return dill.loads(base64.b64decode(s))
+  finally:
+    dill.dill._trace(False)  # pylint: disable=protected-access
+
+
+def dump_session(file_path):
+  return dill.dump_session(file_path)
+
+
+def load_session(file_path):
+  return dill.load_session(file_path)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/internal/pickler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py
new file mode 100644
index 0000000..4d90084
--- /dev/null
+++ b/sdks/python/apache_beam/internal/pickler_test.py
@@ -0,0 +1,78 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the pickler module."""
+
+import unittest
+
+from google.cloud.dataflow.internal import module_test
+from google.cloud.dataflow.internal.pickler import dumps
+from google.cloud.dataflow.internal.pickler import loads
+
+
+class PicklerTest(unittest.TestCase):
+
+  def test_basics(self):
+    self.assertEquals([1, 'a', (u'z',)], loads(dumps([1, 'a', (u'z',)])))
+    fun = lambda x: 'xyz-%s' % x
+    self.assertEquals('xyz-abc', loads(dumps(fun))('abc'))
+
+  def test_lambda_with_globals(self):
+    """Tests that the globals of a function are preserved."""
+
+    # The point of the test is that the lambda being called after unpickling
+    # relies on the re module being loaded.
+    self.assertEquals(
+        ['abc', 'def'],
+        loads(dumps(module_test.get_lambda_with_globals()))('abc def'))
+
+  def test_lambda_with_closure(self):
+    """Tests that the closure of a function is preserved."""
+    self.assertEquals(
+        'closure: abc',
+        loads(dumps(module_test.get_lambda_with_closure('abc')))())
+
+  def test_class(self):
+    """Tests that a class object is pickled correctly."""
+    self.assertEquals(
+        ['abc', 'def'],
+        loads(dumps(module_test.Xyz))().foo('abc def'))
+
+  def test_object(self):
+    """Tests that a class instance is pickled correctly."""
+    self.assertEquals(
+        ['abc', 'def'],
+        loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
+
+  def test_nested_class(self):
+    """Tests that a nested class object is pickled correctly."""
+    self.assertEquals(
+        'X:abc',
+        loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
+    self.assertEquals(
+        'Y:abc',
+        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
+
+  def test_dynamic_class(self):
+    """Tests that a dynamically created class object is pickled correctly."""
+    self.assertEquals(
+        'Z:abc',
+        loads(dumps(module_test.create_class('abc'))).get())
+
+  def test_generators(self):
+    with self.assertRaises(TypeError):
+      dumps((_ for _ in range(10)))
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
new file mode 100644
index 0000000..c45f3f3
--- /dev/null
+++ b/sdks/python/apache_beam/internal/util.py
@@ -0,0 +1,90 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utility functions used throughout the dataflow package."""
+
+
+class ArgumentPlaceholder(object):
+  """A place holder object replacing PValues in argument lists.
+
+  A Fn object can take any number of "side inputs", which are PValues that will
+  be evaluated during pipeline execution and will be provided to the function
+  at the moment of its execution as positional or keyword arguments.
+
+  This is used only internally and should never be used by user code. A custom
+  Fn object by the time it executes will have such values replaced with real
+  computed values.
+  """
+
+  def __eq__(self, other):
+    """Tests for equality of two placeholder objects.
+
+    Args:
+      other: Another placeholder object to compare to.
+
+    This method is used only for test code. All placeholder objects are
+    equal to each other.
+    """
+    return isinstance(other, ArgumentPlaceholder)
+
+
+def remove_objects_from_args(args, kwargs, pvalue_classes):
+  """Replaces all objects of a given type in args/kwargs with a placeholder.
+
+  Args:
+    args: A list of positional arguments.
+    kwargs: A dictionary of keyword arguments.
+    pvalue_classes: A tuple of class objects representing the types of the
+      arguments that must be replaced with a placeholder value (instance of
+      ArgumentPlaceholder)
+
+  Returns:
+    A 3-tuple containing a modified list of positional arguments, a modified
+    dictionary of keyword arguments, and a list of all objects replaced with
+    a placeholder value.
+  """
+  pvals = []
+  def swapper(value):
+    pvals.append(value)
+    return ArgumentPlaceholder()
+  new_args = [swapper(v) if isinstance(v, pvalue_classes) else v for v in args]
+  # Make sure the order in which we process the dictionary keys is predictable
+  # by sorting the entries first. This will be important when putting back
+  # PValues.
+  new_kwargs = dict((k, swapper(v)) if isinstance(v, pvalue_classes) else (k, v)
+                    for k, v in sorted(kwargs.iteritems()))
+  return (new_args, new_kwargs, pvals)
+
+
+def insert_values_in_args(args, kwargs, values):
+  """Replaces all placeholders in args/kwargs with actual values.
+
+  Args:
+    args: A list of positional arguments.
+    kwargs: A dictionary of keyword arguments.
+    values: A list of values that will be used to replace placeholder values.
+
+  Returns:
+    A 2-tuple containing a modified list of positional arguments, and a
+    modified dictionary of keyword arguments.
+  """
+  # Use a local iterator so that we don't modify values.
+  v_iter = iter(values)
+  new_args = [
+      v_iter.next() if isinstance(arg, ArgumentPlaceholder) else arg
+      for arg in args]
+  new_kwargs = dict(
+      (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
+      for k, v in sorted(kwargs.iteritems()))
+  return (new_args, new_kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/internal/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util_test.py b/sdks/python/apache_beam/internal/util_test.py
new file mode 100644
index 0000000..6a2fc93
--- /dev/null
+++ b/sdks/python/apache_beam/internal/util_test.py
@@ -0,0 +1,58 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the util module."""
+
+import unittest
+
+from google.cloud.dataflow.internal.util import ArgumentPlaceholder
+from google.cloud.dataflow.internal.util import insert_values_in_args
+from google.cloud.dataflow.internal.util import remove_objects_from_args
+
+
+class UtilTest(unittest.TestCase):
+
+  def test_remove_objects_from_args(self):
+    args, kwargs, objs = remove_objects_from_args(
+        [1, 'a'], {'x': 1, 'y': 3.14}, (str, float))
+    self.assertEquals([1, ArgumentPlaceholder()], args)
+    self.assertEquals({'x': 1, 'y': ArgumentPlaceholder()}, kwargs)
+    self.assertEquals(['a', 3.14], objs)
+
+  def test_remove_objects_from_args_nothing_to_remove(self):
+    args, kwargs, objs = remove_objects_from_args(
+        [1, 2], {'x': 1, 'y': 2}, (str, float))
+    self.assertEquals([1, 2], args)
+    self.assertEquals({'x': 1, 'y': 2}, kwargs)
+    self.assertEquals([], objs)
+
+  def test_insert_values_in_args(self):
+    values = ['a', 'b']
+    args = [1, ArgumentPlaceholder()]
+    kwargs = {'x': 1, 'y': ArgumentPlaceholder()}
+    args, kwargs = insert_values_in_args(args, kwargs, values)
+    self.assertEquals([1, 'a'], args)
+    self.assertEquals({'x': 1, 'y': 'b'}, kwargs)
+
+  def test_insert_values_in_args_nothing_to_insert(self):
+    values = []
+    args = [1, 'a']
+    kwargs = {'x': 1, 'y': 'b'}
+    args, kwargs = insert_values_in_args(args, kwargs, values)
+    self.assertEquals([1, 'a'], args)
+    self.assertEquals({'x': 1, 'y': 'b'}, kwargs)
+
+
+if __name__ == '__main__':
+  unittest.main()


Mime
View raw message