flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [06/10] flink git commit: [FLINK-671] Python API
Date Tue, 21 Apr 2015 13:47:42 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
new file mode 100644
index 0000000..cebdc5d
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
@@ -0,0 +1,247 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#!/usr/bin/env python
+#
+# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
+# Copyright (c) 2008-2014 California Institute of Technology.
+# License: 3-clause BSD.  The full license text is available at:
+#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
+
+"""
+Module to show if an object has changed since it was memorised
+"""
+
+import os
+import sys
+import types
+try:
+    import numpy
+    HAS_NUMPY = True
+except:
+    HAS_NUMPY = False
+try:
+    import builtins
+except ImportError:
+    import __builtin__ as builtins
+
+# memo of objects indexed by id to a tuple (attributes, sequence items)
+# attributes is a dict indexed by attribute name to attribute id
+# sequence items is either a list of ids, or a dictionary of keys to ids
+memo = {}
+id_to_obj = {}
+# types that cannot have changing attributes
+builtins_types = set((str, list, dict, set, frozenset, int))
+dont_memo = set(id(i) for i in (memo, sys.modules, sys.path_importer_cache,
+             os.environ, id_to_obj))
+
+
+def get_attrs(obj):
+    """
+    Return the attribute dictionary (``__dict__``) of an object, or None.
+
+    None is returned when ``obj`` is an instance of one of the types in
+    ``builtins_types``, when ``obj`` is itself one of those types, or when
+    reading ``obj.__dict__`` fails.
+    """
+    obj_type = type(obj)
+    if obj_type in builtins_types \
+       or (obj_type is type and obj in builtins_types):
+        return None
+    try:
+        return obj.__dict__
+    except Exception:
+        # Reading __dict__ failed, so there are no attributes to track.
+        # Only Exception is caught so that KeyboardInterrupt and SystemExit
+        # still propagate to the caller.
+        return None
+
+
+def get_seq(obj, cache={str: False, frozenset: False, list: True, set: True,
+                        dict: True, tuple: True, type: False,
+                        types.ModuleType: False, types.FunctionType: False,
+                        types.BuiltinFunctionType: False}):
+    """
+    Return the items of a container object, or None for non-containers.
+
+    ``cache`` is a deliberately shared (mutable default) table mapping a type
+    to whether its instances are treated as containers; newly classified
+    types are recorded in it. Containers offering a ``copy`` method are
+    returned as a copy, all others are returned as-is.
+    """
+    kind = type(obj)
+    if kind in cache:
+        # this type has been classified before
+        if not cache[kind]:
+            return None
+        return obj.copy() if hasattr(obj, "copy") else obj
+    if HAS_NUMPY and kind in (numpy.ndarray, numpy.ma.core.MaskedConstant):
+        # numpy arrays are handled separately and never enter the cache
+        if obj.shape and obj.size:
+            return obj
+        return []
+    required = ("__contains__", "__iter__", "__len__")
+    # the protocol methods must exist on the instance and on its type
+    is_container = all(hasattr(target, name)
+                       for target in (obj, kind) for name in required)
+    cache[kind] = is_container
+    if not is_container:
+        return None
+    return obj.copy() if hasattr(obj, "copy") else obj
+
+
+def memorise(obj, force=False):
+    """
+    Record an object in the memo, then recursively record its attributes
+    and, if it is a container, its items.
+
+    For every memorised object ``memo`` stores the tuple
+    ``(attribute name -> id, item ids)`` and ``id_to_obj`` stores the object
+    itself. Objects listed in ``dont_memo`` are never recorded.
+
+    Use force=True to refresh the entry of an object that is already in the
+    memo. The refresh is not recursive: attributes and items are visited
+    with force=False.
+    """
+    obj_id = id(obj)
+    if (obj_id in memo and not force) or obj_id in dont_memo:
+        return
+
+    attrs = get_attrs(obj)
+    if attrs is None:
+        attrs_id = None
+    else:
+        attrs_id = dict((key, id(value)) for key, value in attrs.items())
+
+    seq = get_seq(obj)
+    is_mapping = seq is not None and hasattr(seq, "items")
+    if seq is None:
+        seq_id = None
+    elif is_mapping:
+        seq_id = dict((id(key), id(value)) for key, value in seq.items())
+    else:
+        seq_id = [id(item) for item in seq]
+
+    # register before recursing, so objects that refer back to obj are
+    # stopped by the "already in memo" check above
+    memo[obj_id] = attrs_id, seq_id
+    id_to_obj[obj_id] = obj
+
+    # plain loops: the recursive calls are made for their side effects only
+    if attrs is not None:
+        for _, value in attrs.items():
+            memorise(value)
+
+    if seq is not None:
+        if is_mapping:
+            for key, item in seq.items():
+                memorise(key)
+                memorise(item)
+        else:
+            for item in seq:
+                memorise(item)
+
+
+def release_gone():
+    """
+    Drop the entries of ``id_to_obj`` and ``memo`` for every memorised
+    object whose reference count, as reported by sys.getrefcount, is below 4
+    """
+    # NOTE(review): the threshold of 4 presumably discounts the references
+    # held by id_to_obj, the items snapshot, the loop variable and the
+    # getrefcount argument itself -- confirm before restructuring this
+    # expression, since any extra reference changes the count.
+    itop, mp, src = id_to_obj.pop, memo.pop, sys.getrefcount
+    # list(...) snapshots the items so entries can be popped while iterating
+    [(itop(id_), mp(id_)) for id_, obj in list(id_to_obj.items())
+     if src(obj) < 4]
+
+
+def whats_changed(obj, seen=None, simple=False, first=True):
+    """
+    Check an object against the memo.
+
+    Returns a pair (attribute changes, container changed). Attribute changes
+    is a dict mapping the name of each changed attribute to its current
+    value (None for attributes that have been removed); container changed is
+    a boolean. If simple is true, a single boolean is returned instead.
+
+    seen caches the result for every object already visited during this
+    check, keyed by id. first marks the outermost call; the recursive calls
+    pass first=False.
+
+    Raises RuntimeError when the object was never memorised and simple is
+    false.
+    """
+    # Special cases
+    if first:
+        # ignore the _ variable, which only appears in interactive sessions
+        if "_" in builtins.__dict__:
+            del builtins._
+    # initialise regardless of ``first`` so that a direct call with
+    # first=False and no ``seen`` mapping does not fail on the lookup below
+    if seen is None:
+        seen = {}
+
+    obj_id = id(obj)
+
+    if obj_id in seen:
+        if simple:
+            return any(seen[obj_id])
+        return seen[obj_id]
+
+    # Safety checks
+    if obj_id in dont_memo:
+        seen[obj_id] = [{}, False]
+        if simple:
+            return False
+        return seen[obj_id]
+    elif obj_id not in memo:
+        if simple:
+            return True
+        else:
+            raise RuntimeError("Object not memorised " + str(obj))
+
+    # provisional "unchanged" entry: a recursive visit that comes back to
+    # obj finds it in ``seen`` and returns instead of recursing again
+    seen[obj_id] = ({}, False)
+
+    chngd = whats_changed
+    id_ = id
+
+    # compare attributes
+    attrs = get_attrs(obj)
+    if attrs is None:
+        changed = {}
+    else:
+        obj_attrs = memo[obj_id][0]
+        obj_get = obj_attrs.get
+        # attributes that were memorised but no longer exist
+        changed = dict((key,None) for key in obj_attrs if key not in attrs)
+        for key, o in attrs.items():
+            # rebound to another object, or the same object changed inside
+            if id_(o) != obj_get(key, None) or chngd(o, seen, True, False):
+                changed[key] = o
+
+    # compare sequence
+    items = get_seq(obj)
+    seq_diff = False
+    if items is not None:
+        obj_seq = memo[obj_id][1]
+        if len(items) != len(obj_seq):
+            seq_diff = True
+        elif hasattr(obj, "items"):  # dict type obj
+            obj_get = obj_seq.get
+            for key, item in items.items():
+                if id_(item) != obj_get(id_(key)) \
+                   or chngd(key, seen, True, False) \
+                   or chngd(item, seen, True, False):
+                    seq_diff = True
+                    break
+        else:
+            for i, j in zip(items, obj_seq):  # list type obj
+                if id_(i) != j or chngd(i, seen, True, False):
+                    seq_diff = True
+                    break
+    seen[obj_id] = changed, seq_diff
+    if simple:
+        # always hand back a real boolean, never the dict of changes
+        return bool(changed or seq_diff)
+    return changed, seq_diff
+
+
+def has_changed(*args, **kwds):
+    """
+    Report whether an object differs from its memorised state.
+
+    Accepts the same arguments as whats_changed, except that simple is
+    always forced to True, whatever the caller passed for it.
+    """
+    options = dict(kwds, simple=True)
+    return whats_changed(*args, **options)
+
+# keep a module-level reference to the import function that is active right
+# now; _imp calls it by this name after builtins.__import__ has been replaced
+__import__ = __import__
+
+
+def _imp(*args, **kwds):
+    """
+    Stand-in for the default __import__ that memorises every module an
+    import adds to sys.modules, before the user has a chance to change it
+    """
+    known = set(sys.modules.keys())
+    module = __import__(*args, **kwds)
+    # names that appeared in sys.modules as a result of this import
+    for name in set(sys.modules.keys()) - known:
+        memorise(sys.modules[name])
+    return module
+
+# route every subsequent import through _imp
+builtins.__import__ = _imp
+# drop the "_" variable (only present in interactive sessions), if it exists
+if hasattr(builtins, "_"):
+    del builtins._
+
+# memorise all already imported modules. This implies that this must be
+# imported first for any changes to be recorded
+for mod in sys.modules.values():
+    memorise(mod)
+# discard memo entries according to release_gone's reference-count check
+release_gone()

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
new file mode 100644
index 0000000..b4cbc34
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
@@ -0,0 +1,91 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#!/usr/bin/env python
+#
+# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
+# Copyright (c) 2008-2014 California Institute of Technology.
+# License: 3-clause BSD.  The full license text is available at:
+#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
+
+from __future__ import absolute_import
+
+from .dill import dump, dumps, load, loads, dump_session, load_session, \
+    Pickler, Unpickler, register, copy, pickle, pickles, \
+    HIGHEST_PROTOCOL, DEFAULT_PROTOCOL, PicklingError, UnpicklingError, \
+    HANDLE_FMODE, CONTENTS_FMODE, FILE_FMODE
+from . import source, temp, detect
+
+# make sure "trace" is turned off
+detect.trace(False)
+
+try:
+    from imp import reload
+except ImportError:
+    pass
+
+# put the objects in order, if possible
+try:
+    from collections import OrderedDict as odict
+except ImportError:
+    try:
+        from ordereddict import OrderedDict as odict
+    except ImportError:
+        odict = dict
+objects = odict()
+# local import of dill._objects
+#from . import _objects
+#objects.update(_objects.succeeds)
+#del _objects
+
+# local import of dill.objtypes
+from . import objtypes as types
+
+def load_types(pickleable=True, unpickleable=True):
+    """load pickleable and/or unpickleable types to dill.types
+
+    pickleable: if True, add the objects from _objects.succeeds to
+        ``objects``; otherwise remove them from it
+    unpickleable: if True, add the objects from _objects.failures to
+        ``objects``; otherwise remove them from it
+
+    The objects in _objects.registered are always added. Afterwards every
+    name containing 'Type' is removed from the ``types`` module, which is
+    then reloaded.
+    """
+    # local import of dill.objects
+    from . import _objects
+    if pickleable:
+        objects.update(_objects.succeeds)
+    else:
+        for name in _objects.succeeds:
+            objects.pop(name, None)
+    if unpickleable:
+        objects.update(_objects.failures)
+    else:
+        for name in _objects.failures:
+            objects.pop(name, None)
+    objects.update(_objects.registered)
+    del _objects
+    # reset contents of types to 'empty'; iterate over a snapshot of the
+    # keys because entries are removed inside the loop
+    for name in list(types.__dict__.keys()):
+        if 'Type' in name:
+            types.__dict__.pop(name)
+    # add corresponding types from objects to types
+    reload(types)
+
+def extend(use_dill=True):
+    '''add (or remove) dill types to/from pickle
+
+    use_dill: if True, call _extend(); otherwise call _revert_extension()
+    '''
+    from .dill import _revert_extension, _extend
+    # pick the action first, then run it
+    toggle = _extend if use_dill else _revert_extension
+    toggle()
+
+extend()
+
+del absolute_import
+del odict
+
+# end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
new file mode 100644
index 0000000..457ae1e
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
@@ -0,0 +1,548 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#!/usr/bin/env python
+#
+# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
+# Copyright (c) 2008-2014 California Institute of Technology.
+# License: 3-clause BSD.  The full license text is available at:
+#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
+"""
+all Python Standard Library objects (currently: CH 1-15 @ 2.7)
+and some other common objects (i.e. numpy.ndarray)
+"""
+
+__all__ = ['registered','failures','succeeds']
+
+# helper imports
+import warnings; warnings.filterwarnings("ignore", category=DeprecationWarning)
+import sys
+PY3 = (hex(sys.hexversion) >= '0x30000f0')
+if PY3:
+    import queue as Queue
+    import dbm as anydbm
+else:
+    import Queue
+    import anydbm
+    import sets # deprecated/removed
+    import mutex # removed
+try:
+    from cStringIO import StringIO # has StringI and StringO types
+except ImportError: # only has StringIO type
+    if PY3:
+        from io import BytesIO as StringIO
+    else:
+        from StringIO import StringIO
+import re
+import array
+import collections
+import codecs
+import struct
+import datetime
+import calendar
+import weakref
+import pprint
+import decimal
+import functools
+import itertools
+import operator
+import tempfile
+import shelve
+import zlib
+import gzip
+import zipfile
+import tarfile
+import xdrlib
+import csv
+import hashlib
+import hmac
+import os
+import logging
+import optparse
+import curses
+#import __hello__
+import threading
+import socket
+import contextlib
+try:
+    import bz2
+    import sqlite3
+    if PY3: import dbm.ndbm as dbm
+    else: import dbm
+    HAS_ALL = True
+except ImportError: # Ubuntu
+    HAS_ALL = False
+try:
+    import ctypes
+    HAS_CTYPES = True
+except ImportError: # MacPorts
+    HAS_CTYPES = False
+from curses import textpad, panel
+
+# helper objects
+# helper fixture: class declared without an explicit base; the class, an
+# instance of it and its methods are registered further down
+# (e.g. as 'ClassType', 'InstanceType' and 'MethodType')
+class _class:
+    def _method(self):
+        pass
+#   @classmethod
+#   def _clsmethod(cls): #XXX: test me
+#       pass
+#   @staticmethod
+#   def _static(self): #XXX: test me
+#       pass
+# helper fixture: class whose instances are callable (it defines __call__)
+class _class2:
+    def __call__(self):
+        pass
+# module-level instance of the callable class; later wrapped by weakref.proxy
+_instance2 = _class2()
+# helper fixture: class deriving explicitly from object; the class and an
+# instance of it are registered further down
+# ('ClassObjectType' and 'ClassInstanceType')
+class _newclass(object):
+    def _method(self):
+        pass
+#   @classmethod
+#   def _clsmethod(cls): #XXX: test me
+#       pass
+#   @staticmethod
+#   def _static(self): #XXX: test me
+#       pass
+def _function(x):
+    # generator function: yields its single argument once
+    yield x
+def _function2():
+    # a bare raise is used on purpose: the exception it triggers is caught
+    # right away, giving a genuine exception instance and traceback that are
+    # handed back as the pair (exception, traceback)
+    from sys import exc_info
+    try:
+        raise
+    except:
+        _, error, trace = exc_info()
+        return error, trace
+if HAS_CTYPES:
+    class _Struct(ctypes.Structure):
+        pass
+    _Struct._fields_ = [("_field", ctypes.c_int),("next", ctypes.POINTER(_Struct))]
+_filedescrip, _tempfile = tempfile.mkstemp('r') # deleted in cleanup
+_tmpf = tempfile.TemporaryFile('w')
+
+# put the objects in order, if possible
+try:
+    from collections import OrderedDict as odict
+except ImportError:
+    try:
+        from ordereddict import OrderedDict as odict
+    except ImportError:
+        odict = dict
+# objects used by dill for type declaration
+registered = d = odict()
+# objects dill fails to pickle
+failures = x = odict()
+# all other type objects
+succeeds = a = odict()
+
+# types module (part of CH 8)
+a['BooleanType'] = bool(1)
+a['BuiltinFunctionType'] = len
+a['BuiltinMethodType'] = a['BuiltinFunctionType']
+a['BytesType'] = _bytes = codecs.latin_1_encode('\x00')[0] # bytes(1)
+a['ClassType'] = _class
+a['ComplexType'] = complex(1)
+a['DictType'] = _dict = {}
+a['DictionaryType'] = a['DictType']
+a['FloatType'] = float(1)
+a['FunctionType'] = _function
+a['InstanceType'] = _instance = _class()
+a['IntType'] = _int = int(1)
+a['ListType'] = _list = []
+a['NoneType'] = None
+a['ObjectType'] = object()
+a['StringType'] = _str = str(1)
+a['TupleType'] = _tuple = ()
+a['TypeType'] = type
+if PY3:
+    a['LongType'] = _int
+    a['UnicodeType'] = _str
+else:
+    a['LongType'] = long(1)
+    a['UnicodeType'] = unicode(1)
+# built-in constants (CH 4)
+a['CopyrightType'] = copyright
+# built-in types (CH 5)
+a['ClassObjectType'] = _newclass # <type 'type'>
+a['ClassInstanceType'] = _newclass() # <type 'class'>
+a['SetType'] = _set = set()
+a['FrozenSetType'] = frozenset()
+# built-in exceptions (CH 6)
+a['ExceptionType'] = _exception = _function2()[0]
+# string services (CH 7)
+a['SREPatternType'] = _srepattern = re.compile('')
+# data types (CH 8)
+a['ArrayType'] = array.array("f")
+a['DequeType'] = collections.deque([0])
+a['DefaultDictType'] = collections.defaultdict(_function, _dict)
+a['TZInfoType'] = datetime.tzinfo()
+a['DateTimeType'] = datetime.datetime.today()
+a['CalendarType'] = calendar.Calendar()
+if not PY3:
+    a['SetsType'] = sets.Set()
+    a['ImmutableSetType'] = sets.ImmutableSet()
+    a['MutexType'] = mutex.mutex()
+# numeric and mathematical types (CH 9)
+a['DecimalType'] = decimal.Decimal(1)
+a['CountType'] = itertools.count(0)
+# data compression and archiving (CH 12)
+a['TarInfoType'] = tarfile.TarInfo()
+# generic operating system services (CH 15)
+a['LoggerType'] = logging.getLogger()
+a['FormatterType'] = logging.Formatter() # pickle ok
+a['FilterType'] = logging.Filter() # pickle ok
+a['LogRecordType'] = logging.makeLogRecord(_dict) # pickle ok
+a['OptionParserType'] = _oparser = optparse.OptionParser() # pickle ok
+a['OptionGroupType'] = optparse.OptionGroup(_oparser,"foo") # pickle ok
+a['OptionType'] = optparse.Option('--foo') # pickle ok
+if HAS_CTYPES:
+    a['CCharType'] = _cchar = ctypes.c_char()
+    a['CWCharType'] = ctypes.c_wchar() # fail == 2.6
+    a['CByteType'] = ctypes.c_byte()
+    a['CUByteType'] = ctypes.c_ubyte()
+    a['CShortType'] = ctypes.c_short()
+    a['CUShortType'] = ctypes.c_ushort()
+    a['CIntType'] = ctypes.c_int()
+    a['CUIntType'] = ctypes.c_uint()
+    a['CLongType'] = ctypes.c_long()
+    a['CULongType'] = ctypes.c_ulong()
+    a['CLongLongType'] = ctypes.c_longlong()
+    a['CULongLongType'] = ctypes.c_ulonglong()
+    a['CFloatType'] = ctypes.c_float()
+    a['CDoubleType'] = ctypes.c_double()
+    a['CSizeTType'] = ctypes.c_size_t()
+    a['CLibraryLoaderType'] = ctypes.cdll
+    a['StructureType'] = _Struct
+    a['BigEndianStructureType'] = ctypes.BigEndianStructure()
+#NOTE: also LittleEndianStructureType and UnionType... abstract classes
+#NOTE: remember for ctypesobj.contents creates a new python object
+#NOTE: ctypes.c_int._objects is memberdescriptor for object's __dict__
+#NOTE: base class of all ctypes data types is non-public _CData
+
+# Objects that need modules introduced with python 2.6. The whole group is
+# skipped when one of the imports is unavailable.
+try: # python 2.6
+    import fractions
+    # the module is called "numbers"; importing the non-existent "number"
+    # raised ImportError and silently skipped every registration below
+    import numbers
+    import io
+    from io import StringIO as TextIO
+    # built-in functions (CH 2)
+    a['ByteArrayType'] = bytearray([1])
+    # numeric and mathematical types (CH 9)
+    a['FractionType'] = fractions.Fraction()
+    a['NumberType'] = numbers.Number()
+    # generic operating system services (CH 15)
+    a['IOBaseType'] = io.IOBase()
+    a['RawIOBaseType'] = io.RawIOBase()
+    a['TextIOBaseType'] = io.TextIOBase()
+    a['BufferedIOBaseType'] = io.BufferedIOBase()
+    a['UnicodeIOType'] = TextIO() # the new StringIO
+    # the class is logging.LoggerAdapter; the module-level _logger is only
+    # assigned further down, so the named logger is looked up here instead
+    # (getLogger returns the same logger object for the same name)
+    a['LoggingAdapterType'] = logging.LoggerAdapter(logging.getLogger(__name__),_dict) # pickle ok
+    if HAS_CTYPES:
+        a['CBoolType'] = ctypes.c_bool(1)
+        a['CLongDoubleType'] = ctypes.c_longdouble()
+except ImportError:
+    pass
+try: # python 2.7
+    import argparse
+    # data types (CH 8)
+    a['OrderedDictType'] = collections.OrderedDict(_dict)
+    a['CounterType'] = collections.Counter(_dict)
+    if HAS_CTYPES:
+        a['CSSizeTType'] = ctypes.c_ssize_t()
+    # generic operating system services (CH 15)
+    a['NullHandlerType'] = logging.NullHandler() # pickle ok  # new 2.7
+    a['ArgParseFileType'] = argparse.FileType() # pickle ok
+#except AttributeError:
+except ImportError:
+    pass
+
+# -- pickle fails on all below here -----------------------------------------
+# types module (part of CH 8)
+a['CodeType'] = compile('','','exec')
+a['DictProxyType'] = type.__dict__
+a['DictProxyType2'] = _newclass.__dict__
+a['EllipsisType'] = Ellipsis
+# close() returns None, so keep a reference to the file object and register
+# the closed file itself rather than the result of close()
+_closedfile = open(os.devnull, 'wb', buffering=0)
+_closedfile.close()
+a['ClosedFileType'] = _closedfile
+a['GetSetDescriptorType'] = array.array.typecode
+a['LambdaType'] = _lambda = lambda x: lambda y: x #XXX: works when not imported!
+a['MemberDescriptorType'] = type.__dict__['__weakrefoffset__']
+a['MemberDescriptorType2'] = datetime.timedelta.days
+a['MethodType'] = _method = _class()._method #XXX: works when not imported!
+a['ModuleType'] = datetime
+a['NotImplementedType'] = NotImplemented
+a['SliceType'] = slice(1)
+a['UnboundMethodType'] = _class._method #XXX: works when not imported!
+a['TextWrapperType'] = open(os.devnull, 'r') # same as mode='w','w+','r+'
+a['BufferedRandomType'] = open(os.devnull, 'r+b') # same as mode='w+b'
+a['BufferedReaderType'] = open(os.devnull, 'rb') # (default: buffering=-1)
+a['BufferedWriterType'] = open(os.devnull, 'wb')
+try: # oddities: deprecated
+    from _pyio import open as _open
+    a['PyTextWrapperType'] = _open(os.devnull, 'r', buffering=-1)
+    a['PyBufferedRandomType'] = _open(os.devnull, 'r+b', buffering=-1)
+    a['PyBufferedReaderType'] = _open(os.devnull, 'rb', buffering=-1)
+    a['PyBufferedWriterType'] = _open(os.devnull, 'wb', buffering=-1)
+except ImportError:
+    pass
+# other (concrete) object types
+if PY3:
+    d['CellType'] = (_lambda)(0).__closure__[0]
+    a['XRangeType'] = _xrange = range(1)
+else:
+    d['CellType'] = (_lambda)(0).func_closure[0]
+    a['XRangeType'] = _xrange = xrange(1)
+d['MethodDescriptorType'] = type.__dict__['mro']
+d['WrapperDescriptorType'] = type.__repr__
+a['WrapperDescriptorType2'] = type.__dict__['__module__']
+# built-in functions (CH 2)
+if PY3: _methodwrap = (1).__lt__
+else: _methodwrap = (1).__cmp__
+d['MethodWrapperType'] = _methodwrap
+a['StaticMethodType'] = staticmethod(_method)
+a['ClassMethodType'] = classmethod(_method)
+a['PropertyType'] = property()
+d['SuperType'] = super(Exception, _exception)
+# string services (CH 7)
+if PY3: _in = _bytes
+else: _in = _str
+a['InputType'] = _cstrI = StringIO(_in)
+a['OutputType'] = _cstrO = StringIO()
+# data types (CH 8)
+a['WeakKeyDictionaryType'] = weakref.WeakKeyDictionary()
+a['WeakValueDictionaryType'] = weakref.WeakValueDictionary()
+a['ReferenceType'] = weakref.ref(_instance)
+a['DeadReferenceType'] = weakref.ref(_class())
+a['ProxyType'] = weakref.proxy(_instance)
+a['DeadProxyType'] = weakref.proxy(_class())
+a['CallableProxyType'] = weakref.proxy(_instance2)
+a['DeadCallableProxyType'] = weakref.proxy(_class2())
+a['QueueType'] = Queue.Queue()
+# numeric and mathematical types (CH 9)
+d['PartialType'] = functools.partial(int,base=2)
+if PY3:
+    a['IzipType'] = zip('0','1')
+else:
+    a['IzipType'] = itertools.izip('0','1')
+a['ChainType'] = itertools.chain('0','1')
+d['ItemGetterType'] = operator.itemgetter(0)
+d['AttrGetterType'] = operator.attrgetter('__repr__')
+# file and directory access (CH 10)
+if PY3: _fileW = _cstrO
+else: _fileW = _tmpf
+# data persistence (CH 11)
+if HAS_ALL:
+    a['ConnectionType'] = _conn = sqlite3.connect(':memory:')
+    a['CursorType'] = _conn.cursor()
+a['ShelveType'] = shelve.Shelf({})
+# data compression and archiving (CH 12)
+if HAS_ALL:
+    a['BZ2FileType'] = bz2.BZ2File(os.devnull) #FIXME: fail >= 3.3
+    a['BZ2CompressorType'] = bz2.BZ2Compressor()
+    a['BZ2DecompressorType'] = bz2.BZ2Decompressor()
+#a['ZipFileType'] = _zip = zipfile.ZipFile(os.devnull,'w') #FIXME: fail >= 3.2
+#_zip.write(_tempfile,'x') [causes annoying warning/error printed on import]
+#a['ZipInfoType'] = _zip.getinfo('x')
+a['TarFileType'] = tarfile.open(fileobj=_fileW,mode='w')
+# file formats (CH 13)
+a['DialectType'] = csv.get_dialect('excel')
+a['PackerType'] = xdrlib.Packer()
+# optional operating system services (CH 16)
+a['LockType'] = threading.Lock()
+a['RLockType'] = threading.RLock()
+# generic operating system services (CH 15) # also closed/open and r/w/etc...
+a['NamedLoggerType'] = _logger = logging.getLogger(__name__) #FIXME: fail >= 3.2 and <= 2.6
+#a['FrozenModuleType'] = __hello__ #FIXME: prints "Hello world..."
+# interprocess communication (CH 17)
+if PY3:
+    a['SocketType'] = _socket = socket.socket() #FIXME: fail >= 3.3
+    a['SocketPairType'] = socket.socketpair()[0] #FIXME: fail >= 3.3
+else:
+    a['SocketType'] = _socket = socket.socket()
+    a['SocketPairType'] = _socket._sock
+# python runtime services (CH 27)
+if PY3:
+    a['GeneratorContextManagerType'] = contextlib.contextmanager(max)([1])
+else:
+    a['GeneratorContextManagerType'] = contextlib.GeneratorContextManager(max)
+
+try: # ipython
+    __IPYTHON__ is True # is ipython
+except NameError:
+    # built-in constants (CH 4)
+    a['QuitterType'] = quit
+    d['ExitType'] = a['QuitterType']
+try: # numpy
+    from numpy import ufunc as _numpy_ufunc
+    from numpy import array as _numpy_array
+    from numpy import int32 as _numpy_int32
+    a['NumpyUfuncType'] = _numpy_ufunc
+    a['NumpyArrayType'] = _numpy_array
+    a['NumpyInt32Type'] = _numpy_int32
+except ImportError:
+    pass
+try: # python 2.6
+    # numeric and mathematical types (CH 9)
+    a['ProductType'] = itertools.product('0','1')
+    # generic operating system services (CH 15)
+    a['FileHandlerType'] = logging.FileHandler(os.devnull) #FIXME: fail >= 3.2 and <= 2.6
+    a['RotatingFileHandlerType'] = logging.handlers.RotatingFileHandler(os.devnull)
+    a['SocketHandlerType'] = logging.handlers.SocketHandler('localhost',514)
+    a['MemoryHandlerType'] = logging.handlers.MemoryHandler(1)
+except AttributeError:
+    pass
+try: # python 2.7
+    # data types (CH 8)
+    a['WeakSetType'] = weakref.WeakSet() # 2.7
+#   # generic operating system services (CH 15) [errors when dill is imported]
+#   a['ArgumentParserType'] = _parser = argparse.ArgumentParser('PROG')
+#   a['NamespaceType'] = _parser.parse_args() # pickle ok
+#   a['SubParsersActionType'] = _parser.add_subparsers()
+#   a['MutuallyExclusiveGroupType'] = _parser.add_mutually_exclusive_group()
+#   a['ArgumentGroupType'] = _parser.add_argument_group()
+except AttributeError:
+    pass
+
+# -- dill fails in some versions below here ---------------------------------
+# types module (part of CH 8)
+a['FileType'] = open(os.devnull, 'rb', buffering=0) # same 'wb','wb+','rb+'
+# FIXME: FileType fails >= 3.1
+# built-in functions (CH 2)
+a['ListIteratorType'] = iter(_list) # empty vs non-empty FIXME: fail < 3.2
+a['TupleIteratorType']= iter(_tuple) # empty vs non-empty FIXME: fail < 3.2
+a['XRangeIteratorType'] = iter(_xrange) # empty vs non-empty FIXME: fail < 3.2
+# data types (CH 8)
+a['PrettyPrinterType'] = pprint.PrettyPrinter() #FIXME: fail >= 3.2 and == 2.5
+# numeric and mathematical types (CH 9)
+a['CycleType'] = itertools.cycle('0') #FIXME: fail < 3.2
+# file and directory access (CH 10)
+a['TemporaryFileType'] = _tmpf #FIXME: fail >= 3.2 and == 2.5
+# data compression and archiving (CH 12)
+a['GzipFileType'] = gzip.GzipFile(fileobj=_fileW) #FIXME: fail > 3.2 and <= 2.6
+# generic operating system services (CH 15)
+a['StreamHandlerType'] = logging.StreamHandler() #FIXME: fail >= 3.2 and == 2.5
+try: # python 2.6
+    # numeric and mathematical types (CH 9)
+    a['PermutationsType'] = itertools.permutations('0') #FIXME: fail < 3.2
+    a['CombinationsType'] = itertools.combinations('0',1) #FIXME: fail < 3.2
+except AttributeError:
+    pass
+try: # python 2.7
+    # numeric and mathematical types (CH 9)
+    a['RepeatType'] = itertools.repeat(0) #FIXME: fail < 3.2
+    a['CompressType'] = itertools.compress('0',[1]) #FIXME: fail < 3.2
+    #XXX: ...and etc
+except AttributeError:
+    pass
+
+# -- dill fails on all below here -------------------------------------------
+# types module (part of CH 8)
+x['GeneratorType'] = _generator = _function(1) #XXX: priority
+x['FrameType'] = _generator.gi_frame #XXX: inspect.currentframe()
+x['TracebackType'] = _function2()[1] #(see: inspect.getouterframes,getframeinfo)
+# other (concrete) object types
+# (also: Capsule / CObject ?)
+# built-in functions (CH 2)
+x['SetIteratorType'] = iter(_set) #XXX: empty vs non-empty
+# built-in types (CH 5)
+if PY3:
+    x['DictionaryItemIteratorType'] = iter(type.__dict__.items())
+    x['DictionaryKeyIteratorType'] = iter(type.__dict__.keys())
+    x['DictionaryValueIteratorType'] = iter(type.__dict__.values())
+else:
+    x['DictionaryItemIteratorType'] = type.__dict__.iteritems()
+    x['DictionaryKeyIteratorType'] = type.__dict__.iterkeys()
+    x['DictionaryValueIteratorType'] = type.__dict__.itervalues()
+# string services (CH 7)
+x['StructType'] = struct.Struct('c')
+x['CallableIteratorType'] = _srepattern.finditer('')
+x['SREMatchType'] = _srepattern.match('')
+x['SREScannerType'] = _srepattern.scanner('')
+x['StreamReader'] = codecs.StreamReader(_cstrI) #XXX: ... and etc
+# python object persistence (CH 11)
+# x['DbShelveType'] = shelve.open('foo','n')#,protocol=2) #XXX: delete foo
+if HAS_ALL:
+    x['DbmType'] = dbm.open(_tempfile,'n')
+# x['DbCursorType'] = _dbcursor = anydbm.open('foo','n') #XXX: delete foo
+# x['DbType'] = _dbcursor.db
+# data compression and archiving (CH 12)
+x['ZlibCompressType'] = zlib.compressobj()
+x['ZlibDecompressType'] = zlib.decompressobj()
+# file formats (CH 13)
+x['CSVReaderType'] = csv.reader(_cstrI)
+x['CSVWriterType'] = csv.writer(_cstrO)
+x['CSVDictReaderType'] = csv.DictReader(_cstrI)
+x['CSVDictWriterType'] = csv.DictWriter(_cstrO,{})
+# cryptographic services (CH 14)
+x['HashType'] = hashlib.md5()
+x['HMACType'] = hmac.new(_in)
+# generic operating system services (CH 15)
+#x['CursesWindowType'] = _curwin = curses.initscr() #FIXME: messes up tty
+#x['CursesTextPadType'] = textpad.Textbox(_curwin)
+#x['CursesPanelType'] = panel.new_panel(_curwin)
+if HAS_CTYPES:
+    x['CCharPType'] = ctypes.c_char_p()
+    x['CWCharPType'] = ctypes.c_wchar_p()
+    x['CVoidPType'] = ctypes.c_void_p()
+    x['CDLLType'] = _cdll = ctypes.CDLL(None)
+    x['PyDLLType'] = _pydll = ctypes.pythonapi
+    x['FuncPtrType'] = _cdll._FuncPtr()
+    x['CCharArrayType'] = ctypes.create_string_buffer(1)
+    x['CWCharArrayType'] = ctypes.create_unicode_buffer(1)
+    x['CParamType'] = ctypes.byref(_cchar)
+    x['LPCCharType'] = ctypes.pointer(_cchar)
+    x['LPCCharObjType'] = _lpchar = ctypes.POINTER(ctypes.c_char)
+    x['NullPtrType'] = _lpchar()
+    x['NullPyObjectType'] = ctypes.py_object()
+    x['PyObjectType'] = ctypes.py_object(1)
+    x['FieldType'] = _field = _Struct._field
+    x['CFUNCTYPEType'] = _cfunc = ctypes.CFUNCTYPE(ctypes.c_char)
+    x['CFunctionType'] = _cfunc(str)
+try: # python 2.6
+    # numeric and mathematical types (CH 9)
+    x['MethodCallerType'] = operator.methodcaller('mro') # 2.6
+except AttributeError:
+    pass
+try: # python 2.7
+    # built-in types (CH 5)
+    x['MemoryType'] = memoryview(_in) # 2.7
+    x['MemoryType2'] = memoryview(bytearray(_in)) # 2.7
+    if PY3:
+        x['DictItemsType'] = _dict.items() # 2.7
+        x['DictKeysType'] = _dict.keys() # 2.7
+        x['DictValuesType'] = _dict.values() # 2.7
+    else:
+        x['DictItemsType'] = _dict.viewitems() # 2.7
+        x['DictKeysType'] = _dict.viewkeys() # 2.7
+        x['DictValuesType'] = _dict.viewvalues() # 2.7
+    # generic operating system services (CH 15)
+    x['RawTextHelpFormatterType'] = argparse.RawTextHelpFormatter('PROG')
+    x['RawDescriptionHelpFormatterType'] = argparse.RawDescriptionHelpFormatter('PROG')
+    x['ArgDefaultsHelpFormatterType'] = argparse.ArgumentDefaultsHelpFormatter('PROG')
+except NameError:
+    pass
+try: # python 2.7 (and not 3.1)
+    x['CmpKeyType'] = _cmpkey = functools.cmp_to_key(_methodwrap) # 2.7, >=3.2
+    x['CmpKeyObjType'] = _cmpkey('0') #2.7, >=3.2
+except AttributeError:
+    pass
+if PY3: # oddities: removed, etc
+    x['BufferType'] = x['MemoryType']
+else:
+    x['BufferType'] = buffer('')
+
+# -- cleanup ----------------------------------------------------------------
+a.update(d) # registered also succeed
+os.remove(_tempfile)
+
+
+# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
new file mode 100644
index 0000000..76357a5
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
@@ -0,0 +1,240 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#!/usr/bin/env python
+#
+# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
+# Copyright (c) 2008-2014 California Institute of Technology.
+# License: 3-clause BSD.  The full license text is available at:
+#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
+"""
+Methods for detecting objects leading to pickling failures.
+"""
+
+from __future__ import absolute_import
+from inspect import ismethod, isfunction, istraceback, isframe, iscode
+from .pointers import parent, reference, at, parents, children
+
+from .dill import _trace as trace
+from .dill import PY3
+
+def outermost(func): # is analogous to getsource(func,enclosing=True)
+    """get the outermost object enclosing func (e.g. the outer function of
+    a closure); returns None if func is not a function or method, or if no
+    match is found
+
+    Candidates are the entries of func's globals whose names occur in the
+    enclosing source text; the first candidate whose own source lines are
+    equal to the enclosing block is returned.
+
+    NOTE: this is the object-equivalent of getsource(func, enclosing=True)
+    """
+    # unwrap methods to the underlying function object
+    if ismethod(func):
+        target = func.__func__ if PY3 else func.im_func
+    elif isfunction(func):
+        target = func
+    else:
+        return #XXX: or raise? no matches
+    namespace = (target.__globals__ if PY3 else target.func_globals) or {}
+    candidates = namespace.items() if PY3 else namespace.iteritems()
+    # get the enclosing source
+    from .source import getsourcelines
+    try:
+        lines, lnum = getsourcelines(func, enclosing=True)
+    except: #TypeError, IOError
+        lines, lnum = [], None
+    enclosing = ''.join(lines)
+    # only objects named in the enclosing source can have generated it
+    for name, obj in candidates: #XXX: don't really need 'name'
+        if name not in enclosing:
+            continue
+        try:
+            if getsourcelines(obj) == (lines, lnum):
+                return obj
+        except: #TypeError, IOError
+            pass
+    return #XXX: or raise? no matches
+
+def nestedcode(func): #XXX: or return dict of {co_name: co} ?
+    """get the code objects for any nested functions (e.g. in a closure)
+
+    returns a list; the list is empty if no code object can be obtained
+    from func"""
+    outer = code(func)
+    if not iscode(outer):
+        return [] #XXX: or raise? no matches
+    # constants of the outer code object that resolve to code objects
+    resolved = (code(const) for const in outer.co_consts if const is not None)
+    return [co for co in resolved if co]
+
+def code(func):
+    '''get the code object for the given function or method
+
+    Methods, functions, tracebacks and frames are unwrapped step by step;
+    a code object is returned as-is, and None is returned for anything else.
+
+    NOTE: use dill.source.getsource(CODEOBJ) to get the source code
+    '''
+    if ismethod(func):
+        func = getattr(func, '__func__' if PY3 else 'im_func')
+    if isfunction(func):
+        func = getattr(func, '__code__' if PY3 else 'func_code')
+    if istraceback(func):
+        func = func.tb_frame
+    if isframe(func):
+        func = func.f_code
+    if not iscode(func):
+        return
+    return func
+
+def nested(func): #XXX: or return dict of {__name__: obj} ?
+    """get any functions inside of func (e.g. inner functions in a closure)
+
+    Each code object from nestedcode(func) is looked up among its gc
+    referrers; referrers that are methods, functions, or frames using that
+    code object (or the code object itself) are collected in a list.
+
+    NOTE: results may differ if the function has been executed or not.
+    If len(nestedcode(func)) > len(nested(func)), try calling func().
+    If possible, python builds code objects, but delays building functions
+    until func() is called.
+    """
+    import gc
+    code_attr = '__code__' if PY3 else 'func_code' # functions
+    func_attr = '__func__' if PY3 else 'im_func'   # methods
+
+    funcs = []
+    # get the code objects, and try to track down by reference
+    for co in nestedcode(func):
+        # look for objects that refer to the code object
+        for obj in gc.get_referrers(co):
+            unbound = getattr(obj, func_attr, None) # ismethod
+            if (getattr(unbound, code_attr, None) is co       # methods
+                or getattr(obj, code_attr, None) is co        # functions
+                or getattr(obj, 'f_code', None) is co         # frame objects
+                or (hasattr(obj, 'co_code') and obj is co)):  # code objects
+                funcs.append(obj)
+#     frameobjs => func.func_code.co_varnames not in func.func_code.co_cellvars
+#     funcobjs => func.func_code.co_cellvars not in func.func_code.co_varnames
+#     frameobjs are not found, however funcobjs are...
+#     (see: test_mixins.quad ... and test_mixins.wtf)
+#     after execution, code objects get compiled, and then may be found by gc
+    return funcs
+
+
+def freevars(func):
+    """get objects defined in enclosing code that are referred to by func
+
+    returns a dict of {name:object}; the dict is empty if func is not a
+    function or method"""
+    if ismethod(func):
+        func = getattr(func, '__func__' if PY3 else 'im_func')
+    if not isfunction(func):
+        return {}
+    cells = (func.__closure__ if PY3 else func.func_closure) or ()
+    names = (func.__code__ if PY3 else func.func_code).co_freevars
+    # pair each free variable name with the contents of its closure cell
+    return dict((name, cell.cell_contents) for (name, cell) in zip(names, cells))
+
+def globalvars(func):
+    """get objects defined in global scope that are referred to by func
+
+    returns a dict of {name:object}; the dict is empty if func is not a
+    function or method"""
+    if ismethod(func):
+        func = getattr(func, '__func__' if PY3 else 'im_func')
+    if not isfunction(func):
+        return {}
+    globs = (func.__globals__ if PY3 else func.func_globals) or {}
+    names = (func.__code__ if PY3 else func.func_code).co_names
+    #NOTE: names that are not in the function's globals are skipped
+    return dict((name, globs[name]) for name in names if name in globs)
+
+def varnames(func):
+    """get names of variables defined by func
+
+    returns a tuple (local vars, local vars referenced by nested functions),
+    or an empty tuple if no code object can be obtained from func"""
+    co = code(func)
+    if iscode(co):
+        return co.co_varnames, co.co_cellvars
+    return () #XXX: better ((),())? or None?
+
+
+def baditems(obj, exact=False, safe=False): #XXX: obj=globals() ?
+    """get items in object that fail to pickle
+
+    returns a list; for an object without __iter__ the list holds the
+    object itself if it fails to pickle"""
+    if not hasattr(obj,'__iter__'): # is not iterable
+        bad = badobjects(obj,0,exact,safe)
+        return [] if bad is None else [bad]
+    # for mapping-like objects, inspect the values
+    if getattr(obj,'values',None):
+        obj = obj.values()
+    found = [] # can't use a set, as items may be unhashable
+    for item in obj:
+        if item not in found:
+            found.append(badobjects(item,0,exact,safe))
+    return [bad for bad in found if bad is not None]
+
+
+def badobjects(obj, depth=0, exact=False, safe=False):
+    """get objects that fail to pickle
+
+    With depth=0, returns obj if it fails to pickle, else None. Otherwise
+    returns a dict of {attribute name: result} for the attributes of obj
+    that fail to pickle, recursing with depth-1."""
+    from dill import pickles
+    if depth:
+        failed = {}
+        for attr in dir(obj):
+            if not pickles(getattr(obj,attr),exact,safe):
+                failed[attr] = badobjects(getattr(obj,attr),depth-1,exact,safe)
+        return failed
+    return None if pickles(obj,exact,safe) else obj
+
+def badtypes(obj, depth=0, exact=False, safe=False):
+    """get types for objects that fail to pickle
+
+    With depth=0, returns type(obj) if obj fails to pickle, else None.
+    Otherwise returns a dict of {attribute name: result} for the attributes
+    of obj that fail to pickle, recursing with depth-1."""
+    from dill import pickles
+    if depth:
+        failed = {}
+        for attr in dir(obj):
+            if not pickles(getattr(obj,attr),exact,safe):
+                failed[attr] = badtypes(getattr(obj,attr),depth-1,exact,safe)
+        return failed
+    return None if pickles(obj,exact,safe) else type(obj)
+
+def errors(obj, depth=0, exact=False, safe=False):
+    """get errors for objects that fail to pickle
+
+    With depth=0, round-trips obj through copy() and returns the exception
+    raised (including a failed type or equality check), or None on success.
+    Otherwise returns a dict of {attribute name: result} for the attributes
+    of obj that fail to pickle, recursing with depth-1."""
+    from dill import pickles, copy
+    if depth:
+        failed = {}
+        for attr in dir(obj):
+            if not pickles(getattr(obj,attr),exact,safe):
+                failed[attr] = errors(getattr(obj,attr),depth-1,exact,safe)
+        return failed
+    try:
+        pik = copy(obj)
+        if exact:
+            assert pik == obj, \
+                "Unpickling produces %s instead of %s" % (pik,obj)
+        assert type(pik) == type(obj), \
+            "Unpickling produces %s instead of %s" % (type(pik),type(obj))
+    except Exception:
+        import sys
+        return sys.exc_info()[1]
+    return None
+
+del absolute_import
+
+
+# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
new file mode 100644
index 0000000..238527c
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
@@ -0,0 +1,1052 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# -*- coding: utf-8 -*-
+#
+# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
+# Copyright (c) 2008-2014 California Institute of Technology.
+# License: 3-clause BSD.  The full license text is available at:
+#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
+"""
+dill: a utility for serialization of python objects
+
+Based on code written by Oren Tirosh and Armin Ronacher.
+Extended to a (near) full set of the builtin types (in types module),
+and coded to the pickle interface, by <mmckerns@caltech.edu>.
+Initial port to python3 by Jonathan Dobson, continued by mmckerns.
+Test against "all" python types (Std. Lib. CH 1-15 @ 2.7) by mmckerns.
+Test against CH16+ Std. Lib. ... TBD.
+"""
+__all__ = ['dump','dumps','load','loads','dump_session','load_session',
+           'Pickler','Unpickler','register','copy','pickle','pickles',
+           'HIGHEST_PROTOCOL','DEFAULT_PROTOCOL','PicklingError',
+           'UnpicklingError','HANDLE_FMODE','CONTENTS_FMODE','FILE_FMODE']
+
+import logging
+log = logging.getLogger("dill")
+log.addHandler(logging.StreamHandler())
+def _trace(boolean):
+    """print a trace through the stack when pickling; useful for debugging
+
+    a true value sets the 'dill' logger to INFO, a false value to WARN"""
+    level = logging.INFO if boolean else logging.WARN
+    log.setLevel(level)
+
+import os
+import sys
+diff = None
+_use_diff = False
+PY3 = sys.version_info[0] == 3
+if PY3: #XXX: get types from dill.objtypes ?
+    import builtins as __builtin__
+    from pickle import _Pickler as StockPickler, _Unpickler as StockUnpickler
+    from _thread import LockType
+   #from io import IOBase
+    from types import CodeType, FunctionType, MethodType, GeneratorType, \
+        TracebackType, FrameType, ModuleType, BuiltinMethodType
+    BufferType = memoryview #XXX: unregistered
+    ClassType = type # no 'old-style' classes
+    EllipsisType = type(Ellipsis)
+   #FileType = IOBase
+    NotImplementedType = type(NotImplemented)
+    SliceType = slice
+    TypeType = type # 'new-style' classes #XXX: unregistered
+    XRangeType = range
+    DictProxyType = type(object.__dict__)
+else:
+    import __builtin__
+    from pickle import Pickler as StockPickler, Unpickler as StockUnpickler
+    from thread import LockType
+    from types import CodeType, FunctionType, ClassType, MethodType, \
+         GeneratorType, DictProxyType, XRangeType, SliceType, TracebackType, \
+         NotImplementedType, EllipsisType, FrameType, ModuleType, \
+         BufferType, BuiltinMethodType, TypeType
+from pickle import HIGHEST_PROTOCOL, PicklingError, UnpicklingError
+try:
+    from pickle import DEFAULT_PROTOCOL
+except ImportError:
+    DEFAULT_PROTOCOL = HIGHEST_PROTOCOL
+import __main__ as _main_module
+import marshal
+import gc
+# import zlib
+from weakref import ReferenceType, ProxyType, CallableProxyType
+from functools import partial
+from operator import itemgetter, attrgetter
+# new in python2.5
+if sys.hexversion >= 0x20500f0:
+    from types import MemberDescriptorType, GetSetDescriptorType
+# new in python3.3
+if sys.hexversion < 0x03030000:
+    FileNotFoundError = IOError
+try:
+    import ctypes
+    HAS_CTYPES = True
+except ImportError:
+    HAS_CTYPES = False
+try:
+    from numpy import ufunc as NumpyUfuncType
+    from numpy import ndarray as NumpyArrayType
+    def ndarrayinstance(obj):
+        try:
+            if not isinstance(obj, NumpyArrayType): return False
+        except ReferenceError: return False # handle 'R3' weakref in 3.x
+        # verify that __reduce__ has not been overridden
+        NumpyInstance = NumpyArrayType((0,),'int8')
+        if id(obj.__reduce_ex__) == id(NumpyInstance.__reduce_ex__) and \
+           id(obj.__reduce__) == id(NumpyInstance.__reduce__): return True
+        return False
+except ImportError:
+    NumpyUfuncType = None
+    NumpyArrayType = None
+    def ndarrayinstance(obj): return False
+
+# make sure to add these 'hand-built' types to _typemap
+if PY3:
+    CellType = type((lambda x: lambda y: x)(0).__closure__[0])
+else:
+    CellType = type((lambda x: lambda y: x)(0).func_closure[0])
+WrapperDescriptorType = type(type.__repr__)
+MethodDescriptorType = type(type.__dict__['mro'])
+MethodWrapperType = type([].__repr__)
+PartialType = type(partial(int,base=2))
+SuperType = type(super(Exception, TypeError()))
+ItemGetterType = type(itemgetter(0))
+AttrGetterType = type(attrgetter('__repr__'))
+FileType = type(open(os.devnull, 'rb', buffering=0))
+TextWrapperType = type(open(os.devnull, 'r', buffering=-1))
+BufferedRandomType = type(open(os.devnull, 'r+b', buffering=-1))
+BufferedReaderType = type(open(os.devnull, 'rb', buffering=-1))
+BufferedWriterType = type(open(os.devnull, 'wb', buffering=-1))
+try:
+    from _pyio import open as _open
+    PyTextWrapperType = type(_open(os.devnull, 'r', buffering=-1))
+    PyBufferedRandomType = type(_open(os.devnull, 'r+b', buffering=-1))
+    PyBufferedReaderType = type(_open(os.devnull, 'rb', buffering=-1))
+    PyBufferedWriterType = type(_open(os.devnull, 'wb', buffering=-1))
+except ImportError:
+    PyTextWrapperType = PyBufferedRandomType = PyBufferedReaderType = PyBufferedWriterType = None
+try:
+    from cStringIO import StringIO, InputType, OutputType
+except ImportError:
+    if PY3:
+        from io import BytesIO as StringIO
+    else:
+        from StringIO import StringIO
+    InputType = OutputType = None
+try:
+    __IPYTHON__ is True # is ipython
+    ExitType = None     # IPython.core.autocall.ExitAutocall
+    singletontypes = ['exit', 'quit', 'get_ipython']
+except NameError:
+    try: ExitType = type(exit) # apparently 'exit' can be removed
+    except NameError: ExitType = None
+    singletontypes = []
+
+### File modes
+# Pickles the file handle, preserving mode. The position of the unpickled
+# object is as for a new file handle.
+HANDLE_FMODE = 0
+# Pickles the file contents, creating a new file if the file does not exist
+# on load. The position is min(pickled position, EOF), and the mode is chosen
+# so that it "best" preserves the behavior of the original file.
+CONTENTS_FMODE = 1
+# Pickles the entire file (handle and contents), preserving mode and position.
+FILE_FMODE = 2
+
+### Shorthands (modified from python2.5/lib/pickle.py)
+def copy(obj, *args, **kwds):
+    """use pickling to 'copy' an object
+
+    extra arguments are forwarded to dumps()"""
+    pickled = dumps(obj, *args, **kwds)
+    return loads(pickled)
+
+def dump(obj, file, protocol=None, byref=False, fmode=HANDLE_FMODE):#, strictio=False):
+    """pickle an object to a file
+
+    obj      -- the object to serialize
+    file     -- file-like object handed to Pickler
+    protocol -- pickle protocol; DEFAULT_PROTOCOL is used when None
+    byref    -- stored (as a bool) as the pickler's _byref setting
+    fmode    -- stored as the pickler's _fmode setting (see HANDLE_FMODE,
+                CONTENTS_FMODE and FILE_FMODE)
+    """
+    strictio = False #FIXME: strict=True needs cleanup
+    if protocol is None: protocol = DEFAULT_PROTOCOL
+    pik = Pickler(file, protocol)
+    pik._main_module = _main_module
+    # save settings
+    _byref = pik._byref
+    _strictio = pik._strictio
+    _fmode = pik._fmode
+    # apply kwd settings
+    pik._byref = bool(byref)
+    pik._strictio = bool(strictio)
+    pik._fmode = fmode
+    # hack to catch subclassed numpy array instances
+    if NumpyArrayType and ndarrayinstance(obj):
+        @register(type(obj))
+        def save_numpy_array(pickler, obj):
+            log.info("Nu: (%s, %s)" % (obj.shape,obj.dtype))
+            npdict = getattr(obj, '__dict__', None)
+            f, args, state = obj.__reduce__()
+            # write through the pickler doing the dispatch: this handler
+            # stays registered in Pickler.dispatch after dump() returns, so
+            # the 'pik' of this call may not be the pickler in use
+            pickler.save_reduce(_create_array, (f, args, state, npdict), obj=obj)
+            return
+    # end hack
+    pik.dump(obj)
+    # return to saved settings
+    pik._byref = _byref
+    pik._strictio = _strictio
+    pik._fmode = _fmode
+    return
+
+def dumps(obj, protocol=None, byref=False, fmode=HANDLE_FMODE):#, strictio=False):
+    """pickle an object to a string
+
+    the arguments are forwarded to dump(), which writes to an in-memory
+    buffer whose contents are returned"""
+    buf = StringIO()
+    dump(obj, buf, protocol, byref, fmode)#, strictio)
+    pickled = buf.getvalue()
+    return pickled
+
+def load(file):
+    """unpickle an object from a file
+
+    If the class of the loaded object reports the main module as its
+    module, the object is re-pointed at the attribute of the same name
+    found in the main module.
+    """
+    pik = Unpickler(file)
+    pik._main_module = _main_module
+    obj = pik.load()
+    if type(obj).__module__ == _main_module.__name__: # point obj class to main
+        # rebind the instance's class; a bare comparison here would be a no-op
+        try: obj.__class__ = getattr(pik._main_module, type(obj).__name__)
+        # AttributeError: defined in a file; TypeError: class not assignable
+        except (AttributeError, TypeError): pass
+   #_main_module.__dict__.update(obj.__dict__) #XXX: should update globals ?
+    return obj
+
+def loads(str):
+    """unpickle an object from a string
+
+    the string is wrapped in an in-memory buffer and handed to load()"""
+    return load(StringIO(str))
+
+# def dumpzs(obj, protocol=None):
+#     """pickle an object to a compressed string"""
+#     return zlib.compress(dumps(obj, protocol))
+
+# def loadzs(str):
+#     """unpickle an object from a compressed string"""
+#     return loads(zlib.decompress(str))
+
+### End: Shorthands ###
+
+### Pickle the Interpreter Session
+def dump_session(filename='/tmp/session.pkl', main_module=_main_module):
+    """pickle the current state of __main__ to a file
+
+    filename    -- path of the session file; opened for binary writing
+    main_module -- the module object to pickle (defaults to __main__)
+    """
+    f = open(filename, 'wb')
+    try:
+        pickler = Pickler(f, 2) # pickle protocol 2 is always used here
+        pickler._main_module = main_module
+        _byref = pickler._byref
+        pickler._byref = False  # disable pickling by name reference
+        pickler._session = True # is best indicator of when pickling a session
+        pickler.dump(main_module)
+        # restore the flags once the dump has completed
+        pickler._session = False
+        pickler._byref = _byref
+    finally:
+        # close the file even if pickling raised
+        f.close()
+    return
+
+def load_session(filename='/tmp/session.pkl', main_module=_main_module):
+    """update the __main__ module with the state from the session file
+
+    filename    -- path of the session file; opened for binary reading
+    main_module -- the module whose __dict__ is updated (defaults to __main__)
+    """
+    f = open(filename, 'rb')
+    try:
+        unpickler = Unpickler(f)
+        unpickler._main_module = main_module
+        unpickler._session = True # flag is set only for the duration of load()
+        module = unpickler.load()
+        unpickler._session = False
+        # merge the unpickled module's namespace into the target module
+        main_module.__dict__.update(module.__dict__)
+    finally:
+        # close the file even if unpickling raised
+        f.close()
+    return
+
+### End: Pickle the Interpreter
+
+### Extend the Picklers
+class Pickler(StockPickler):
+    """python's Pickler extended to interpreter sessions"""
+    # private copy of the stock dispatch table; see pickle() and register()
+    dispatch = StockPickler.dispatch.copy()
+    # class-level defaults for the per-pickler settings
+    _main_module = None
+    _session = False
+    _byref = False
+    _strictio = False
+    _fmode = HANDLE_FMODE
+
+    def __init__(self, *args, **kwargs):
+        """initialize the stock pickler, then bind __main__ and an empty
+        diff cache to the instance"""
+        StockPickler.__init__(self, *args, **kwargs)
+        self._diff_cache = {}
+        self._main_module = _main_module
+
+class Unpickler(StockUnpickler):
+    """python's Unpickler extended to interpreter sessions and more types"""
+    # class-level defaults for the per-unpickler settings
+    _main_module = None
+    _session = False
+
+    def __init__(self, *args, **kwargs):
+        """initialize the stock unpickler, then bind __main__ to the instance"""
+        StockUnpickler.__init__(self, *args, **kwargs)
+        self._main_module = _main_module
+
+    def find_class(self, module, name):
+        """resolve (module, name); the pair ('__builtin__', '__main__') maps
+        to the __dict__ of the main module, anything else is delegated to
+        the stock unpickler"""
+        if (module, name) != ('__builtin__', '__main__'):
+            return StockUnpickler.find_class(self, module, name)
+        return self._main_module.__dict__ #XXX: above set w/save_module_dict
+
+'''
+def dispatch_table():
+    """get the dispatch table of registered types"""
+    return Pickler.dispatch
+'''
+
+pickle_dispatch_copy = StockPickler.dispatch.copy()
+
+def pickle(t, func):
+    """expose dispatch table for user-created extensions
+
+    registers func in Pickler.dispatch as the entry for type t"""
+    Pickler.dispatch[t] = func
+
+def register(t):
+    """decorator factory: the returned decorator stores the decorated
+    function in Pickler.dispatch under type t and returns it unchanged"""
+    def proxy(func):
+        Pickler.dispatch[t] = func
+        return func
+    return proxy
+
+def _revert_extension():
+    """remove this module's functions from StockPickler.dispatch, putting
+    back the entry saved in pickle_dispatch_copy where one exists"""
+    # iterate over a snapshot, as the table is modified in the loop
+    for typ, func in list(StockPickler.dispatch.items()):
+        if func.__module__ != __name__:
+            continue
+        del StockPickler.dispatch[typ]
+        if typ in pickle_dispatch_copy:
+            StockPickler.dispatch[typ] = pickle_dispatch_copy[typ]
+
+def use_diff(on=True):
+    """
+    reduces the size of pickles by only including objects which have changed.
+    Decreases pickle size but increases CPU time needed.
+    Also helps avoid some unpicklable objects.
+    MUST be called at start of script, otherwise changes will not be recorded.
+    """
+    global _use_diff, diff
+    _use_diff = on
+    # the diff module is only imported once, and only when switching on
+    if not _use_diff or diff is not None:
+        return
+    try:
+        from . import diff as d
+    except:
+        # fall back to a plain import of 'diff'
+        import diff as d
+    diff = d
+
+def _create_typemap():
+    """yield (name, type) pairs from the types module (and, on python3,
+    from builtins) for objects whose __module__ is the builtin module and
+    whose type is exactly 'type'"""
+    import types
+    if PY3:
+        namespace = dict(list(__builtin__.__dict__.items()) + \
+                         list(types.__dict__.items()))
+        candidates = namespace.items()
+        builtin = 'builtins'
+    else:
+        candidates = types.__dict__.iteritems()
+        builtin = '__builtin__'
+    for key, value in candidates:
+        if getattr(value, '__module__', None) != builtin:
+            continue
+        if type(value) is type:
+            yield key, value
+_reverse_typemap = dict(_create_typemap())
+_reverse_typemap.update({
+    'CellType': CellType,
+    'WrapperDescriptorType': WrapperDescriptorType,
+    'MethodDescriptorType': MethodDescriptorType,
+    'MethodWrapperType': MethodWrapperType,
+    'PartialType': PartialType,
+    'SuperType': SuperType,
+    'ItemGetterType': ItemGetterType,
+    'AttrGetterType': AttrGetterType,
+    'FileType': FileType,
+    'BufferedRandomType': BufferedRandomType,
+    'BufferedReaderType': BufferedReaderType,
+    'BufferedWriterType': BufferedWriterType,
+    'TextWrapperType': TextWrapperType,
+    'PyBufferedRandomType': PyBufferedRandomType,
+    'PyBufferedReaderType': PyBufferedReaderType,
+    'PyBufferedWriterType': PyBufferedWriterType,
+    'PyTextWrapperType': PyTextWrapperType,
+})
+if ExitType:
+    _reverse_typemap['ExitType'] = ExitType
+if InputType:
+    _reverse_typemap['InputType'] = InputType
+    _reverse_typemap['OutputType'] = OutputType
+if PY3:
+    _typemap = dict((v, k) for k, v in _reverse_typemap.items())
+else:
+    _typemap = dict((v, k) for k, v in _reverse_typemap.iteritems())
+
+def _unmarshal(string):
+    """return the object obtained by marshal.loads on the given string"""
+    return marshal.loads(string)
+
+def _load_type(name):
+    """look up a type by name in _reverse_typemap; raises KeyError for an
+    unknown name"""
+    return _reverse_typemap[name]
+
+def _create_type(typeobj, *args):
+    """call typeobj with the given positional arguments and return the result"""
+    return typeobj(*args)
+
+def _create_function(fcode, fglobals, fname=None, fdefaults=None, \
+                                      fclosure=None, fdict=None):
+    """build a function with FunctionType, then copy the entries of fdict
+    into the new function's __dict__
+
+    __dict__ is the storehouse for attributes added after function creation
+    """
+    attrs = dict() if fdict is None else fdict
+    func = FunctionType(fcode, fglobals, fname, fdefaults, fclosure)
+    func.__dict__.update(attrs) #XXX: better copy? option to copy?
+    return func
+
+def _create_ftype(ftypeobj, func, args, kwds):
+    """call ftypeobj(func, *args, **kwds); None for args or kwds stands for
+    no positional or no keyword arguments"""
+    positional = () if args is None else args
+    keywords = {} if kwds is None else kwds
+    return ftypeobj(func, *positional, **keywords)
+
+def _create_lock(locked, *args):
+    """create a new threading.Lock, acquiring it when 'locked' is true
+
+    raises UnpicklingError if the lock cannot be acquired; any extra
+    positional arguments are ignored"""
+    from threading import Lock
+    lock = Lock()
+    # non-blocking acquire, attempted only when a held lock is requested
+    if locked and not lock.acquire(False):
+        raise UnpicklingError("Cannot acquire lock")
+    return lock
+
+# thanks to matsjoyce for adding all the different file modes
+def _create_filehandle(name, mode, position, closed, open, strictio, fmode, fdata): # buffering=0
+    # only pickles the handle, not the file contents... good? or StringIO(data)?
+    # (for file contents see: http://effbot.org/librarybook/copy-reg.htm)
+    # NOTE: handle special cases first (are there more special cases?)
+    names = {'<stdin>':sys.__stdin__, '<stdout>':sys.__stdout__,
+             '<stderr>':sys.__stderr__} #XXX: better fileno=(0,1,2) ?
+    if name in list(names.keys()):
+        f = names[name] #XXX: safer "f=sys.stdin"
+    elif name == '<tmpfile>':
+        f = os.tmpfile()
+    elif name == '<fdopen>':
+        import tempfile
+        f = tempfile.TemporaryFile(mode)
+    else:
+        # treat x mode as w mode
+        if "x" in mode and sys.hexversion < 0x03030000:
+            raise ValueError("invalid mode: '%s'" % mode)
+
+        if not os.path.exists(name):
+            if strictio:
+                raise FileNotFoundError("[Errno 2] No such file or directory: '%s'" % name)
+            elif "r" in mode and fmode != FILE_FMODE:
+                name = '<fdopen>' # or os.devnull?
+            current_size = 0 # or maintain position?
+        else:
+            current_size = os.path.getsize(name)
+
+        if position > current_size:
+            if strictio:
+                raise ValueError("invalid buffer size")
+            elif fmode == CONTENTS_FMODE:
+                position = current_size
+        # try to open the file by name
+        # NOTE: has different fileno
+        try:
+            #FIXME: missing: *buffering*, encoding, softspace
+            if fmode == FILE_FMODE:
+                f = open(name, mode if "w" in mode else "w")
+                f.write(fdata)
+                if "w" not in mode:
+                    f.close()
+                    f = open(name, mode)
+            elif name == '<fdopen>': # file did not exist
+                import tempfile
+                f = tempfile.TemporaryFile(mode)
+            elif fmode == CONTENTS_FMODE \
+               and ("w" in mode or "x" in mode):
+                # stop truncation when opening
+                flags = os.O_CREAT
+                if "+" in mode:
+                    flags |= os.O_RDWR
+                else:
+                    flags |= os.O_WRONLY
+                f = os.fdopen(os.open(name, flags), mode)
+                # set name to the correct value
+                if PY3:
+                    r = getattr(f, "buffer", f)
+                    r = getattr(r, "raw", r)
+                    r.name = name
+                else:
+                    class FILE(ctypes.Structure):
+                        _fields_ = [("refcount", ctypes.c_long),
+                                    ("type_obj", ctypes.py_object),
+                                    ("file_pointer", ctypes.c_voidp),
+                                    ("name", ctypes.py_object)]
+
+                    class PyObject(ctypes.Structure):
+                        _fields_ = [
+                            ("ob_refcnt", ctypes.c_int),
+                            ("ob_type", ctypes.py_object)
+                            ]
+                    if not HAS_CTYPES:
+                        raise ImportError("No module named 'ctypes'")
+                    ctypes.cast(id(f), ctypes.POINTER(FILE)).contents.name = name
+                    ctypes.cast(id(name), ctypes.POINTER(PyObject)).contents.ob_refcnt += 1
+                assert f.name == name
+            else:
+                f = open(name, mode)
+        except (IOError, FileNotFoundError):
+            err = sys.exc_info()[1]
+            raise UnpicklingError(err)
+    if closed:
+        f.close()
+    elif position >= 0 and fmode != HANDLE_FMODE:
+        f.seek(position)
+    return f
+
+def _create_stringi(value, position, closed):
+    f = StringIO(value)
+    if closed: f.close()
+    else: f.seek(position)
+    return f
+
+def _create_stringo(value, position, closed):
+    f = StringIO()
+    if closed: f.close()
+    else:
+       f.write(value)
+       f.seek(position)
+    return f
+
+class _itemgetter_helper(object):
+    def __init__(self):
+        self.items = []
+    def __getitem__(self, item):
+        self.items.append(item)
+        return
+
+class _attrgetter_helper(object):
+    def __init__(self, attrs, index=None):
+        self.attrs = attrs
+        self.index = index
+    def __getattribute__(self, attr):
+        attrs = object.__getattribute__(self, "attrs")
+        index = object.__getattribute__(self, "index")
+        if index is None:
+            index = len(attrs)
+            attrs.append(attr)
+        else:
+            attrs[index] = ".".join([attrs[index], attr])
+        return type(self)(attrs, index)
+
+if HAS_CTYPES:
+    ctypes.pythonapi.PyCell_New.restype = ctypes.py_object
+    ctypes.pythonapi.PyCell_New.argtypes = [ctypes.py_object]
+    # thanks to Paul Kienzle for cleaning the ctypes CellType logic
+    def _create_cell(contents):
+        return ctypes.pythonapi.PyCell_New(contents)
+
+def _create_weakref(obj, *args):
+    from weakref import ref
+    if obj is None: # it's dead
+        if PY3:
+            from collections import UserDict
+        else:
+            from UserDict import UserDict
+        return ref(UserDict(), *args)
+    return ref(obj, *args)
+
+def _create_weakproxy(obj, callable=False, *args):
+    from weakref import proxy
+    if obj is None: # it's dead
+        if callable: return proxy(lambda x:x, *args)
+        if PY3:
+            from collections import UserDict
+        else:
+            from UserDict import UserDict
+        return proxy(UserDict(), *args)
+    return proxy(obj, *args)
+
+def _eval_repr(repr_str):
+    return eval(repr_str)
+
+def _create_array(f, args, state, npdict=None):
+   #array = numpy.core.multiarray._reconstruct(*args)
+    array = f(*args)
+    array.__setstate__(state)
+    if npdict is not None: # we also have saved state in __dict__
+        array.__dict__.update(npdict)
+    return array
+
+def _getattr(objclass, name, repr_str):
+    # hack to grab the reference directly
+    try: #XXX: works only for __builtin__ ?
+        attr = repr_str.split("'")[3]
+        return eval(attr+'.__dict__["'+name+'"]')
+    except:
+        attr = getattr(objclass,name)
+        if name == '__dict__':
+            attr = attr[name]
+        return attr
+
+def _get_attr(self, name):
+    # stop recursive pickling
+    return getattr(self, name)
+
+def _dict_from_dictproxy(dictproxy):
+    _dict = dictproxy.copy() # convert dictproxy to dict
+    _dict.pop('__dict__', None)
+    _dict.pop('__weakref__', None)
+    return _dict
+
+def _import_module(import_name, safe=False):
+    try:
+        if '.' in import_name:
+            items = import_name.split('.')
+            module = '.'.join(items[:-1])
+            obj = items[-1]
+        else:
+            return __import__(import_name)
+        return getattr(__import__(module, None, None, [obj]), obj)
+    except (ImportError, AttributeError):
+        if safe:
+            return None
+        raise
+
+def _locate_function(obj, session=False):
+    if obj.__module__ == '__main__': # and session:
+        return False
+    found = _import_module(obj.__module__ + '.' + obj.__name__, safe=True)
+    return found is obj
+
+@register(CodeType)
+def save_code(pickler, obj):
+    log.info("Co: %s" % obj)
+    pickler.save_reduce(_unmarshal, (marshal.dumps(obj),), obj=obj)
+    return
+
+@register(FunctionType)
+def save_function(pickler, obj):
+    if not _locate_function(obj): #, pickler._session):
+        log.info("F1: %s" % obj)
+        if PY3:
+            pickler.save_reduce(_create_function, (obj.__code__, 
+                                obj.__globals__, obj.__name__,
+                                obj.__defaults__, obj.__closure__,
+                                obj.__dict__), obj=obj)
+        else:
+            pickler.save_reduce(_create_function, (obj.func_code,
+                                obj.func_globals, obj.func_name,
+                                obj.func_defaults, obj.func_closure,
+                                obj.__dict__), obj=obj)
+    else:
+        log.info("F2: %s" % obj)
+        StockPickler.save_global(pickler, obj) #NOTE: also takes name=...
+    return
+
+@register(dict)
+def save_module_dict(pickler, obj):
+    if is_dill(pickler) and obj == pickler._main_module.__dict__ and not pickler._session:
+        log.info("D1: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
+        if PY3:
+            pickler.write(bytes('c__builtin__\n__main__\n', 'UTF-8'))
+        else:
+            pickler.write('c__builtin__\n__main__\n')
+    elif not is_dill(pickler) and obj == _main_module.__dict__:
+        log.info("D3: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
+        if PY3:
+            pickler.write(bytes('c__main__\n__dict__\n', 'UTF-8'))
+        else:
+            pickler.write('c__main__\n__dict__\n')   #XXX: works in general?
+    elif '__name__' in obj and obj != _main_module.__dict__ \
+    and obj is getattr(_import_module(obj['__name__'],True), '__dict__', None):
+        log.info("D4: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
+        if PY3:
+            pickler.write(bytes('c%s\n__dict__\n' % obj['__name__'], 'UTF-8'))
+        else:
+            pickler.write('c%s\n__dict__\n' % obj['__name__'])
+    else:
+        log.info("D2: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
+        if is_dill(pickler) and pickler._session:
+            # we only care about session the first pass through
+            pickler._session = False 
+        StockPickler.save_dict(pickler, obj)
+    return
+
+@register(ClassType)
+def save_classobj(pickler, obj):
+    if obj.__module__ == '__main__': #XXX: use _main_module.__name__ everywhere?
+        log.info("C1: %s" % obj)
+        pickler.save_reduce(ClassType, (obj.__name__, obj.__bases__,
+                                        obj.__dict__), obj=obj)
+                                       #XXX: or obj.__dict__.copy()), obj=obj) ?
+    else:
+        log.info("C2: %s" % obj)
+        StockPickler.save_global(pickler, obj)
+    return
+
+@register(LockType)
+def save_lock(pickler, obj):
+    log.info("Lo: %s" % obj)
+    pickler.save_reduce(_create_lock, (obj.locked(),), obj=obj)
+    return
+
+@register(ItemGetterType)
+def save_itemgetter(pickler, obj):
+    log.info("Ig: %s" % obj)
+    helper = _itemgetter_helper()
+    obj(helper)
+    pickler.save_reduce(type(obj), tuple(helper.items), obj=obj)
+    return
+
+@register(AttrGetterType)
+def save_attrgetter(pickler, obj):
+    log.info("Ag: %s" % obj)
+    attrs = []
+    helper = _attrgetter_helper(attrs)
+    obj(helper)
+    pickler.save_reduce(type(obj), tuple(attrs), obj=obj)
+    return
+
+def _save_file(pickler, obj, open_):
+    obj.flush()
+    if obj.closed:
+        position = None
+    else:
+        if obj in (sys.__stdout__, sys.__stderr__, sys.__stdin__):
+            position = -1
+        else:
+            position = obj.tell()
+    if pickler._fmode == FILE_FMODE:
+        f = open_(obj.name, "r")
+        fdata = f.read()
+        f.close()
+    else:
+        fdata = ""
+    strictio = pickler._strictio
+    fmode = pickler._fmode
+    pickler.save_reduce(_create_filehandle, (obj.name, obj.mode, position,
+                                             obj.closed, open_, strictio,
+                                             fmode, fdata), obj=obj)
+    return
+
+
+@register(FileType) #XXX: in 3.x has buffer=0, needs different _create?
+@register(BufferedRandomType)
+@register(BufferedReaderType)
+@register(BufferedWriterType)
+@register(TextWrapperType)
+def save_file(pickler, obj):
+    log.info("Fi: %s" % obj)
+    return _save_file(pickler, obj, open)
+
+if PyTextWrapperType:
+    @register(PyBufferedRandomType)
+    @register(PyBufferedReaderType)
+    @register(PyBufferedWriterType)
+    @register(PyTextWrapperType)
+    def save_file(pickler, obj):
+        log.info("Fi: %s" % obj)
+        return _save_file(pickler, obj, _open)
+
+# The following two functions are based on 'saveCStringIoInput'
+# and 'saveCStringIoOutput' from spickle
+# Copyright (c) 2011 by science+computing ag
+# License: http://www.apache.org/licenses/LICENSE-2.0
+if InputType:
+    @register(InputType)
+    def save_stringi(pickler, obj):
+        log.info("Io: %s" % obj)
+        if obj.closed:
+            value = ''; position = None
+        else:
+            value = obj.getvalue(); position = obj.tell()
+        pickler.save_reduce(_create_stringi, (value, position, \
+                                              obj.closed), obj=obj)
+        return
+
+    @register(OutputType)
+    def save_stringo(pickler, obj):
+        log.info("Io: %s" % obj)
+        if obj.closed:
+            value = ''; position = None
+        else:
+            value = obj.getvalue(); position = obj.tell()
+        pickler.save_reduce(_create_stringo, (value, position, \
+                                              obj.closed), obj=obj)
+        return
+
+@register(PartialType)
+def save_functor(pickler, obj):
+    log.info("Fu: %s" % obj)
+    pickler.save_reduce(_create_ftype, (type(obj), obj.func, obj.args,
+                                        obj.keywords), obj=obj)
+    return
+
+@register(SuperType)
+def save_functor(pickler, obj):
+    log.info("Su: %s" % obj)
+    pickler.save_reduce(super, (obj.__thisclass__, obj.__self__), obj=obj)
+    return
+
+@register(BuiltinMethodType)
+def save_builtin_method(pickler, obj):
+    if obj.__self__ is not None:
+        log.info("B1: %s" % obj)
+        pickler.save_reduce(_get_attr, (obj.__self__, obj.__name__), obj=obj)
+    else:
+        log.info("B2: %s" % obj)
+        StockPickler.save_global(pickler, obj)
+    return
+
+@register(MethodType) #FIXME: fails for 'hidden' or 'name-mangled' classes
+def save_instancemethod0(pickler, obj):# example: cStringIO.StringI
+    log.info("Me: %s" % obj) #XXX: obj.__dict__ handled elsewhere?
+    if PY3:
+        pickler.save_reduce(MethodType, (obj.__func__, obj.__self__), obj=obj)
+    else:
+        pickler.save_reduce(MethodType, (obj.im_func, obj.im_self,
+                                         obj.im_class), obj=obj)
+    return
+
+if sys.hexversion >= 0x20500f0:
+    @register(MemberDescriptorType)
+    @register(GetSetDescriptorType)
+    @register(MethodDescriptorType)
+    @register(WrapperDescriptorType)
+    def save_wrapper_descriptor(pickler, obj):
+        log.info("Wr: %s" % obj)
+        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
+                                       obj.__repr__()), obj=obj)
+        return
+
+    @register(MethodWrapperType)
+    def save_instancemethod(pickler, obj):
+        log.info("Mw: %s" % obj)
+        pickler.save_reduce(getattr, (obj.__self__, obj.__name__), obj=obj)
+        return
+else:
+    @register(MethodDescriptorType)
+    @register(WrapperDescriptorType)
+    def save_wrapper_descriptor(pickler, obj):
+        log.info("Wr: %s" % obj)
+        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
+                                       obj.__repr__()), obj=obj)
+        return
+
+if HAS_CTYPES:
+    @register(CellType)
+    def save_cell(pickler, obj):
+        log.info("Ce: %s" % obj)
+        pickler.save_reduce(_create_cell, (obj.cell_contents,), obj=obj)
+        return
+ 
+# The following function is based on 'saveDictProxy' from spickle
+# Copyright (c) 2011 by science+computing ag
+# License: http://www.apache.org/licenses/LICENSE-2.0
+@register(DictProxyType)
+def save_dictproxy(pickler, obj):
+    log.info("Dp: %s" % obj)
+    attr = obj.get('__dict__')
+   #pickler.save_reduce(_create_dictproxy, (attr,'nested'), obj=obj)
+    if type(attr) == GetSetDescriptorType and attr.__name__ == "__dict__" \
+    and getattr(attr.__objclass__, "__dict__", None) == obj:
+        pickler.save_reduce(getattr, (attr.__objclass__, "__dict__"), obj=obj)
+        return
+    # all bad below... so throw ReferenceError or TypeError
+    from weakref import ReferenceError
+    raise ReferenceError("%s does not reference a class __dict__" % obj)
+
+@register(SliceType)
+def save_slice(pickler, obj):
+    log.info("Sl: %s" % obj)
+    pickler.save_reduce(slice, (obj.start, obj.stop, obj.step), obj=obj)
+    return
+
+@register(XRangeType)
+@register(EllipsisType)
+@register(NotImplementedType)
+def save_singleton(pickler, obj):
+    log.info("Si: %s" % obj)
+    pickler.save_reduce(_eval_repr, (obj.__repr__(),), obj=obj)
+    return
+
+# thanks to Paul Kienzle for pointing out ufuncs didn't pickle
+if NumpyArrayType:
+    @register(NumpyUfuncType)
+    def save_numpy_ufunc(pickler, obj):
+        log.info("Nu: %s" % obj)
+        StockPickler.save_global(pickler, obj)
+        return
+# NOTE: the above 'save' performs like:
+#   import copy_reg
+#   def udump(f): return f.__name__
+#   def uload(name): return getattr(numpy, name)
+#   copy_reg.pickle(NumpyUfuncType, udump, uload)
+
+def _proxy_helper(obj): # a dead proxy returns a reference to None
+    """get memory address of proxy's reference object"""
+    try: #FIXME: has to be a smarter way to identify if it's a proxy
+        address = int(repr(obj).rstrip('>').split(' at ')[-1], base=16)
+    except ValueError: # has a repr... is thus probably not a proxy
+        address = id(obj)
+    return address
+
+def _locate_object(address, module=None):
+    """get object located at the given memory address (inverse of id(obj))"""
+    special = [None, True, False] #XXX: more...?
+    for obj in special:
+        if address == id(obj): return obj
+    if module:
+        if PY3:
+            objects = iter(module.__dict__.values())
+        else:
+            objects = module.__dict__.itervalues()
+    else: objects = iter(gc.get_objects())
+    for obj in objects:
+        if address == id(obj): return obj
+    # all bad below... nothing found so throw ReferenceError or TypeError
+    from weakref import ReferenceError
+    try: address = hex(address)
+    except TypeError:
+        raise TypeError("'%s' is not a valid memory address" % str(address))
+    raise ReferenceError("Cannot reference object at '%s'" % address)
+
+@register(ReferenceType)
+def save_weakref(pickler, obj):
+    refobj = obj()
+    log.info("R1: %s" % obj)
+   #refobj = ctypes.pythonapi.PyWeakref_GetObject(obj) # dead returns "None"
+    pickler.save_reduce(_create_weakref, (refobj,), obj=obj)
+    return
+
+@register(ProxyType)
+@register(CallableProxyType)
+def save_weakproxy(pickler, obj):
+    refobj = _locate_object(_proxy_helper(obj))
+    try: log.info("R2: %s" % obj)
+    except ReferenceError: log.info("R3: %s" % sys.exc_info()[1])
+   #callable = bool(getattr(refobj, '__call__', None))
+    if type(obj) is CallableProxyType: callable = True
+    else: callable = False
+    pickler.save_reduce(_create_weakproxy, (refobj, callable), obj=obj)
+    return
+
+@register(ModuleType)
+def save_module(pickler, obj):
+    if False: #_use_diff:
+        if obj.__name__ != "dill":
+            try:
+                changed = diff.whats_changed(obj, seen=pickler._diff_cache)[0]
+            except RuntimeError:  # not memorised module, probably part of dill
+                pass
+            else:
+                log.info("M1: %s with diff" % obj)
+                log.info("Diff: %s", changed.keys())
+                pickler.save_reduce(_import_module, (obj.__name__,), obj=obj,
+                                    state=changed)
+                return
+
+        log.info("M2: %s" % obj)
+        pickler.save_reduce(_import_module, (obj.__name__,), obj=obj)
+    else:
+        # if a module file name starts with prefix, it should be a builtin
+        # module, so should be pickled as a reference
+        prefix = getattr(sys, "base_prefix", sys.prefix)
+        std_mod = getattr(obj, "__file__", prefix).startswith(prefix)
+        if obj.__name__ not in ("builtins", "dill") \
+           and not std_mod or is_dill(pickler) and obj is pickler._main_module:
+            log.info("M1: %s" % obj)
+            _main_dict = obj.__dict__.copy() #XXX: better no copy? option to copy?
+            [_main_dict.pop(item, None) for item in singletontypes
+                + ["__builtins__", "__loader__"]]
+            pickler.save_reduce(_import_module, (obj.__name__,), obj=obj,
+                                state=_main_dict)
+        else:
+            log.info("M2: %s" % obj)
+            pickler.save_reduce(_import_module, (obj.__name__,), obj=obj)
+        return
+    return
+
+@register(TypeType)
+def save_type(pickler, obj):
+    if obj in _typemap:
+        log.info("T1: %s" % obj)
+        pickler.save_reduce(_load_type, (_typemap[obj],), obj=obj)
+    elif obj.__module__ == '__main__':
+        try: # use StockPickler for special cases [namedtuple,]
+            [getattr(obj, attr) for attr in ('_fields','_asdict',
+                                             '_make','_replace')]
+            log.info("T6: %s" % obj)
+            StockPickler.save_global(pickler, obj)
+            return
+        except AttributeError: pass
+        if type(obj) == type:
+        #   try: # used when pickling the class as code (or the interpreter)
+            if is_dill(pickler) and not pickler._byref:
+                # thanks to Tom Stepleton for pointing out pickler._session is unneeded
+                log.info("T2: %s" % obj)
+                _dict = _dict_from_dictproxy(obj.__dict__)
+        #   except: # punt to StockPickler (pickle by class reference)
+            else:
+                log.info("T5: %s" % obj)
+                StockPickler.save_global(pickler, obj)
+                return
+        else:
+            log.info("T3: %s" % obj)
+            _dict = obj.__dict__
+       #print (_dict)
+       #print ("%s\n%s" % (type(obj), obj.__name__))
+       #print ("%s\n%s" % (obj.__bases__, obj.__dict__))
+        pickler.save_reduce(_create_type, (type(obj), obj.__name__,
+                                           obj.__bases__, _dict), obj=obj)
+    else:
+        log.info("T4: %s" % obj)
+       #print (obj.__dict__)
+       #print ("%s\n%s" % (type(obj), obj.__name__))
+       #print ("%s\n%s" % (obj.__bases__, obj.__dict__))
+        StockPickler.save_global(pickler, obj)
+    return
+
+# quick sanity checking
+def pickles(obj,exact=False,safe=False,**kwds):
+    """quick check if object pickles with dill"""
+    if safe: exceptions = (Exception,) # RuntimeError, ValueError
+    else:
+        exceptions = (TypeError, AssertionError, PicklingError, UnpicklingError)
+    try:
+        pik = copy(obj, **kwds)
+        try:
+            result = bool(pik.all() == obj.all())
+        except AttributeError:
+            result = pik == obj
+        if result: return True
+        if not exact:
+            return type(pik) == type(obj)
+        return False
+    except exceptions:
+        return False
+
+# use to protect against missing attributes
+def is_dill(pickler):
+    "check the dill-ness of your pickler"
+    return 'dill' in pickler.__module__
+   #return hasattr(pickler,'_main_module')
+
+def _extend():
+    """extend pickle with all of dill's registered types"""
+    # need to have pickle not choke on _main_module?  use is_dill(pickler)
+    for t,func in Pickler.dispatch.items():
+        try:
+            StockPickler.dispatch[t] = func
+        except: #TypeError, PicklingError, UnpicklingError
+            log.info("skip: %s" % t)
+        else: pass
+    return
+
+del diff, _use_diff, use_diff
+
+# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################


Mime
View raw message