flink-commits mailing list archives

From ches...@apache.org
Subject [2/8] flink git commit: [FLINK-2901] Move PythonAPI to flink-libraries
Date Thu, 12 Nov 2015 20:09:17 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
deleted file mode 100644
index 390a08d..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
+++ /dev/null
@@ -1,906 +0,0 @@
-# ###############################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import inspect
-import copy
-import types as TYPES
-
-from flink.plan.Constants import _Fields, _Identifier, WriteMode, STRING
-from flink.functions.CoGroupFunction import CoGroupFunction
-from flink.functions.FilterFunction import FilterFunction
-from flink.functions.FlatMapFunction import FlatMapFunction
-from flink.functions.CrossFunction import CrossFunction
-from flink.functions.GroupReduceFunction import GroupReduceFunction
-from flink.functions.JoinFunction import JoinFunction
-from flink.functions.MapFunction import MapFunction
-from flink.functions.MapPartitionFunction import MapPartitionFunction
-from flink.functions.ReduceFunction import ReduceFunction
-
-def deduct_output_type(dataset):
-    skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
-    source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])
-    default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN])
-
-    while True:
-        dataset_type = dataset[_Fields.IDENTIFIER]
-        if dataset_type in skip:
-            dataset = dataset[_Fields.PARENT]
-            continue
-        if dataset_type in source:
-            if dataset_type == _Identifier.SOURCE_TEXT:
-                return STRING
-            if dataset_type == _Identifier.SOURCE_VALUE:
-                return dataset[_Fields.VALUES][0]
-            if dataset_type == _Identifier.SOURCE_CSV:
-                return dataset[_Fields.TYPES]
-        if dataset_type == _Identifier.PROJECTION:
-            return tuple([deduct_output_type(dataset[_Fields.PARENT])[k] for k in dataset[_Fields.KEYS]])
-        if dataset_type in default:
-            if dataset[_Fields.OPERATOR] is not None: #udf-join/cross
-                return dataset[_Fields.TYPES]
-            if len(dataset[_Fields.PROJECTIONS]) == 0: #defaultjoin/-cross
-                return (deduct_output_type(dataset[_Fields.PARENT]), deduct_output_type(dataset[_Fields.OTHER]))
-            else: #projectjoin/-cross
-                t1 = deduct_output_type(dataset[_Fields.PARENT])
-                t2 = deduct_output_type(dataset[_Fields.OTHER])
-                out_type = []
-                for prj in dataset[_Fields.PROJECTIONS]:
-                    if len(prj[1]) == 0: #projection on non-tuple dataset
-                        if prj[0] == "first":
-                            out_type.append(t1)
-                        else:
-                            out_type.append(t2)
-                    else: #projection on tuple dataset
-                        for key in prj[1]:
-                            if prj[0] == "first":
-                                out_type.append(t1[key])
-                            else:
-                                out_type.append(t2[key])
-                return tuple(out_type)
-        return dataset[_Fields.TYPES]
-
-
-class Set(object):
-    def __init__(self, env, info, copy_set=False):
-        self._env = env
-        self._info = info
-        if not copy_set:
-            self._info[_Fields.ID] = env._counter
-            self._info[_Fields.BCVARS] = []
-            self._info[_Fields.CHILDREN] = []
-            self._info[_Fields.SINKS] = []
-            self._info[_Fields.NAME] = None
-            env._counter += 1
-
-    def output(self, to_error=False):
-        """
-        Writes a DataSet to the standard output stream (stdout), or to standard error if to_error is True.
-        """
-        child = dict()
-        child[_Fields.IDENTIFIER] = _Identifier.SINK_PRINT
-        child[_Fields.PARENT] = self._info
-        child[_Fields.TO_ERR] = to_error
-        self._info[_Fields.SINKS].append(child)
-        self._env._sinks.append(child)
-
-    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
-        """
-        Writes a DataSet as a text file to the specified location.
-
-        :param path: The path pointing to the location the text file is written to.
-        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
-        """
-        child = dict()
-        child[_Fields.IDENTIFIER] = _Identifier.SINK_TEXT
-        child[_Fields.PARENT] = self._info
-        child[_Fields.PATH] = path
-        child[_Fields.WRITE_MODE] = write_mode
-        self._info[_Fields.SINKS].append(child)
-        self._env._sinks.append(child)
-
-    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
-        """
-        Writes a Tuple DataSet as a CSV file to the specified location.
-
-        Note: Only a Tuple DataSet can be written as a CSV file.
-        :param path: The path pointing to the location the CSV file is written to.
-        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
-        """
-        child = dict()
-        child[_Fields.IDENTIFIER] = _Identifier.SINK_CSV
-        child[_Fields.PATH] = path
-        child[_Fields.PARENT] = self._info
-        child[_Fields.DELIMITER_FIELD] = field_delimiter
-        child[_Fields.DELIMITER_LINE] = line_delimiter
-        child[_Fields.WRITE_MODE] = write_mode
-        self._info[_Fields.SINKS].append(child)
-        self._env._sinks.append(child)
-
-    def reduce_group(self, operator, types, combinable=False):
-        """
-        Applies a GroupReduce transformation.
-
-        The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied on a
-        non-grouped DataSet.
-        The GroupReduceFunction can iterate over all elements of the DataSet and
-        emit any number of output elements including none.
-
-        :param operator: The GroupReduceFunction that is applied on the DataSet.
-        :param types: The type of the resulting DataSet.
-        :return:A GroupReduceOperator that represents the reduced DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = GroupReduceFunction()
-            operator.reduce = f
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = copy.deepcopy(operator)
-        child[_Fields.OPERATOR]._combine = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.COMBINE] = combinable
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINEOP]._combine = True
-        child[_Fields.NAME] = "PythonGroupReduce"
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-
-class ReduceSet(Set):
-    def __init__(self, env, info, copy_set=False):
-        super(ReduceSet, self).__init__(env, info, copy_set)
-        if not copy_set:
-            self._is_chained = False
-
-    def reduce(self, operator):
-        """
-        Applies a Reduce transformation on a non-grouped DataSet.
-
-        The transformation consecutively calls a ReduceFunction until only a single element remains which is the result
-        of the transformation. A ReduceFunction combines two elements into one new element of the same type.
-
-        :param operator:The ReduceFunction that is applied on the DataSet.
-        :return:A ReduceOperator that represents the reduced DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = ReduceFunction()
-            operator.reduce = f
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINE] = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.NAME] = "PythonReduce"
-        child[_Fields.TYPES] = deduct_output_type(self._info)
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-
-class DataSet(ReduceSet):
-    def __init__(self, env, info, copy_set=False):
-        super(DataSet, self).__init__(env, info, copy_set)
-
-    def project(self, *fields):
-        """
-        Applies a Project transformation on a Tuple DataSet.
-
-        Note: Only Tuple DataSets can be projected. The transformation projects each Tuple of the DataSet onto a
-        (sub)set of fields.
-
-        :param fields: The field indexes of the input tuples that are retained.
-                        The order of fields in the output tuple corresponds to the order of field indexes.
-        :return: The projected DataSet.
-
-        """
-        child = dict()
-        child_set = DataSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.PROJECTION
-        child[_Fields.PARENT] = self._info
-        child[_Fields.KEYS] = fields
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def group_by(self, *keys):
-        """
-        Groups a Tuple DataSet using field position keys.
-        Note: Field position keys can only be specified for Tuple DataSets.
-        The field position keys specify the fields of Tuples on which the DataSet is grouped.
-        This method returns an UnsortedGrouping on which one of the following grouping transformations can be applied.
-        sort_group() to get a SortedGrouping.
-        reduce() to apply a Reduce transformation.
-        reduce_group() to apply a GroupReduce transformation.
-
-        :param keys: One or more field positions on which the DataSet will be grouped.
-        :return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
-        """
-        child = dict()
-        child_chain = []
-        child_set = UnsortedGrouping(self._env, child, child_chain)
-        child[_Fields.IDENTIFIER] = _Identifier.GROUP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.KEYS] = keys
-        child_chain.append(child)
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def co_group(self, other_set):
-        """
-        Initiates a CoGroup transformation which combines the elements of two DataSets into one DataSet.
-
-        It groups each DataSet individually on a key and gives groups of both DataSets with equal keys together into a
-        CoGroupFunction. If a DataSet has a group with no matching key in the other DataSet,
-        the CoGroupFunction is called with an empty group for the non-existing group.
-        The CoGroupFunction can iterate over the elements of both groups and return any number of elements
-        including none.
-
-        :param other_set: The other DataSet of the CoGroup transformation.
-        :return:A CoGroupOperator to continue the definition of the CoGroup transformation.
-        """
-        child = dict()
-        other_set._info[_Fields.CHILDREN].append(child)
-        child_set = CoGroupOperatorWhere(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.COGROUP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        self._info[_Fields.CHILDREN].append(child)
-        return child_set
-
-    def cross(self, other_set):
-        """
-        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
-
-        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
-
-        :param other_set: The other DataSet with which this DataSet is crossed.
-        :return:A CrossOperator to continue the definition of the Cross transformation.
-        """
-        return self._cross(other_set, _Identifier.CROSS)
-
-    def cross_with_huge(self, other_set):
-        """
-        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
-
-        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
-        This method also gives the hint to the optimizer that
-        the second DataSet to cross is much larger than the first one.
-
-        :param other_set: The other DataSet with which this DataSet is crossed.
-        :return:A CrossOperator to continue the definition of the Cross transformation.
-        """
-        return self._cross(other_set, _Identifier.CROSSH)
-
-    def cross_with_tiny(self, other_set):
-        """
-        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
-
-        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
-        This method also gives the hint to the optimizer that
-        the second DataSet to cross is much smaller than the first one.
-
-        :param other_set: The other DataSet with which this DataSet is crossed.
-        :return:A CrossOperator to continue the definition of the Cross transformation.
-        """
-        return self._cross(other_set, _Identifier.CROSST)
-
-    def _cross(self, other_set, identifier):
-        child = dict()
-        child_set = CrossOperator(self._env, child)
-        child[_Fields.IDENTIFIER] = identifier
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        child[_Fields.PROJECTIONS] = []
-        child[_Fields.OPERATOR] = None
-        child[_Fields.META] = None
-        self._info[_Fields.CHILDREN].append(child)
-        other_set._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def filter(self, operator):
-        """
-        Applies a Filter transformation on a DataSet.
-
-        The transformation calls a FilterFunction for each element of the DataSet and retains only those elements
-        for which the function returns true. Elements for which the function returns false are filtered.
-
-        :param operator: The FilterFunction that is called for each element of the DataSet.
-        :return:A FilterOperator that represents the filtered DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = FilterFunction()
-            operator.filter = f
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.FILTER
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.NAME] = "PythonFilter"
-        child[_Fields.TYPES] = deduct_output_type(self._info)
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def flat_map(self, operator, types):
-        """
-        Applies a FlatMap transformation on a DataSet.
-
-        The transformation calls a FlatMapFunction for each element of the DataSet.
-        Each FlatMapFunction call can return any number of elements including none.
-
-        :param operator: The FlatMapFunction that is called for each element of the DataSet.
-        :param types: The type of the resulting DataSet.
-        :return:A FlatMapOperator that represents the transformed DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = FlatMapFunction()
-            operator.flat_map = f
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.FLATMAP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.NAME] = "PythonFlatMap"
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def join(self, other_set):
-        """
-        Initiates a Join transformation.
-
-        A Join transformation joins the elements of two DataSets on key equality.
-
-        :param other_set: The other DataSet with which this DataSet is joined
-        :return:A JoinOperator to continue the definition of the Join transformation.
-        """
-        return self._join(other_set, _Identifier.JOIN)
-
-    def join_with_huge(self, other_set):
-        """
-        Initiates a Join transformation.
-
-        A Join transformation joins the elements of two DataSets on key equality.
-        This method also gives the hint to the optimizer that
-        the second DataSet to join is much larger than the first one.
-
-        :param other_set: The other DataSet with which this DataSet is joined
-        :return:A JoinOperator to continue the definition of the Join transformation.
-        """
-        return self._join(other_set, _Identifier.JOINH)
-
-    def join_with_tiny(self, other_set):
-        """
-        Initiates a Join transformation.
-
-        A Join transformation joins the elements of two DataSets on key equality.
-        This method also gives the hint to the optimizer that
-        the second DataSet to join is much smaller than the first one.
-
-        :param other_set: The other DataSet with which this DataSet is joined
-        :return:A JoinOperator to continue the definition of the Join transformation.
-        """
-        return self._join(other_set, _Identifier.JOINT)
-
-    def _join(self, other_set, identifier):
-        child = dict()
-        child_set = JoinOperatorWhere(self._env, child)
-        child[_Fields.IDENTIFIER] = identifier
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        child[_Fields.OPERATOR] = None
-        child[_Fields.META] = None
-        child[_Fields.PROJECTIONS] = []
-        self._info[_Fields.CHILDREN].append(child)
-        other_set._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def map(self, operator, types):
-        """
-        Applies a Map transformation on a DataSet.
-
-        The transformation calls a MapFunction for each element of the DataSet.
-        Each MapFunction call returns exactly one element.
-
-        :param operator: The MapFunction that is called for each element of the DataSet.
-        :param types: The type of the resulting DataSet
-        :return:A MapOperator that represents the transformed DataSet
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = MapFunction()
-            operator.map = f
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.MAP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.NAME] = "PythonMap"
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def map_partition(self, operator, types):
-        """
-        Applies a MapPartition transformation on a DataSet.
-
-        The transformation calls a MapPartitionFunction once per parallel partition of the DataSet.
-        The entire partition is available through the given Iterator.
-        Each MapPartitionFunction may return an arbitrary number of results.
-
-        The number of elements that each instance of the MapPartition function
-        sees is non-deterministic and depends on the degree of parallelism of the operation.
-
-        :param operator: The MapPartitionFunction that is called for each partition of the DataSet.
-        :param types: The type of the resulting DataSet.
-        :return:A MapPartitionOperator that represents the transformed DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = MapPartitionFunction()
-            operator.map_partition = f
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.MAPPARTITION
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.NAME] = "PythonMapPartition"
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-    def union(self, other_set):
-        """
-        Creates a union of this DataSet with another DataSet.
-
-        The other DataSet must be of the same data type.
-
-        :param other_set: The other DataSet which is unioned with the current DataSet.
-        :return:The resulting DataSet.
-        """
-        child = dict()
-        child_set = DataSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.UNION
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        self._info[_Fields.CHILDREN].append(child)
-        other_set._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-        return child_set
-
-
-class OperatorSet(DataSet):
-    def __init__(self, env, info, copy_set=False):
-        super(OperatorSet, self).__init__(env, info, copy_set)
-
-    def with_broadcast_set(self, name, set):
-        child = dict()
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = set._info
-        child[_Fields.NAME] = name
-        self._info[_Fields.BCVARS].append(child)
-        set._info[_Fields.CHILDREN].append(child)
-        self._env._broadcast.append(child)
-        return self
-
-
-class Grouping(object):
-    def __init__(self, env, info, child_chain):
-        self._env = env
-        self._child_chain = child_chain
-        self._info = info
-        info[_Fields.ID] = env._counter
-        info[_Fields.CHILDREN] = []
-        info[_Fields.SINKS] = []
-        env._counter += 1
-
-    def reduce_group(self, operator, types, combinable=False):
-        """
-        Applies a GroupReduce transformation.
-
-        The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied on a
-        non-grouped DataSet.
-        The GroupReduceFunction can iterate over all elements of the DataSet and
-        emit any number of output elements including none.
-
-        :param operator: The GroupReduceFunction that is applied on the DataSet.
-        :param types: The type of the resulting DataSet.
-        :return:A GroupReduceOperator that represents the reduced DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = GroupReduceFunction()
-            operator.reduce = f
-        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
-        operator._set_sort_ops([(x[_Fields.FIELD], x[_Fields.ORDER]) for x in self._child_chain[1:]])
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = copy.deepcopy(operator)
-        child[_Fields.OPERATOR]._combine = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.COMBINE] = combinable
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINEOP]._combine = True
-        child[_Fields.NAME] = "PythonGroupReduce"
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-
-        return child_set
-
-    def sort_group(self, field, order):
-        """
-        Sorts Tuple elements within a group on the specified field in the specified Order.
-
-        Note: Only groups of Tuple elements can be sorted.
-        Groups can be sorted by multiple fields by chaining sort_group() calls.
-
-        :param field:The Tuple field on which the group is sorted.
-        :param order: The Order in which the specified Tuple field is sorted. See DataSet.Order.
-        :return:A SortedGrouping with specified order of group elements.
-        """
-        child = dict()
-        child_set = SortedGrouping(self._env, child, self._child_chain)
-        child[_Fields.IDENTIFIER] = _Identifier.SORT
-        child[_Fields.PARENT] = self._info
-        child[_Fields.FIELD] = field
-        child[_Fields.ORDER] = order
-        self._info[_Fields.CHILDREN].append(child)
-        self._child_chain.append(child)
-        self._env._sets.append(child)
-        return child_set
-
-
-class UnsortedGrouping(Grouping):
-    def __init__(self, env, info, child_chain):
-        super(UnsortedGrouping, self).__init__(env, info, child_chain)
-
-    def reduce(self, operator):
-        """
-        Applies a Reduce transformation on a grouped DataSet.
-
-        The transformation consecutively calls a ReduceFunction until only a single element per group remains, which is
-        the result of the transformation. A ReduceFunction combines two elements into one new element of the same type.
-
-        :param operator:The ReduceFunction that is applied on the DataSet.
-        :return:A ReduceOperator that represents the reduced DataSet.
-        """
-        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
-        for i in self._child_chain:
-            self._env._sets.append(i)
-        child = dict()
-        child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = copy.deepcopy(operator)
-        child[_Fields.OPERATOR]._combine = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.COMBINE] = True
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINEOP]._combine = True
-        child[_Fields.NAME] = "PythonReduce"
-        child[_Fields.TYPES] = deduct_output_type(self._info)
-        self._info[_Fields.CHILDREN].append(child)
-        self._env._sets.append(child)
-
-        return child_set
-
-
-class SortedGrouping(Grouping):
-    def __init__(self, env, info, child_chain):
-        super(SortedGrouping, self).__init__(env, info, child_chain)
-
-
-class CoGroupOperatorWhere(object):
-    def __init__(self, env, info):
-        self._env = env
-        self._info = info
-
-    def where(self, *fields):
-        """
-        Continues a CoGroup transformation.
-
-        Defines the Tuple fields of the first co-grouped DataSet that should be used as grouping keys.
-        Note: Fields can only be selected as grouping keys on Tuple DataSets.
-
-        :param fields: The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys.
-        :return: An incomplete CoGroup transformation.
-        """
-        self._info[_Fields.KEY1] = fields
-        return CoGroupOperatorTo(self._env, self._info)
-
-
-class CoGroupOperatorTo(object):
-    def __init__(self, env, info):
-        self._env = env
-        self._info = info
-
-    def equal_to(self, *fields):
-        """
-        Continues a CoGroup transformation.
-
-        Defines the Tuple fields of the second co-grouped DataSet that should be used as grouping keys.
-        Note: Fields can only be selected as grouping keys on Tuple DataSets.
-
-        :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys.
-        :return: An incomplete CoGroup transformation.
-        """
-        self._info[_Fields.KEY2] = fields
-        return CoGroupOperatorUsing(self._env, self._info)
-
-
-class CoGroupOperatorUsing(object):
-    def __init__(self, env, info):
-        self._env = env
-        self._info = info
-
-    def using(self, operator, types):
-        """
-        Finalizes a CoGroup transformation.
-
-        Applies a CoGroupFunction to groups of elements with identical keys.
-        Each CoGroupFunction call returns an arbitrary number of elements.
-
-        :param operator: The CoGroupFunction that is called for all groups of elements with identical keys.
-        :param types: The type of the resulting DataSet.
-        :return:A CoGroupOperator that represents the co-grouped result DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = CoGroupFunction()
-            operator.co_group = f
-        new_set = OperatorSet(self._env, self._info)
-        operator._keys1 = self._info[_Fields.KEY1]
-        operator._keys2 = self._info[_Fields.KEY2]
-        self._info[_Fields.OPERATOR] = operator
-        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info[_Fields.TYPES] = types
-        self._info[_Fields.NAME] = "PythonCoGroup"
-        self._env._sets.append(self._info)
-        return new_set
-
-
-class JoinOperatorWhere(object):
-    def __init__(self, env, info):
-        self._env = env
-        self._info = info
-
-    def where(self, *fields):
-        """
-        Continues a Join transformation.
-
-        Defines the Tuple fields of the first join DataSet that should be used as join keys.
-        Note: Fields can only be selected as join keys on Tuple DataSets.
-
-        :param fields: The indexes of the Tuple fields of the first join DataSets that should be used as keys.
-        :return:An incomplete Join transformation.
-
-        """
-        self._info[_Fields.KEY1] = fields
-        return JoinOperatorTo(self._env, self._info)
-
-
-class JoinOperatorTo(object):
-    def __init__(self, env, info):
-        self._env = env
-        self._info = info
-
-    def equal_to(self, *fields):
-        """
-        Continues a Join transformation.
-
-        Defines the Tuple fields of the second join DataSet that should be used as join keys.
-        Note: Fields can only be selected as join keys on Tuple DataSets.
-
-        :param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys.
-        :return:An incomplete Join Transformation.
-        """
-        self._info[_Fields.KEY2] = fields
-        return JoinOperator(self._env, self._info)
-
-
-class JoinOperatorProjection(DataSet):
-    def __init__(self, env, info):
-        super(JoinOperatorProjection, self).__init__(env, info)
-
-    def project_first(self, *fields):
-        """
-        Initiates a ProjectJoin transformation.
-
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
-        """
-        self._info[_Fields.PROJECTIONS].append(("first", fields))
-        return self
-
-    def project_second(self, *fields):
-        """
-        Initiates a ProjectJoin transformation.
-
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
-        """
-        self._info[_Fields.PROJECTIONS].append(("second", fields))
-        return self
-
-
-class JoinOperator(DataSet):
-    def __init__(self, env, info):
-        super(JoinOperator, self).__init__(env, info)
-        self._info[_Fields.TYPES] = None
-
-    def project_first(self, *fields):
-        """
-        Initiates a ProjectJoin transformation.
-
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
-        """
-        return JoinOperatorProjection(self._env, self._info).project_first(*fields)
-
-    def project_second(self, *fields):
-        """
-        Initiates a ProjectJoin transformation.
-
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
-        """
-        return JoinOperatorProjection(self._env, self._info).project_second(*fields)
-
-    def using(self, operator, types):
-        """
-        Finalizes a Join transformation.
-
-        Applies a JoinFunction to each pair of joined elements. Each JoinFunction call returns exactly one element.
-
-        :param operator:The JoinFunction that is called for each pair of joined elements.
-        :param types: The type of the resulting DataSet.
-        :return:A Set that represents the joined result DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = JoinFunction()
-            operator.join = f
-        self._info[_Fields.OPERATOR] = operator
-        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info[_Fields.TYPES] = types
-        self._info[_Fields.NAME] = "PythonJoin"
-        self._env._sets.append(self._info)
-        return OperatorSet(self._env, self._info, copy_set=True)
-
-
-class CrossOperatorProjection(DataSet):
-    def __init__(self, env, info):
-        super(CrossOperatorProjection, self).__init__(env, info)
-
-    def project_first(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        self._info[_Fields.PROJECTIONS].append(("first", fields))
-        return self
-
-    def project_second(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        self._info[_Fields.PROJECTIONS].append(("second", fields))
-        return self
-
-
-class CrossOperator(DataSet):
-    def __init__(self, env, info):
-        super(CrossOperator, self).__init__(env, info)
-        info[_Fields.TYPES] = None
-
-    def project_first(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        return CrossOperatorProjection(self._env, self._info).project_first(*fields)
-
-    def project_second(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        return CrossOperatorProjection(self._env, self._info).project_second(*fields)
-
-    def using(self, operator, types):
-        """
-        Finalizes a Cross transformation.
-
-        Applies a CrossFunction to each pair of crossed elements. Each CrossFunction call returns exactly one element.
-
-        :param operator:The CrossFunction that is called for each pair of crossed elements.
-        :param types: The type of the resulting DataSet.
-        :return:A Set that represents the crossed result DataSet.
-        """
-        if isinstance(operator, TYPES.FunctionType):
-            f = operator
-            operator = CrossFunction()
-            operator.cross = f
-        self._info[_Fields.OPERATOR] = operator
-        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info[_Fields.TYPES] = types
-        self._info[_Fields.NAME] = "PythonCross"
-        return OperatorSet(self._env, self._info, copy_set=True)
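
For orientation while reading the removed module, here is a minimal usage sketch of how this API composes. It relies only on methods defined in DataSet.py above plus get_environment()/execute() from the Environment.py removed in the next hunk; the sample data and the lambda are illustrative rather than taken from the commit, and the import paths are inferred from the package layout (flink/plan/...).

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import STRING

    env = get_environment()
    data = env.from_elements("to be", "or not to be")

    # DataSet.map() wraps a bare function into a MapFunction (see the
    # TYPES.FunctionType check above), so a plain lambda is accepted.
    upper = data.map(lambda line: line.upper(), STRING)
    upper.output()            # SINK_PRINT: write the result to stdout

    env.execute(local=True)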

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
deleted file mode 100644
index 236eda4..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
+++ /dev/null
@@ -1,345 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from flink.connection import Connection
-from flink.connection import Collector
-from flink.plan.DataSet import DataSet
-from flink.plan.Constants import _Fields, _Identifier
-from flink.utilities import Switch
-import copy
-import sys
-
-
-def get_environment():
-    """
-    Creates an execution environment that represents the context in which the program is currently executed.
-    
-    :return:The execution environment of the context in which the program is executed.
-    """
-    return Environment()
-
-
-class Environment(object):
-    def __init__(self):
-        # util
-        self._counter = 0
-
-        #parameters
-        self._parameters = []
-
-        #sets
-        self._sources = []
-        self._sets = []
-        self._sinks = []
-
-        #specials
-        self._broadcast = []
-
-    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
-        """
-        Creates a DataSet that represents the tuples produced by reading the given CSV file.
-
-        :param path: The path of the CSV file.
-        :param types: Specifies the types for the CSV fields.
-        :return:A DataSet that represents the tuples read from the CSV file.
-        """
-        child = dict()
-        child_set = DataSet(self, child)
-        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_CSV
-        child[_Fields.DELIMITER_LINE] = line_delimiter
-        child[_Fields.DELIMITER_FIELD] = field_delimiter
-        child[_Fields.PATH] = path
-        child[_Fields.TYPES] = types
-        self._sources.append(child)
-        return child_set
-
-    def read_text(self, path):
-        """
-        Creates a DataSet that represents the Strings produced by reading the given file line wise.
-
-        The file will be read with the system's default character set.
-
-        :param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
-        :return: A DataSet that represents the data read from the given file as text lines.
-        """
-        child = dict()
-        child_set = DataSet(self, child)
-        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_TEXT
-        child[_Fields.PATH] = path
-        self._sources.append(child)
-        return child_set
-
-    def from_elements(self, *elements):
-        """
-        Creates a new data set that contains the given elements.
-
-        The elements must all be of the same type, for example, all Strings or all Integers.
-        The sequence of elements must not be empty.
-
-        :param elements: The elements to make up the data set.
-        :return: A DataSet representing the given list of elements.
-        """
-        child = dict()
-        child_set = DataSet(self, child)
-        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_VALUE
-        child[_Fields.VALUES] = elements
-        self._sources.append(child)
-        return child_set
-
-    def set_degree_of_parallelism(self, degree):
-        """
-        Sets the degree of parallelism (DOP) for operations executed through this environment.
-
-        Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
-
-        :param degree: The degree of parallelism.
-        """
-        self._parameters.append(("dop", degree))
-
-    def execute(self, local=False, debug=False):
-        """
-        Triggers the program execution.
-
-        The environment will execute all parts of the program that have resulted in a "sink" operation.
-        """
-        if debug:
-            local = True
-        self._parameters.append(("mode", local))
-        self._parameters.append(("debug", debug))
-        self._optimize_plan()
-
-        plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
-
-        if plan_mode:
-            output_path = sys.stdin.readline().rstrip('\n')
-            self._connection = Connection.OneWayBusyBufferingMappedFileConnection(output_path)
-            self._collector = Collector.TypedCollector(self._connection)
-            self._send_plan()
-            self._connection._write_buffer()
-        else:
-            import struct
-            operator = None
-            try:
-                port = int(sys.stdin.readline().rstrip('\n'))
-
-                id = int(sys.stdin.readline().rstrip('\n'))
-                input_path = sys.stdin.readline().rstrip('\n')
-                output_path = sys.stdin.readline().rstrip('\n')
-
-                operator = None
-                for set in self._sets:
-                    if set[_Fields.ID] == id:
-                        operator = set[_Fields.OPERATOR]
-                    if set[_Fields.ID] == -id:
-                        operator = set[_Fields.COMBINEOP]
-                operator._configure(input_path, output_path, port)
-                operator._go()
-                sys.stdout.flush()
-                sys.stderr.flush()
-            except:
-                sys.stdout.flush()
-                sys.stderr.flush()
-                if operator is not None:
-                    operator._connection._socket.send(struct.pack(">i", -2))
-                raise
-
-    def _optimize_plan(self):
-        self._find_chains()
-
-    def _find_chains(self):
-        udf = set([_Identifier.MAP, _Identifier.FLATMAP, _Identifier.FILTER, _Identifier.MAPPARTITION,
-                   _Identifier.GROUPREDUCE, _Identifier.REDUCE, _Identifier.COGROUP,
-                   _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST,
-                   _Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT])
-        chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP, _Identifier.GROUPREDUCE, _Identifier.REDUCE])
-        multi_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
-        x = len(self._sets) - 1
-        while x > -1:
-            child = self._sets[x]
-            child_type = child[_Fields.IDENTIFIER]
-            if child_type in chainable:
-                parent = child[_Fields.PARENT]
-                parent_type = parent[_Fields.IDENTIFIER]
-                if len(parent[_Fields.SINKS]) == 0:
-                    if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
-                        if child[_Fields.COMBINE]:
-                            while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
-                                parent = parent[_Fields.PARENT]
-                                parent_type = parent[_Fields.IDENTIFIER]
-                            if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
-                                if parent[_Fields.OPERATOR] is not None:
-                                    function = child[_Fields.COMBINEOP]
-                                    parent[_Fields.OPERATOR]._chain(function)
-                                    child[_Fields.COMBINE] = False
-                                    parent[_Fields.NAME] += " -> PythonCombine"
-                                    for bcvar in child[_Fields.BCVARS]:
-                                        bcvar_copy = copy.deepcopy(bcvar)
-                                        bcvar_copy[_Fields.PARENT] = parent
-                                        self._broadcast.append(bcvar_copy)
-                    else:
-                        if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
-                            parent_op = parent[_Fields.OPERATOR]
-                            if parent_op is not None:
-                                function = child[_Fields.OPERATOR]
-                                parent_op._chain(function)
-                                parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
-                                parent[_Fields.TYPES] = child[_Fields.TYPES]
-                                for grand_child in child[_Fields.CHILDREN]:
-                                    if grand_child[_Fields.IDENTIFIER] in multi_input:
-                                        if grand_child[_Fields.PARENT][_Fields.ID] == child[_Fields.ID]:
-                                            grand_child[_Fields.PARENT] = parent
-                                        else:
-                                            grand_child[_Fields.OTHER] = parent
-                                    else:
-                                        grand_child[_Fields.PARENT] = parent
-                                        parent[_Fields.CHILDREN].append(grand_child)
-                                parent[_Fields.CHILDREN].remove(child)
-                                for sink in child[_Fields.SINKS]:
-                                    sink[_Fields.PARENT] = parent
-                                    parent[_Fields.SINKS].append(sink)
-                                for bcvar in child[_Fields.BCVARS]:
-                                    bcvar[_Fields.PARENT] = parent
-                                    parent[_Fields.BCVARS].append(bcvar)
-                                self._remove_set((child))
-            x -= 1
-
-    def _remove_set(self, set):
-        self._sets[:] = [s for s in self._sets if s[_Fields.ID]!=set[_Fields.ID]]
-
-    def _send_plan(self):
-        self._send_parameters()
-        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
-        self._send_sources()
-        self._send_operations()
-        self._send_sinks()
-        self._send_broadcast()
-
-    def _send_parameters(self):
-        self._collector.collect(len(self._parameters))
-        for parameter in self._parameters:
-            self._collector.collect(parameter)
-
-    def _send_sources(self):
-        for source in self._sources:
-            identifier = source[_Fields.IDENTIFIER]
-            collect = self._collector.collect
-            collect(identifier)
-            collect(source[_Fields.ID])
-            for case in Switch(identifier):
-                if case(_Identifier.SOURCE_CSV):
-                    collect(source[_Fields.PATH])
-                    collect(source[_Fields.DELIMITER_FIELD])
-                    collect(source[_Fields.DELIMITER_LINE])
-                    collect(source[_Fields.TYPES])
-                    break
-                if case(_Identifier.SOURCE_TEXT):
-                    collect(source[_Fields.PATH])
-                    break
-                if case(_Identifier.SOURCE_VALUE):
-                    collect(len(source[_Fields.VALUES]))
-                    for value in source[_Fields.VALUES]:
-                        collect(value)
-                    break
-
-    def _send_operations(self):
-        collect = self._collector.collect
-        for set in self._sets:
-            identifier = set.get(_Fields.IDENTIFIER)
-            collect(set[_Fields.IDENTIFIER])
-            collect(set[_Fields.ID])
-            collect(set[_Fields.PARENT][_Fields.ID])
-            for case in Switch(identifier):
-                if case(_Identifier.SORT):
-                    collect(set[_Fields.FIELD])
-                    collect(set[_Fields.ORDER])
-                    break
-                if case(_Identifier.GROUP):
-                    collect(set[_Fields.KEYS])
-                    break
-                if case(_Identifier.COGROUP):
-                    collect(set[_Fields.OTHER][_Fields.ID])
-                    collect(set[_Fields.KEY1])
-                    collect(set[_Fields.KEY2])
-                    collect(set[_Fields.TYPES])
-                    collect(set[_Fields.NAME])
-                    break
-                if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
-                    collect(set[_Fields.OTHER][_Fields.ID])
-                    collect(set[_Fields.TYPES])
-                    collect(len(set[_Fields.PROJECTIONS]))
-                    for p in set[_Fields.PROJECTIONS]:
-                        collect(p[0])
-                        collect(p[1])
-                    collect(set[_Fields.NAME])
-                    break
-                if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
-                    collect(set[_Fields.TYPES])
-                    collect(set[_Fields.COMBINE])
-                    collect(set[_Fields.NAME])
-                    break
-                if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
-                    collect(set[_Fields.KEY1])
-                    collect(set[_Fields.KEY2])
-                    collect(set[_Fields.OTHER][_Fields.ID])
-                    collect(set[_Fields.TYPES])
-                    collect(len(set[_Fields.PROJECTIONS]))
-                    for p in set[_Fields.PROJECTIONS]:
-                        collect(p[0])
-                        collect(p[1])
-                    collect(set[_Fields.NAME])
-                    break
-                if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
-                    collect(set[_Fields.TYPES])
-                    collect(set[_Fields.NAME])
-                    break
-                if case(_Identifier.UNION):
-                    collect(set[_Fields.OTHER][_Fields.ID])
-                    break
-                if case(_Identifier.PROJECTION):
-                    collect(set[_Fields.KEYS])
-                    break
-                if case():
-                    raise KeyError("Environment._send_operations(): Invalid operation identifier: " + str(identifier))
-
-    def _send_sinks(self):
-        for sink in self._sinks:
-            identifier = sink[_Fields.IDENTIFIER]
-            collect = self._collector.collect
-            collect(identifier)
-            collect(sink[_Fields.PARENT][_Fields.ID])
-            for case in Switch(identifier):
-                if case(_Identifier.SINK_CSV):
-                    collect(sink[_Fields.PATH])
-                    collect(sink[_Fields.DELIMITER_FIELD])
-                    collect(sink[_Fields.DELIMITER_LINE])
-                    collect(sink[_Fields.WRITE_MODE])
-                    break
-                if case(_Identifier.SINK_TEXT):
-                    collect(sink[_Fields.PATH])
-                    collect(sink[_Fields.WRITE_MODE])
-                    break
-                if case(_Identifier.SINK_PRINT):
-                    collect(sink[_Fields.TO_ERR])
-                    break
-
-    def _send_broadcast(self):
-        collect = self._collector.collect
-        for entry in self._broadcast:
-            collect(_Identifier.BROADCAST)
-            collect(entry[_Fields.PARENT][_Fields.ID])
-            collect(entry[_Fields.OTHER][_Fields.ID])
-            collect(entry[_Fields.NAME])
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
deleted file mode 100644
index d35bf39..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
deleted file mode 100644
index faae78a..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
+++ /dev/null
@@ -1,36 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-
-class Switch(object):
-    def __init__(self, value):
-        self.value = value
-        self.fall = False
-
-    def __iter__(self):
-        yield self.match
-        raise StopIteration
-
-    def match(self, *args):
-        if self.fall or not args:
-            return True
-        elif self.value in args:
-            self.fall = True
-            return True
-        else:
-            return False
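
The Switch helper above emulates a switch/case statement with fall-through: iterating over it yields its match method once, and each case(...) call returns True when the value matches, unconditionally once a previous case has matched (fall-through), or when called with no arguments (the default branch). A minimal sketch of the calling pattern, mirroring how _send_sinks above drives it (the handler strings are illustrative only, and the import assumes the deleted flink.utilities package is installed):

    from flink.utilities import Switch   # the helper defined in the file above

    def describe(identifier):
        for case in Switch(identifier):
            if case("csv", "text"):
                return "file sink"
            if case("print"):
                return "console sink"
            if case():                    # default branch
                raise KeyError("unknown identifier: " + str(identifier))

    print(describe("csv"))    # -> file sink
    print(describe("print"))  # -> console sink

Note that the explicit raise StopIteration in __iter__ predates PEP 479; on Python 3.7+ it would have to become a plain return, but the module targeted Python 2 and early Python 3 at the time of this commit.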

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
deleted file mode 100644
index 82447e9..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
+++ /dev/null
@@ -1,33 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from distutils.core import setup
-
-setup(
-    name='flink',
-    version='1.0',
-    packages=['flink',
-              'flink.connection',
-              'flink.functions',
-              'flink.plan',
-              'flink.utilities'],
-    url='http://flink.apache.org',
-    license='Licensed under the Apache License, Version 2.0',
-    author='',
-    author_email='',
-    description='Flink Python API'
-)

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java b/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java
deleted file mode 100644
index 4295e93..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.python;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.ARGUMENT_PYTHON_2;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.ARGUMENT_PYTHON_3;
-import org.junit.Test;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PythonPlanBinderTest {
-	private static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
-	
-	private static boolean python2Supported = true;
-	private static boolean python3Supported = true;
-	private static List<String> TEST_FILES;
-	
-	@BeforeClass
-	public static void setup() throws Exception {
-		findTestFiles();
-		checkPythonSupport();
-	}
-	
-	private static void findTestFiles() throws Exception {
-		TEST_FILES = new ArrayList();
-		FileSystem fs = FileSystem.getLocalFileSystem();
-		FileStatus[] status = fs.listStatus(
-				new Path(fs.getWorkingDirectory().toString()
-						+ "/src/test/python/org/apache/flink/languagebinding/api/python/flink/test"));
-		for (FileStatus f : status) {
-			String file = f.getPath().toString();
-			if (file.endsWith(".py")) {
-				TEST_FILES.add(file);
-			}
-		}
-	}
-	
-	private static void checkPythonSupport() {	
-		try {
-			Runtime.getRuntime().exec("python");
-		} catch (IOException ex) {
-			python2Supported = false;
-			LOG.info("No Python 2 runtime detected.");
-		}
-		try {
-			Runtime.getRuntime().exec("python3");
-		} catch (IOException ex) {
-			python3Supported = false;
-			LOG.info("No Python 3 runtime detected.");
-		}
-	}
-	
-	@Test
-	public void testPython2() throws Exception {
-		if (python2Supported) {
-			for (String file : TEST_FILES) {
-				LOG.info("testing " + file);
-				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file});
-			}
-		}
-	}
-	
-	@Test
-	public void testPython3() throws Exception {
-		if (python3Supported) {
-			for (String file : TEST_FILES) {
-				LOG.info("testing " + file);
-				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file});
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
deleted file mode 100644
index a103a5c..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
+++ /dev/null
@@ -1,2 +0,0 @@
-4,2,hello
-3,2,world

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
deleted file mode 100644
index e7be084..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
+++ /dev/null
@@ -1,2 +0,0 @@
-sup guys
-i am the world

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
deleted file mode 100644
index e9fa822..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
+++ /dev/null
@@ -1,31 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT, STRING
-from flink.plan.Constants import WriteMode
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    d1 = env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv", (INT, INT, STRING))
-
-    d1.write_csv("/tmp/flink/result", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
-
-    env.set_degree_of_parallelism(1)
-
-    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
deleted file mode 100644
index 2116d1f..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
+++ /dev/null
@@ -1,264 +0,0 @@
-# ###############################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-################################################################################
-from flink.plan.Environment import get_environment
-from flink.functions.MapFunction import MapFunction
-from flink.functions.FlatMapFunction import FlatMapFunction
-from flink.functions.FilterFunction import FilterFunction
-from flink.functions.MapPartitionFunction import MapPartitionFunction
-from flink.functions.ReduceFunction import ReduceFunction
-from flink.functions.CrossFunction import CrossFunction
-from flink.functions.JoinFunction import JoinFunction
-from flink.functions.GroupReduceFunction import GroupReduceFunction
-from flink.functions.CoGroupFunction import CoGroupFunction
-from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
-
-
-class Mapper(MapFunction):
-    def map(self, value):
-        return value * value
-
-
-class Filter(FilterFunction):
-    def __init__(self, limit):
-        super(Filter, self).__init__()
-        self.limit = limit
-
-    def filter(self, value):
-        return value > self.limit
-
-
-class FlatMap(FlatMapFunction):
-    def flat_map(self, value, collector):
-        collector.collect(value)
-        collector.collect(value * 2)
-
-
-class MapPartition(MapPartitionFunction):
-    def map_partition(self, iterator, collector):
-        for value in iterator:
-            collector.collect(value * 2)
-
-
-class Reduce(ReduceFunction):
-    def reduce(self, value1, value2):
-        return value1 + value2
-
-
-class Reduce2(ReduceFunction):
-    def reduce(self, value1, value2):
-        return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
-
-
-class Cross(CrossFunction):
-    def cross(self, value1, value2):
-        return (value1, value2[3])
-
-
-class MapperBcv(MapFunction):
-    def map(self, value):
-        factor = self.context.get_broadcast_variable("test")[0][0]
-        return value * factor
-
-
-class Join(JoinFunction):
-    def join(self, value1, value2):
-        if value1[3]:
-            return value2[0] + str(value1[0])
-        else:
-            return value2[0] + str(value1[1])
-
-
-class GroupReduce(GroupReduceFunction):
-    def reduce(self, iterator, collector):
-        if iterator.has_next():
-            i, f, s, b = iterator.next()
-            for value in iterator:
-                i += value[0]
-                f += value[1]
-                b |= value[3]
-            collector.collect((i, f, s, b))
-
-
-class GroupReduce2(GroupReduceFunction):
-    def reduce(self, iterator, collector):
-        for value in iterator:
-            collector.collect(value)
-
-
-class GroupReduce3(GroupReduceFunction):
-    def reduce(self, iterator, collector):
-        collector.collect(iterator.next())
-
-    def combine(self, iterator, collector):
-        if iterator.has_next():
-            v1 = iterator.next()
-        if iterator.has_next():
-            v2 = iterator.next()
-        if v1[0] < v2[0]:
-            collector.collect(v1)
-        else:
-            collector.collect(v2)
-
-
-class CoGroup(CoGroupFunction):
-    def co_group(self, iterator1, iterator2, collector):
-        while iterator1.has_next() and iterator2.has_next():
-            collector.collect((iterator1.next(), iterator2.next()))
-
-
-class Id(MapFunction):
-    def map(self, value):
-        return value
-
-
-class Verify(MapPartitionFunction):
-    def __init__(self, expected, name):
-        super(Verify, self).__init__()
-        self.expected = expected
-        self.name = name
-
-    def map_partition(self, iterator, collector):
-        index = 0
-        for value in iterator:
-            if value != self.expected[index]:
-                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
-                raise Exception(self.name + " failed!")
-            index += 1
-        collector.collect(self.name + " successful!")
-
-
-class Verify2(MapPartitionFunction):
-    def __init__(self, expected, name):
-        super(Verify2, self).__init__()
-        self.expected = expected
-        self.name = name
-
-    def map_partition(self, iterator, collector):
-        for value in iterator:
-            if value in self.expected:
-                try:
-                    self.expected.remove(value)
-                except Exception:
-                    raise Exception(self.name + " failed!")
-        collector.collect(self.name + " successful!")
-
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    d1 = env.from_elements(1, 6, 12)
-
-    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
-
-    d3 = env.from_elements(("hello",), ("world",))
-
-    d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
-
-    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
-
-    d1 \
-        .map((lambda x: x * x), INT).map(Mapper(), INT) \
-        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
-
-    d1 \
-        .map(Mapper(), INT).map((lambda x: x * x), INT) \
-        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
-
-    d1 \
-        .filter(Filter(5)).filter(Filter(8)) \
-        .map_partition(Verify([12], "Filter"), STRING).output()
-
-    d1 \
-        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
-        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
-
-    d1 \
-        .map_partition(MapPartition(), INT) \
-        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
-
-    d1 \
-        .reduce(Reduce()) \
-        .map_partition(Verify([19], "AllReduce"), STRING).output()
-
-    d4 \
-        .group_by(2).reduce(Reduce2()) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
-
-    d4 \
-        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
-
-    d1 \
-        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
-        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
-
-    d1 \
-        .cross(d2).using(Cross(), (INT, BOOL)) \
-        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
-
-    d1 \
-        .cross(d3) \
-        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output()
-
-    d2 \
-        .cross(d3).project_second(0).project_first(0, 1) \
-        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
-
-    d2 \
-        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
-        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
-
-    d2 \
-        .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
-        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
-
-    d2 \
-        .join(d3).where(2).equal_to(0) \
-        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
-
-    d2 \
-        .project(0, 1, 2) \
-        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
-
-    d2 \
-        .union(d4) \
-        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
-
-    d4 \
-        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
-
-    d4 \
-        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
-
-    d4 \
-        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
-
-    d5 \
-        .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
-        .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
-
-    d4 \
-        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
-        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
-
-    env.set_degree_of_parallelism(1)
-
-    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
deleted file mode 100644
index 83b78fa..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import WriteMode
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    d1 = env.read_text("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text")
-
-    d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE)
-
-    env.set_degree_of_parallelism(1)
-
-    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
deleted file mode 100644
index a3b8d07..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
+++ /dev/null
@@ -1,63 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import _Fields
-from flink.plan.Constants import INT, STRING, BOOL, FLOAT
-import sys
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    d1 = env.from_elements(("hello", 4, 3.2, True))
-
-    d2 = env.from_elements("world")
-
-    direct_from_source = d1.filter(lambda x:True)
-
-    if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
-        sys.exit("Error deducting type directly from source.")
-
-    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
-
-    if from_common_udf._info[_Fields.TYPES] != BOOL:
-        sys.exit("Error deducting type from common udf.")
-
-    through_projection = d1.project(3, 2).filter(lambda x:True)
-
-    if through_projection._info[_Fields.TYPES] != (True, 3.2):
-        sys.exit("Error deducting type through projection.")
-
-    through_default_op = d1.cross(d2).filter(lambda x:True)
-
-    if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
-        sys.exit("Error deducting type through default J/C." +str(through_default_op._info[_Fields.TYPES]))
-
-    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
-
-    if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
-        sys.exit("Error deducting type through projection J/C. "+str(through_prj_op._info[_Fields.TYPES]))
-
-
-    env = get_environment()
-
-    msg = env.from_elements("Type deduction test successful.")
-
-    msg.output()
-
-    env.execute()
-

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py
deleted file mode 100644
index f5f3ee4..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py
+++ /dev/null
@@ -1,70 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from flink.plan.Environment import get_environment
-from flink.functions.MapFunction import MapFunction
-from flink.functions.MapPartitionFunction import MapPartitionFunction
-from flink.plan.Constants import BOOL, INT, FLOAT, STRING, BYTES
-
-
-class Verify(MapPartitionFunction):
-    def __init__(self, expected, name):
-        super(Verify, self).__init__()
-        self.expected = expected
-        self.name = name
-
-    def map_partition(self, iterator, collector):
-        index = 0
-        for value in iterator:
-            if value != self.expected[index]:
-                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
-                raise Exception(self.name + " failed!")
-            index += 1
-        collector.collect(self.name + " successful!")
-
-
-class Id(MapFunction):
-    def map(self, value):
-        return value
-
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world"))
-
-    d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
-
-    d2 = env.from_elements(1,2,3,4,5)
-
-    d2.map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output()
-
-    d3 = env.from_elements(True, True, False)
-
-    d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
-
-    d4 = env.from_elements(1.4, 1.7, 12312.23)
-
-    d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
-
-    d5 = env.from_elements("hello", "world")
-
-    d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
-
-    env.set_degree_of_parallelism(1)
-
-    env.execute(local=True)
\ No newline at end of file

