flink-commits mailing list archives

From: ches...@apache.org
Subject: [2/3] flink git commit: [FLINK-2501] [py] Remove the need to specify types for transformations
Date: Wed, 20 Jan 2016 13:51:21 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
new file mode 100644
index 0000000..6c83a61
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+public class SerializationUtils {
+	public static final byte TYPE_BOOLEAN = (byte) 34;
+	public static final byte TYPE_BYTE = (byte) 33;
+	public static final byte TYPE_INTEGER = (byte) 32;
+	public static final byte TYPE_LONG = (byte) 31;
+	public static final byte TYPE_DOUBLE = (byte) 30;
+	public static final byte TYPE_FLOAT = (byte) 29;
+	public static final byte TYPE_STRING = (byte) 28;
+	public static final byte TYPE_BYTES = (byte) 27;
+	public static final byte TYPE_NULL = (byte) 26;
+
+	private enum SupportedTypes {
+		TUPLE, BOOLEAN, BYTE, BYTES, INTEGER, LONG, FLOAT, DOUBLE, STRING, NULL, CUSTOMTYPEWRAPPER
+	}
+
+	public static Serializer getSerializer(Object value) {
+		String className = value.getClass().getSimpleName().toUpperCase();
+		if (className.startsWith("TUPLE")) {
+			className = "TUPLE";
+		}
+		if (className.startsWith("BYTE[]")) {
+			className = "BYTES";
+		}
+		SupportedTypes type = SupportedTypes.valueOf(className);
+		switch (type) {
+			case TUPLE:
+				return new TupleSerializer((Tuple) value);
+			case BOOLEAN:
+				return new BooleanSerializer();
+			case BYTE:
+				return new ByteSerializer();
+			case BYTES:
+				return new BytesSerializer();
+			case INTEGER:
+				return new IntSerializer();
+			case LONG:
+				return new LongSerializer();
+			case STRING:
+				return new StringSerializer();
+			case FLOAT:
+				return new FloatSerializer();
+			case DOUBLE:
+				return new DoubleSerializer();
+			case NULL:
+				return new NullSerializer();
+			case CUSTOMTYPEWRAPPER:
+				return new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+			default:
+				throw new IllegalArgumentException("Unsupported Type encountered: " + type);
+		}
+	}
+
+	public static abstract class Serializer<IN> {
+		private byte[] typeInfo = null;
+
+		public byte[] serialize(IN value) {
+			if (typeInfo == null) {
+				typeInfo = new byte[getTypeInfoSize()];
+				ByteBuffer typeBuffer = ByteBuffer.wrap(typeInfo);
+				putTypeInfo(typeBuffer);
+			}
+			byte[] bytes = serializeWithoutTypeInfo(value);
+			byte[] total = new byte[typeInfo.length + bytes.length];
+			ByteBuffer.wrap(total).put(typeInfo).put(bytes);
+			return total;
+		}
+
+		public abstract byte[] serializeWithoutTypeInfo(IN value);
+
+		protected abstract void putTypeInfo(ByteBuffer buffer);
+
+		protected int getTypeInfoSize() {
+			return 1;
+		}
+	}
+
+	public static class CustomTypeWrapperSerializer extends Serializer<CustomTypeWrapper> {
+		private final byte type;
+
+		public CustomTypeWrapperSerializer(CustomTypeWrapper value) {
+			this.type = value.getType();
+		}
+
+		@Override
+		public byte[] serializeWithoutTypeInfo(CustomTypeWrapper value) {
+			byte[] result = new byte[4 + value.getData().length];
+			ByteBuffer.wrap(result).putInt(value.getData().length).put(value.getData());
+			return result;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(type);
+		}
+	}
+
+	public static class ByteSerializer extends Serializer<Byte> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Byte value) {
+			return new byte[]{value};
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_BYTE);
+		}
+	}
+
+	public static class BooleanSerializer extends Serializer<Boolean> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Boolean value) {
+			return new byte[]{value ? (byte) 1 : (byte) 0};
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_BOOLEAN);
+		}
+	}
+
+	public static class IntSerializer extends Serializer<Integer> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Integer value) {
+			byte[] data = new byte[4];
+			ByteBuffer.wrap(data).putInt(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_INTEGER);
+		}
+	}
+
+	public static class LongSerializer extends Serializer<Long> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Long value) {
+			byte[] data = new byte[8];
+			ByteBuffer.wrap(data).putLong(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_LONG);
+		}
+	}
+
+	public static class StringSerializer extends Serializer<String> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(String value) {
+			byte[] string = value.getBytes();
+			byte[] data = new byte[4 + string.length];
+			ByteBuffer.wrap(data).putInt(string.length).put(string);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_STRING);
+		}
+	}
+
+	public static class FloatSerializer extends Serializer<Float> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Float value) {
+			byte[] data = new byte[4];
+			ByteBuffer.wrap(data).putFloat(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_FLOAT);
+		}
+	}
+
+	public static class DoubleSerializer extends Serializer<Double> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Double value) {
+			byte[] data = new byte[8];
+			ByteBuffer.wrap(data).putDouble(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_DOUBLE);
+		}
+	}
+
+	public static class NullSerializer extends Serializer<Object> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Object value) {
+			return new byte[0];
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_NULL);
+		}
+	}
+
+	public static class BytesSerializer extends Serializer<byte[]> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(byte[] value) {
+			byte[] data = new byte[4 + value.length];
+			ByteBuffer.wrap(data).putInt(value.length).put(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_BYTES);
+		}
+	}
+
+	public static class TupleSerializer extends Serializer<Tuple> {
+		private final Serializer[] serializer;
+
+		public TupleSerializer(Tuple value) {
+			serializer = new Serializer[value.getArity()];
+			for (int x = 0; x < serializer.length; x++) {
+				serializer[x] = getSerializer(value.getField(x));
+			}
+		}
+
+		@Override
+		public byte[] serializeWithoutTypeInfo(Tuple value) {
+			ArrayList<byte[]> bits = new ArrayList();
+
+			int totalSize = 0;
+			for (int x = 0; x < serializer.length; x++) {
+				byte[] bit = serializer[x].serializeWithoutTypeInfo(value.getField(x));
+				bits.add(bit);
+				totalSize += bit.length;
+			}
+			int pointer = 0;
+			byte[] data = new byte[totalSize];
+			for (byte[] bit : bits) {
+				System.arraycopy(bit, 0, data, pointer, bit.length);
+				pointer += bit.length;
+			}
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put((byte) serializer.length);
+			for (Serializer s : serializer) {
+				s.putTypeInfo(buffer);
+			}
+		}
+
+		@Override
+		public int getTypeInfoSize() {
+			int size = 1;
+			for (Serializer s : serializer) {
+				size += s.getTypeInfoSize();
+			}
+			return size;
+		}
+	}
+}

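The framing these serializers produce is a block of type info followed by the raw payload: one type byte per primitive field, with tuples prepending a single arity byte, and all numeric fields written big-endian. A minimal Python sketch of that layout for a Tuple2<Integer, String>, using ad-hoc names purely for illustration:

    # Illustration only: mirrors the byte layout produced by SerializationUtils
    # (type info, then payload); constants copied from the Java class above.
    import struct

    TYPE_INTEGER = 32
    TYPE_STRING = 28

    def frame_tuple2(i, s):
        # type info: tuple arity byte, then one type byte per field
        type_info = struct.pack(">bbb", 2, TYPE_INTEGER, TYPE_STRING)
        encoded = s.encode("utf-8")
        # payload: fields without type info; strings carry a 4-byte length prefix
        payload = struct.pack(">i", i) + struct.pack(">i", len(encoded)) + encoded
        return type_info + payload

    print(repr(frame_tuple2(42, "hello")))
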
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
index b5674b9..4c0db32 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
@@ -16,11 +16,11 @@
 # limitations under the License.
 ################################################################################
 from struct import pack
-import sys
-
 from flink.connection.Constants import Types
-from flink.plan.Constants import _Dummy
 
+
+#=====Compatibility====================================================================================================
+import sys
 PY2 = sys.version_info[0] == 2
 PY3 = sys.version_info[0] == 3
 
@@ -30,57 +30,128 @@ else:
     stringtype = str
 
 
+#=====Collector========================================================================================================
 class Collector(object):
-    def __init__(self, con, env):
+    def __init__(self, con, env, info):
         self._connection = con
         self._serializer = None
         self._env = env
+        self._as_array = isinstance(info.types, bytearray)
 
     def _close(self):
         self._connection.send_end_signal()
 
     def collect(self, value):
-        self._serializer = _get_serializer(self._connection.write, value, self._env._types)
+        self._serializer = ArraySerializer(value, self._env._types) if self._as_array else KeyValuePairSerializer(value, self._env._types)
         self.collect = self._collect
         self.collect(value)
 
     def _collect(self, value):
-        self._connection.write(self._serializer.serialize(value))
+        serialized_value = self._serializer.serialize(value)
+        self._connection.write(serialized_value)
+
+
+class PlanCollector(object):
+    def __init__(self, con, env):
+        self._connection = con
+        self._env = env
+
+    def _close(self):
+        self._connection.send_end_signal()
+
+    def collect(self, value):
+        type = _get_type_info(value, self._env._types)
+        serializer = _get_serializer(value, self._env._types)
+        self._connection.write(b"".join([type, serializer.serialize(value)]))
+
+
+#=====Serializer=======================================================================================================
+class Serializer(object):
+    def serialize(self, value):
+        pass
+
+
+class KeyValuePairSerializer(Serializer):
+    def __init__(self, value, custom_types):
+        self._typeK = [_get_type_info(key, custom_types) for key in value[0]]
+        self._typeV = _get_type_info(value[1], custom_types)
+        self._typeK_length = [len(type) for type in self._typeK]
+        self._typeV_length = len(self._typeV)
+        self._serializerK = [_get_serializer(key, custom_types) for key in value[0]]
+        self._serializerV = _get_serializer(value[1], custom_types)
+
+    def serialize(self, value):
+        bits = [pack(">i", len(value[0]))[3:4]]
+        for i in range(len(value[0])):
+            x = self._serializerK[i].serialize(value[0][i])
+            bits.append(pack(">i", len(x) + self._typeK_length[i]))
+            bits.append(self._typeK[i])
+            bits.append(x)
+        v = self._serializerV.serialize(value[1])
+        bits.append(pack(">i", len(v) + self._typeV_length))
+        bits.append(self._typeV)
+        bits.append(v)
+        return b"".join(bits)
+
 
+class ArraySerializer(Serializer):
+    def __init__(self, value, custom_types):
+        self._type = _get_type_info(value, custom_types)
+        self._type_length = len(self._type)
+        self._serializer = _get_serializer(value, custom_types)
 
-def _get_serializer(write, value, custom_types):
+    def serialize(self, value):
+        serialized_value = self._serializer.serialize(value)
+        return b"".join([pack(">i", len(serialized_value) + self._type_length), self._type, serialized_value])
+
+
+def _get_type_info(value, custom_types):
     if isinstance(value, (list, tuple)):
-        write(Types.TYPE_TUPLE)
-        write(pack(">I", len(value)))
-        return TupleSerializer(write, value, custom_types)
+        return b"".join([pack(">i", len(value))[3:4], b"".join([_get_type_info(field, custom_types) for field in value])])
+    elif value is None:
+        return Types.TYPE_NULL
+    elif isinstance(value, stringtype):
+        return Types.TYPE_STRING
+    elif isinstance(value, bool):
+        return Types.TYPE_BOOLEAN
+    elif isinstance(value, int) or PY2 and isinstance(value, long):
+        return Types.TYPE_LONG
+    elif isinstance(value, bytearray):
+        return Types.TYPE_BYTES
+    elif isinstance(value, float):
+        return Types.TYPE_DOUBLE
+    else:
+        for entry in custom_types:
+            if isinstance(value, entry[1]):
+                return entry[0]
+        raise Exception("Unsupported Type encountered.")
+
+
+def _get_serializer(value, custom_types):
+    if isinstance(value, (list, tuple)):
+        return TupleSerializer(value, custom_types)
     elif value is None:
-        write(Types.TYPE_NULL)
         return NullSerializer()
     elif isinstance(value, stringtype):
-        write(Types.TYPE_STRING)
         return StringSerializer()
     elif isinstance(value, bool):
-        write(Types.TYPE_BOOLEAN)
         return BooleanSerializer()
     elif isinstance(value, int) or PY2 and isinstance(value, long):
-        write(Types.TYPE_LONG)
         return LongSerializer()
     elif isinstance(value, bytearray):
-        write(Types.TYPE_BYTES)
         return ByteArraySerializer()
     elif isinstance(value, float):
-        write(Types.TYPE_DOUBLE)
         return FloatSerializer()
     else:
         for entry in custom_types:
             if isinstance(value, entry[1]):
-                write(entry[0])
-                return CustomTypeSerializer(entry[2])
+                return CustomTypeSerializer(entry[0], entry[2])
         raise Exception("Unsupported Type encountered.")
 
 
-class CustomTypeSerializer(object):
-    def __init__(self, serializer):
+class CustomTypeSerializer(Serializer):
+    def __init__(self, id, serializer):
+        self._id = id
         self._serializer = serializer
 
     def serialize(self, value):
@@ -88,9 +159,9 @@ class CustomTypeSerializer(object):
         return b"".join([pack(">i",len(msg)), msg])
 
 
-class TupleSerializer(object):
-    def __init__(self, write, value, custom_types):
-        self.serializer = [_get_serializer(write, field, custom_types) for field in value]
+class TupleSerializer(Serializer):
+    def __init__(self, value, custom_types):
+        self.serializer = [_get_serializer(field, custom_types) for field in value]
 
     def serialize(self, value):
         bits = []
@@ -99,83 +170,33 @@ class TupleSerializer(object):
         return b"".join(bits)
 
 
-class BooleanSerializer(object):
+class BooleanSerializer(Serializer):
     def serialize(self, value):
         return pack(">?", value)
 
 
-class FloatSerializer(object):
+class FloatSerializer(Serializer):
     def serialize(self, value):
         return pack(">d", value)
 
 
-class LongSerializer(object):
+class LongSerializer(Serializer):
     def serialize(self, value):
         return pack(">q", value)
 
 
-class ByteArraySerializer(object):
+class ByteArraySerializer(Serializer):
     def serialize(self, value):
         value = bytes(value)
         return pack(">I", len(value)) + value
 
 
-class StringSerializer(object):
+class StringSerializer(Serializer):
     def serialize(self, value):
         value = value.encode("utf-8")
         return pack(">I", len(value)) + value
 
 
-class NullSerializer(object):
+class NullSerializer(Serializer):
     def serialize(self, value):
-        return b""
-
-
-class TypedCollector(object):
-    def __init__(self, con, env):
-        self._connection = con
-        self._env = env
-
-    def collect(self, value):
-        if not isinstance(value, (list, tuple)):
-            self._send_field(value)
-        else:
-            self._connection.write(Types.TYPE_TUPLE)
-            meta = pack(">I", len(value))
-            self._connection.write(bytes([meta[3]]) if PY3 else meta[3])
-            for field in value:
-                self.collect(field)
-
-    def _send_field(self, value):
-        if value is None:
-            self._connection.write(Types.TYPE_NULL)
-        elif isinstance(value, stringtype):
-            value = value.encode("utf-8")
-            size = pack(">I", len(value))
-            self._connection.write(b"".join([Types.TYPE_STRING, size, value]))
-        elif isinstance(value, bytes):
-            size = pack(">I", len(value))
-            self._connection.write(b"".join([Types.TYPE_BYTES, size, value]))
-        elif isinstance(value, bool):
-            data = pack(">?", value)
-            self._connection.write(b"".join([Types.TYPE_BOOLEAN, data]))
-        elif isinstance(value, int) or PY2 and isinstance(value, long):
-            data = pack(">q", value)
-            self._connection.write(b"".join([Types.TYPE_LONG, data]))
-        elif isinstance(value, float):
-            data = pack(">d", value)
-            self._connection.write(b"".join([Types.TYPE_DOUBLE, data]))
-        elif isinstance(value, bytearray):
-            value = bytes(value)
-            size = pack(">I", len(value))
-            self._connection.write(b"".join([Types.TYPE_BYTES, size, value]))
-        elif isinstance(value, _Dummy):
-            self._connection.write(pack(">i", 127)[3:])
-            self._connection.write(pack(">i", 0))
-        else:
-            for entry in self._env._types:
-                if isinstance(value, entry[1]):
-                    self._connection.write(entry[0])
-                    self._connection.write(CustomTypeSerializer(entry[2]).serialize(value))
-                    return
-            raise Exception("Unsupported Type encountered.")
\ No newline at end of file
+        return b""
\ No newline at end of file

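On the Python side, _get_type_info emits the same per-field type bytes (a tuple contributes its arity byte followed by its fields' bytes), while the serializers returned by _get_serializer emit only the payload; ArraySerializer then prefixes each record with the combined size of type info and payload. A rough, hand-assembled illustration of what gets written for the tuple (42, "hello"):

    # Hand-assembled illustration of the record framing in Collector.py;
    # the type bytes are the new values from Constants.Types.
    from struct import pack

    TYPE_LONG = b'\x1F'
    TYPE_STRING = b'\x1C'

    type_info = pack(">i", 2)[3:4] + TYPE_LONG + TYPE_STRING   # arity byte + field type bytes
    payload = pack(">q", 42) + pack(">I", 5) + b"hello"        # big-endian fields, length-prefixed string
    record = pack(">i", len(payload) + len(type_info)) + type_info + payload
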
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
index 680f495..293f5e9 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
@@ -115,6 +115,8 @@ class BufferingTCPMappedFileConnection(object):
             self._read_buffer()
         old_offset = self._input_offset
         self._input_offset += des_size
+        if self._input_offset > self._input_size:
+            raise Exception("BufferUnderFlowException")
         return self._input[old_offset:self._input_offset]
 
     def _read_buffer(self):
@@ -140,6 +142,12 @@ class BufferingTCPMappedFileConnection(object):
         self._input_offset = 0
         self._input = b""
 
+    def read_secondary(self, des_size):
+        return recv_all(self._socket, des_size)
+
+    def write_secondary(self, data):
+        self._socket.send(data)
+
 
 class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
     def __init__(self, input_file, output_file, port):

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py
index 0ca2232..01a16fa 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py
@@ -18,14 +18,15 @@
 
 
 class Types(object):
-    TYPE_TUPLE = b'\x0B'
-    TYPE_BOOLEAN = b'\x0A'
-    TYPE_BYTE = b'\x09'
-    TYPE_SHORT = b'\x08'
-    TYPE_INTEGER = b'\x07'
-    TYPE_LONG = b'\x06'
-    TYPE_DOUBLE = b'\x04'
-    TYPE_FLOAT = b'\x05'
-    TYPE_STRING = b'\x02'
-    TYPE_NULL = b'\x00'
-    TYPE_BYTES = b'\x01'
+    TYPE_ARRAY = b'\x3F'
+    TYPE_KEY_VALUE = b'\x3E'
+    TYPE_VALUE_VALUE = b'\x3D'
+    TYPE_BOOLEAN = b'\x22'
+    TYPE_BYTE = b'\x21'
+    TYPE_INTEGER = b'\x20'
+    TYPE_LONG = b'\x1F'
+    TYPE_DOUBLE = b'\x1E'
+    TYPE_FLOAT = b'\x1D'
+    TYPE_STRING = b'\x1C'
+    TYPE_BYTES = b'\x1B'
+    TYPE_NULL = b'\x1A'

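The new values match the constants introduced in SerializationUtils.java above (0x22 is 34, TYPE_BOOLEAN; 0x1A is 26, TYPE_NULL; and so on), so both sides of the protocol agree on type IDs. A quick cross-check, written here only as an illustration:

    # Illustration: the Python type bytes equal the Java SerializationUtils constants.
    java_ids = {"BOOLEAN": 34, "BYTE": 33, "INTEGER": 32, "LONG": 31, "DOUBLE": 30,
                "FLOAT": 29, "STRING": 28, "BYTES": 27, "NULL": 26}
    python_ids = {"BOOLEAN": 0x22, "BYTE": 0x21, "INTEGER": 0x20, "LONG": 0x1F, "DOUBLE": 0x1E,
                  "FLOAT": 0x1D, "STRING": 0x1C, "BYTES": 0x1B, "NULL": 0x1A}
    assert all(java_ids[k] == python_ids[k] for k in java_ids)
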
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
index 3425cfa..2532194 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
@@ -26,6 +26,7 @@ except:
 from flink.connection.Constants import Types
 
 
+#=====Iterator==========================================================================================================
 class ListIterator(defIter.Iterator):
     def __init__(self, values):
         super(ListIterator, self).__init__()
@@ -76,7 +77,7 @@ class GroupIterator(defIter.Iterator):
             else:
                 self.cur = None
                 self.empty = True
-            return tmp
+            return tmp[1]
         else:
             raise StopIteration
 
@@ -93,7 +94,7 @@ class GroupIterator(defIter.Iterator):
         self.empty = False
 
     def _extract_keys(self, x):
-        return [x[k] for k in self.keys]
+        return [x[0][k] for k in self.keys]
 
     def _extract_keys_id(self, x):
         return x
@@ -175,6 +176,7 @@ class Iterator(defIter.Iterator):
         self._group = group
         self._deserializer = None
         self._env = env
+        self._size = 0
 
     def __next__(self):
         return self.next()
@@ -184,8 +186,35 @@ class Iterator(defIter.Iterator):
 
     def next(self):
         if self.has_next():
+            custom_types = self._env._types
+            read = self._read
             if self._deserializer is None:
-                self._deserializer = _get_deserializer(self._group, self._connection.read, self._env._types)
+                type = read(1)
+                if type == Types.TYPE_ARRAY:
+                    key_des = _get_deserializer(read, custom_types)
+                    self._deserializer = ArrayDeserializer(key_des)
+                    return key_des.deserialize(read)
+                elif type == Types.TYPE_KEY_VALUE:
+                    size = ord(read(1))
+                    key_des = []
+                    keys = []
+                    for _ in range(size):
+                        new_d = _get_deserializer(read, custom_types)
+                        key_des.append(new_d)
+                        keys.append(new_d.deserialize(read))
+                    val_des = _get_deserializer(read, custom_types)
+                    val = val_des.deserialize(read)
+                    self._deserializer = KeyValueDeserializer(key_des, val_des)
+                    return (tuple(keys), val)
+                elif type == Types.TYPE_VALUE_VALUE:
+                    des1 = _get_deserializer(read, custom_types)
+                    field1 = des1.deserialize(read)
+                    des2 = _get_deserializer(read, custom_types)
+                    field2 = des2.deserialize(read)
+                    self._deserializer = ValueValueDeserializer(des1, des2)
+                    return (field1, field2)
+                else:
+                    raise Exception("Invalid type ID encountered: " + str(ord(type)))
             return self._deserializer.deserialize(self._read)
         else:
             raise StopIteration
@@ -197,6 +226,16 @@ class Iterator(defIter.Iterator):
         self._deserializer = None
 
 
+class PlanIterator(object):
+    def __init__(self, con, env):
+        self._connection = con
+        self._env = env
+
+    def next(self):
+        deserializer = _get_deserializer(self._connection.read, self._env._types)
+        return deserializer.deserialize(self._connection.read)
+
+
 class DummyIterator(Iterator):
     def __init__(self):
         super(Iterator, self).__init__()
@@ -211,12 +250,11 @@ class DummyIterator(Iterator):
         return False
 
 
-def _get_deserializer(group, read, custom_types, type=None):
-    if type is None:
-        type = read(1, group)
-        return _get_deserializer(group, read, custom_types, type)
-    elif type == Types.TYPE_TUPLE:
-        return TupleDeserializer(read, group, custom_types)
+#=====Deserializer======================================================================================================
+def _get_deserializer(read, custom_types):
+    type = read(1)
+    if 0 < ord(type) < 26:
+        return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
     elif type == Types.TYPE_BYTE:
         return ByteDeserializer()
     elif type == Types.TYPE_BYTES:
@@ -238,101 +276,125 @@ def _get_deserializer(group, read, custom_types, type=None):
     else:
         for entry in custom_types:
             if type == entry[0]:
-                return entry[3]
-        raise Exception("Unable to find deserializer for type ID " + str(type))
+                return CustomTypeDeserializer(entry[3])
+        raise Exception("Unable to find deserializer for type ID " + str(ord(type)))
 
 
-class TupleDeserializer(object):
-    def __init__(self, read, group, custom_types):
-        size = unpack(">I", read(4, group))[0]
-        self.deserializer = [_get_deserializer(group, read, custom_types) for _ in range(size)]
+class Deserializer(object):
+    def get_type_info_size(self):
+        return 1
 
     def deserialize(self, read):
-        return tuple([s.deserialize(read) for s in self.deserializer])
+        pass
+
 
+class ArrayDeserializer(Deserializer):
+    def __init__(self, deserializer):
+        self._deserializer = deserializer
+        self._d_skip = deserializer.get_type_info_size()
 
-class ByteDeserializer(object):
+    def deserialize(self, read):
+        read(1) #array type
+        read(self._d_skip)
+        return self._deserializer.deserialize(read)
+
+
+class KeyValueDeserializer(Deserializer):
+    def __init__(self, key_deserializer, value_deserializer):
+        self._key_deserializer = [(k, k.get_type_info_size()) for k in key_deserializer]
+        self._value_deserializer = value_deserializer
+        self._value_deserializer_skip = value_deserializer.get_type_info_size()
+
+    def deserialize(self, read):
+        fields = []
+        read(1) #key value type
+        read(1) #key count
+        for dk in self._key_deserializer:
+            read(dk[1])
+            fields.append(dk[0].deserialize(read))
+        dv = self._value_deserializer
+        read(self._value_deserializer_skip)
+        return (tuple(fields), dv.deserialize(read))
+
+
+class ValueValueDeserializer(Deserializer):
+    def __init__(self, d1, d2):
+        self._d1 = d1
+        self._d1_skip = self._d1.get_type_info_size()
+        self._d2 = d2
+        self._d2_skip = self._d2.get_type_info_size()
+
+    def deserialize(self, read):
+        read(1)
+        read(self._d1_skip)
+        f1 = self._d1.deserialize(read)
+        read(self._d2_skip)
+        f2 = self._d2.deserialize(read)
+        return (f1, f2)
+
+
+class CustomTypeDeserializer(Deserializer):
+    def __init__(self, deserializer):
+        self._deserializer = deserializer
+
+    def deserialize(self, read):
+        read(4) #discard binary size
+        return self._deserializer.deserialize(read)
+
+
+class TupleDeserializer(Deserializer):
+    def __init__(self, deserializer):
+        self._deserializer = deserializer
+
+    def get_type_info_size(self):
+        return 1 + sum([d.get_type_info_size() for d in self._deserializer])
+
+    def deserialize(self, read):
+        return tuple([s.deserialize(read) for s in self._deserializer])
+
+
+class ByteDeserializer(Deserializer):
     def deserialize(self, read):
         return unpack(">c", read(1))[0]
 
 
-class ByteArrayDeserializer(object):
+class ByteArrayDeserializer(Deserializer):
     def deserialize(self, read):
         size = unpack(">i", read(4))[0]
         return bytearray(read(size)) if size else bytearray(b"")
 
 
-class BooleanDeserializer(object):
+class BooleanDeserializer(Deserializer):
     def deserialize(self, read):
         return unpack(">?", read(1))[0]
 
 
-class FloatDeserializer(object):
+class FloatDeserializer(Deserializer):
     def deserialize(self, read):
         return unpack(">f", read(4))[0]
 
 
-class DoubleDeserializer(object):
+class DoubleDeserializer(Deserializer):
     def deserialize(self, read):
         return unpack(">d", read(8))[0]
 
 
-class IntegerDeserializer(object):
+class IntegerDeserializer(Deserializer):
     def deserialize(self, read):
         return unpack(">i", read(4))[0]
 
 
-class LongDeserializer(object):
+class LongDeserializer(Deserializer):
     def deserialize(self, read):
         return unpack(">q", read(8))[0]
 
 
-class StringDeserializer(object):
+class StringDeserializer(Deserializer):
     def deserialize(self, read):
         length = unpack(">i", read(4))[0]
         return read(length).decode("utf-8") if length else ""
 
 
-class NullDeserializer(object):
-    def deserialize(self):
+class NullDeserializer(Deserializer):
+    def deserialize(self, read):
         return None
-
-
-class TypedIterator(object):
-    def __init__(self, con, env):
-        self._connection = con
-        self._env = env
-
-    def next(self):
-        read = self._connection.read
-        type = read(1)
-        if type == Types.TYPE_TUPLE:
-            size = unpack(">i", read(4))[0]
-            return tuple([self.next() for x in range(size)])
-        elif type == Types.TYPE_BYTE:
-            return unpack(">c", read(1))[0]
-        elif type == Types.TYPE_BYTES:
-            size = unpack(">i", read(4))[0]
-            return bytearray(read(size)) if size else bytearray(b"")
-        elif type == Types.TYPE_BOOLEAN:
-            return unpack(">?", read(1))[0]
-        elif type == Types.TYPE_FLOAT:
-            return unpack(">f", read(4))[0]
-        elif type == Types.TYPE_DOUBLE:
-            return unpack(">d", read(8))[0]
-        elif type == Types.TYPE_INTEGER:
-            return unpack(">i", read(4))[0]
-        elif type == Types.TYPE_LONG:
-            return unpack(">q", read(8))[0]
-        elif type == Types.TYPE_STRING:
-            length = unpack(">i", read(4))[0]
-            return read(length).decode("utf-8") if length else ""
-        elif type == Types.TYPE_NULL:
-            return None
-        else:
-            for entry in self._env._types:
-                if type == entry[0]:
-                    return entry[3]()
-            raise Exception("Unable to find deserializer for type ID " + str(type))
-
-

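On the read side, _get_deserializer consumes a single type byte; any value between 1 and 25 is interpreted as a tuple arity (the smallest primitive ID is 26, TYPE_NULL), and the deserializer for an n-tuple is built by reading n further type bytes. A standalone sketch of that dispatch rule, with an ad-hoc read callback:

    # Standalone sketch of the type-byte dispatch used by _get_deserializer;
    # these are not the module's objects, just the same rule applied by hand.
    stream = bytearray(b"\x02\x1F\x1C")   # arity 2, TYPE_LONG, TYPE_STRING

    def read(n):
        out = bytes(stream[:n])
        del stream[:n]
        return out

    first = ord(read(1))
    if 0 < first < 26:                          # tuple: the byte is the arity
        field_types = [read(1) for _ in range(first)]
        print("tuple of arity %d: %r" % (first, field_types))
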
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
index cc9e7cf..c107987 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
@@ -76,7 +76,7 @@ if __name__ == "__main__":
              STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING], '\n', '|') \
         .project(0,5,6,8) \
         .filter(LineitemFilter()) \
-        .map(ComputeRevenue(), [INT, FLOAT])
+        .map(ComputeRevenue())
 
 	nation = env \
     	.read_csv(sys.argv[4], [INT, STRING, INT, STRING], '\n', '|') \

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
index 3eb72c9..aaa4e55 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
@@ -87,13 +87,13 @@ if __name__ == "__main__":
         .join(order) \
         .where(0) \
         .equal_to(1) \
-        .using(CustomerOrderJoin(),[INT, FLOAT, STRING, INT])
+        .using(CustomerOrderJoin())
 
     result = customerWithOrder \
         .join(lineitem) \
         .where(0) \
         .equal_to(0) \
-        .using(CustomerOrderLineitemJoin(), [INT, FLOAT, STRING, INT]) \
+        .using(CustomerOrderLineitemJoin()) \
         .group_by(0, 2, 3) \
         .reduce(SumReducer())
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
index b1b3ef4..f13cc04 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT, Order
+from flink.plan.Constants import Order
 from flink.functions.FlatMapFunction import FlatMapFunction
 from flink.functions.GroupReduceFunction import GroupReduceFunction
 from flink.functions.ReduceFunction import ReduceFunction
@@ -123,27 +123,27 @@ if __name__ == "__main__":
         (1, 2), (1, 3), (1, 4), (1, 5), (2, 3), (2, 5), (3, 4), (3, 7), (3, 8), (5, 6), (7, 8))
 
     edges_with_degrees = edges \
-        .flat_map(EdgeDuplicator(), [INT, INT]) \
+        .flat_map(EdgeDuplicator()) \
         .group_by(0) \
         .sort_group(1, Order.ASCENDING) \
-        .reduce_group(DegreeCounter(), [INT, INT, INT, INT]) \
+        .reduce_group(DegreeCounter()) \
         .group_by(0, 2) \
         .reduce(DegreeJoiner())
 
     edges_by_degree = edges_with_degrees \
-        .map(EdgeByDegreeProjector(), [INT, INT])
+        .map(EdgeByDegreeProjector())
 
     edges_by_id = edges_by_degree \
-        .map(EdgeByIdProjector(), [INT, INT])
+        .map(EdgeByIdProjector())
 
     triangles = edges_by_degree \
         .group_by(0) \
         .sort_group(1, Order.ASCENDING) \
-        .reduce_group(TriadBuilder(), [INT, INT, INT]) \
+        .reduce_group(TriadBuilder()) \
         .join(edges_by_id) \
         .where(1, 2) \
         .equal_to(0, 1) \
-        .using(TriadFilter(), [INT, INT, INT])
+        .using(TriadFilter())
 
     triangles.output()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
index 676043f..1ea3e78 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
@@ -19,7 +19,7 @@ import sys
 
 from datetime import datetime
 from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT, STRING, FLOAT, WriteMode
+from flink.plan.Constants import WriteMode
 from flink.functions.CoGroupFunction import CoGroupFunction
 from flink.functions.FilterFunction import FilterFunction
 
@@ -54,16 +54,16 @@ if __name__ == "__main__":
     	sys.exit("Usage: ./bin/pyflink.sh WebLogAnalysis <docments path> <ranks path> <visits path> <output path>")
 
     documents = env \
-        .read_csv(sys.argv[1], [STRING, STRING], "\n", "|") \
+        .read_csv(sys.argv[1], "\n", "|") \
         .filter(DocumentFilter()) \
         .project(0)
 
     ranks = env \
-        .read_csv(sys.argv[2], [INT, STRING, INT], "\n", "|") \
+        .read_csv(sys.argv[2], "\n", "|") \
         .filter(RankFilter())
 
     visits = env \
-        .read_csv(sys.argv[3], [STRING, STRING, STRING, FLOAT, STRING, STRING, STRING, STRING, INT], "\n", "|") \
+        .read_csv(sys.argv[3], "\n", "|") \
         .project(1,2) \
         .filter(VisitFilter()) \
         .project(0)
@@ -78,7 +78,7 @@ if __name__ == "__main__":
         .co_group(visits) \
         .where(1) \
         .equal_to(0) \
-        .using(AntiJoinVisits(), [INT, STRING, INT])
+        .using(AntiJoinVisits())
 
     result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
index 71c2e28..2ab724c 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
@@ -18,7 +18,6 @@
 import sys
 
 from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT, STRING
 from flink.functions.FlatMapFunction import FlatMapFunction
 from flink.functions.GroupReduceFunction import GroupReduceFunction
 
@@ -47,9 +46,9 @@ if __name__ == "__main__":
         data = env.from_elements("hello","world","hello","car","tree","data","hello")
 
     result = data \
-        .flat_map(Tokenizer(), (INT, STRING)) \
+        .flat_map(Tokenizer()) \
         .group_by(1) \
-        .reduce_group(Adder(), (INT, STRING), combinable=True) \
+        .reduce_group(Adder(), combinable=True) \
 
     if len(sys.argv) == 3:
         result.write_csv(sys.argv[2])

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
index 9c55787..4cb337a 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
@@ -25,13 +25,16 @@ class CoGroupFunction(Function.Function):
         self._keys1 = None
         self._keys2 = None
 
-    def _configure(self, input_file, output_file, port, env):
+    def _configure(self, input_file, output_file, port, env, info):
         self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection, env, 0)
         self._iterator2 = Iterator.Iterator(self._connection, env, 1)
         self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
+        self._collector = Collector.Collector(self._connection, env, info)
         self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-        self._configure_chain(Collector.Collector(self._connection, env))
+        if info.chained_info is not None:
+            info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info)
+            self._collector = info.chained_info.operator
 
     def _run(self):
         collector = self._collector

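With chaining, the head operator's collector is replaced by the chained operator itself, which in turn is configured with the real collector, so records flow through the chain in-process before being written back. A conceptual sketch with hypothetical classes, not the actual operator types:

    # Conceptual sketch of operator chaining; classes and names are hypothetical.
    class ChainedMap(object):
        def __init__(self, fn, downstream):
            self._fn = fn
            self._downstream = downstream      # next chained operator or the real collector

        def collect(self, value):
            self._downstream.collect(self._fn(value))

    class Sink(object):
        def collect(self, value):
            print("out: %r" % (value,))

    head = ChainedMap(lambda x: x + 1, ChainedMap(lambda x: x * 2, Sink()))
    head.collect(3)   # prints "out: 8"
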
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index f874a25..dfe6a28 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -16,9 +16,9 @@
 # limitations under the License.
 ################################################################################
 from abc import ABCMeta, abstractmethod
-import sys
 from collections import deque
 from flink.connection import Connection, Iterator, Collector
+from flink.connection.Iterator import IntegerDeserializer, StringDeserializer, _get_deserializer
 from flink.functions import RuntimeContext
 
 
@@ -30,55 +30,56 @@ class Function(object):
         self._iterator = None
         self._collector = None
         self.context = None
-        self._chain_operator = None
+        self._env = None
 
-    def _configure(self, input_file, output_file, port, env):
+    def _configure(self, input_file, output_file, port, env, info):
         self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection, env)
+        self._collector = Collector.Collector(self._connection, env, info)
         self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-        self._configure_chain(Collector.Collector(self._connection, env))
+        self._env = env
+        if info.chained_info is not None:
+            info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info)
+            self._collector = info.chained_info.operator
 
-    def _configure_chain(self, collector):
-        if self._chain_operator is not None:
-            self._collector = self._chain_operator
-            self._collector.context = self.context
-            self._collector._configure_chain(collector)
-            self._collector._open()
-        else:
+    def _configure_chain(self, context, collector, info):
+        self.context = context
+        if info.chained_info is None:
             self._collector = collector
-
-    def _chain(self, operator):
-        self._chain_operator = operator
+        else:
+            self._collector = info.chained_info.operator
+            info.chained_info.operator._configure_chain(context, collector, info.chained_info)
 
     @abstractmethod
     def _run(self):
         pass
 
-    def _open(self):
-        pass
-
     def _close(self):
         self._collector._close()
-        self._connection.close()
+        if self._connection is not None:
+            self._connection.close()
 
     def _go(self):
         self._receive_broadcast_variables()
         self._run()
 
     def _receive_broadcast_variables(self):
-        broadcast_count = self._iterator.next()
-        self._iterator._reset()
-        self._connection.reset()
+        con = self._connection
+        deserializer_int = IntegerDeserializer()
+        broadcast_count = deserializer_int.deserialize(con.read_secondary)
+        deserializer_string = StringDeserializer()
         for _ in range(broadcast_count):
-            name = self._iterator.next()
-            self._iterator._reset()
-            self._connection.reset()
+            name = deserializer_string.deserialize(con.read_secondary)
             bc = deque()
-            while(self._iterator.has_next()):
-                bc.append(self._iterator.next())
+            if con.read_secondary(1) == b"\x01":
+                serializer_data = _get_deserializer(con.read_secondary, self._env._types)
+                value = serializer_data.deserialize(con.read_secondary)
+                bc.append(value)
+                while con.read_secondary(1) == b"\x01":
+                    con.read_secondary(serializer_data.get_type_info_size()) #skip type info
+                    value = serializer_data.deserialize(con.read_secondary)
+                    bc.append(value)
             self.context._add_broadcast_variable(name, bc)
-            self._iterator._reset()
-            self._connection.reset()
 
 
 

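Broadcast variables are now read from the secondary (socket) channel instead of the memory-mapped buffer: a 4-byte count, then for each variable a length-prefixed name followed by records that are each announced by a 0x01 byte (any other byte ends the variable); every record carries its type info on the wire, but the reader only uses it for the first one and skips it afterwards. A hypothetical stream for one variable named "bc" holding the longs 1 and 2:

    # Hypothetical byte stream matching the reads in _receive_broadcast_variables;
    # assembled by hand purely for illustration.
    from struct import pack

    stream = (pack(">i", 1)                        # one broadcast variable
              + pack(">i", 2) + b"bc"              # its name, length-prefixed
              + b"\x01" + b"\x1F" + pack(">q", 1)  # first record: marker, type info, payload
              + b"\x01" + b"\x1F" + pack(">q", 2)  # later records: type info present but skipped
              + b"\x00")                           # terminator for this variable
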
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index b758c19..340497d 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -24,21 +24,14 @@ from flink.plan.Constants import Order
 class GroupReduceFunction(Function.Function):
     def __init__(self):
         super(GroupReduceFunction, self).__init__()
-        self._keys = None
 
-    def _configure(self, input_file, output_file, port, env):
-        self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-        self._iterator = Iterator.Iterator(self._connection, env)
-        if self._keys is None:
+    def _configure(self, input_file, output_file, port, env, info):
+        super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info)
+        if info.key1 is None:
             self._run = self._run_all_group_reduce
         else:
             self._run = self._run_grouped_group_reduce
-            self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
-        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-        self._collector = Collector.Collector(self._connection, env)
-
-    def _set_grouping_keys(self, keys):
-        self._keys = keys
+            self._group_iterator = Iterator.GroupIterator(self._iterator, info.key1)
 
     def _run(self):
         pass

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py
new file mode 100644
index 0000000..a75961f
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py
@@ -0,0 +1,28 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class KeySelectorFunction:
+    def __call__(self, value):
+        return self.get_key(value)
+
+    def callable(self):
+        return True
+
+    def get_key(self, value):
+        pass

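The new KeySelectorFunction lets a grouping key be computed in Python by overriding get_key; the instance itself is callable. A minimal hypothetical subclass (whether a selector can be passed to group_by in this commit is an assumption here):

    # Minimal sketch of a key selector; the field layout is made up.
    from flink.functions.KeySelectorFunction import KeySelectorFunction

    class FirstFieldSelector(KeySelectorFunction):
        def get_key(self, value):
            return value[0]        # key a record by its first field

    selector = FirstFieldSelector()
    print(selector((7, "payload")))   # __call__ delegates to get_key -> 7
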
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index 45a22da..95e8b8a 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -23,21 +23,14 @@ from flink.connection import Connection, Iterator, Collector
 class ReduceFunction(Function.Function):
     def __init__(self):
         super(ReduceFunction, self).__init__()
-        self._keys = None
 
-    def _configure(self, input_file, output_file, port, env):
-        self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-        self._iterator = Iterator.Iterator(self._connection, env)
-        if self._keys is None:
+    def _configure(self, input_file, output_file, port, env, info):
+        super(ReduceFunction, self)._configure(input_file, output_file, port, env, info)
+        if info.key1 is None:
             self._run = self._run_all_reduce
         else:
             self._run = self._run_grouped_reduce
-            self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
-        self._collector = Collector.Collector(self._connection, env)
-        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-
-    def _set_grouping_keys(self, keys):
-        self._keys = keys
+            self._group_iterator = Iterator.GroupIterator(self._iterator, info.key1)
 
     def _run(self):
         pass

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
index 0c9fe80..8881463 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -63,11 +63,6 @@ import sys
 PY2 = sys.version_info[0] == 2
 PY3 = sys.version_info[0] == 3
 
-
-class _Dummy(object):
-    pass
-
-
 if PY2:
     BOOL = True
     INT = 1
@@ -75,11 +70,17 @@ if PY2:
     FLOAT = 2.5
     STRING = "type"
     BYTES = bytearray(b"byte")
-    CUSTOM = _Dummy()
 elif PY3:
     BOOL = True
     INT = 1
     FLOAT = 2.5
     STRING = "type"
     BYTES = bytearray(b"byte")
-    CUSTOM = _Dummy()
+
+
+def _createKeyValueTypeInfo(keyCount):
+    return (tuple([BYTES for _ in range(keyCount)]), BYTES)
+
+
+def _createArrayTypeInfo():
+    return BYTES

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 25ec8b8..eda8d02 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -15,11 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import inspect
 import copy
 import types as TYPES
 
-from flink.plan.Constants import _Identifier, WriteMode, STRING
+from flink.plan.Constants import _Identifier, WriteMode, _createKeyValueTypeInfo, _createArrayTypeInfo
 from flink.plan.OperationInfo import OperationInfo
 from flink.functions.CoGroupFunction import CoGroupFunction
 from flink.functions.FilterFunction import FilterFunction
@@ -30,52 +29,33 @@ from flink.functions.JoinFunction import JoinFunction
 from flink.functions.MapFunction import MapFunction
 from flink.functions.MapPartitionFunction import MapPartitionFunction
 from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.KeySelectorFunction import KeySelectorFunction
 
-def deduct_output_type(dataset):
-    skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
-    source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])
-    default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN])
-
-    while True:
-        dataset_type = dataset.identifier
-        if dataset_type in skip:
-            dataset = dataset.parent
-            continue
-        if dataset_type in source:
-            if dataset_type == _Identifier.SOURCE_TEXT:
-                return STRING
-            if dataset_type == _Identifier.SOURCE_VALUE:
-                return dataset.values[0]
-            if dataset_type == _Identifier.SOURCE_CSV:
-                return dataset.types
-        if dataset_type == _Identifier.PROJECTION:
-            return tuple([deduct_output_type(dataset.parent)[k] for k in dataset.keys])
-        if dataset_type in default:
-            if dataset.operator is not None: #udf-join/cross
-                return dataset.types
-            if len(dataset.projections) == 0: #defaultjoin/-cross
-                return (deduct_output_type(dataset.parent), deduct_output_type(dataset.other))
-            else: #projectjoin/-cross
-                t1 = deduct_output_type(dataset.parent)
-                t2 = deduct_output_type(dataset.other)
-                out_type = []
-                for prj in dataset.projections:
-                    if len(prj[1]) == 0: #projection on non-tuple dataset
-                        if prj[0] == "first":
-                            out_type.append(t1)
-                        else:
-                            out_type.append(t2)
-                    else: #projection on tuple dataset
-                        for key in prj[1]:
-                            if prj[0] == "first":
-                                out_type.append(t1[key])
-                            else:
-                                out_type.append(t2[key])
-                return tuple(out_type)
-        return dataset.types
-
-
-class Set(object):
+
+class Stringify(MapFunction):
+    def map(self, value):
+        if isinstance(value, (tuple, list)):
+            return "(" + b", ".join([self.map(x) for x in value]) + ")"
+        else:
+            return str(value)
+
+
+class CsvStringify(MapFunction):
+    def __init__(self, f_delim):
+        super(CsvStringify, self).__init__()
+        self.delim = f_delim
+
+    def map(self, value):
+        return self.delim.join([self._map(field) for field in value])
+
+    def _map(self, value):
+        if isinstance(value, (tuple, list)):
+            return "(" + b", ".join([self.map(x) for x in value]) + ")"
+        else:
+            return str(value)
+
+
+class DataSet(object):
     def __init__(self, env, info):
         self._env = env
         self._info = info
@@ -86,6 +66,9 @@ class Set(object):
         """
         Writes a DataSet to the standard output stream (stdout).
         """
+        self.map(Stringify())._output(to_error)
+
+    def _output(self, to_error):
         child = OperationInfo()
         child.identifier = _Identifier.SINK_PRINT
         child.parent = self._info
@@ -100,6 +83,9 @@ class Set(object):
         :param path: The path pointing to the location the text file is written to.
         :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
         """
+        return self.map(Stringify())._write_text(path, write_mode)
+
+    def _write_text(self, path, write_mode):
         child = OperationInfo()
         child.identifier = _Identifier.SINK_TEXT
         child.parent = self._info
@@ -116,6 +102,9 @@ class Set(object):
         :param path: The path pointing to the location the CSV file is written to.
         :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
         """
+        return self.map(CsvStringify(field_delimiter))._write_csv(path, line_delimiter, field_delimiter, write_mode)
+
+    def _write_csv(self, path, line_delimiter, field_delimiter, write_mode):
         child = OperationInfo()
         child.identifier = _Identifier.SINK_CSV
         child.path = path
@@ -126,7 +115,7 @@ class Set(object):
         self._info.sinks.append(child)
         self._env._sinks.append(child)
 
-    def reduce_group(self, operator, types, combinable=False):
+    def reduce_group(self, operator, combinable=False):
         """
         Applies a GroupReduce transformation.
 
@@ -136,7 +125,6 @@ class Set(object):
         emit any number of output elements including none.
 
         :param operator: The GroupReduceFunction that is applied on the DataSet.
-        :param types: The type of the resulting DataSet.
         :return:A GroupReduceOperator that represents the reduced DataSet.
         """
         if isinstance(operator, TYPES.FunctionType):
@@ -148,17 +136,12 @@ class Set(object):
         child.identifier = _Identifier.GROUPREDUCE
         child.parent = self._info
         child.operator = operator
-        child.types = types
+        child.types = _createArrayTypeInfo()
         child.name = "PythonGroupReduce"
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
-
-class ReduceSet(Set):
-    def __init__(self, env, info):
-        super(ReduceSet, self).__init__(env, info)
-
     def reduce(self, operator):
         """
         Applies a Reduce transformation on a non-grouped DataSet.
@@ -179,16 +162,11 @@ class ReduceSet(Set):
         child.parent = self._info
         child.operator = operator
         child.name = "PythonReduce"
-        child.types = deduct_output_type(self._info)
+        child.types = _createArrayTypeInfo()
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
-
-class DataSet(ReduceSet):
-    def __init__(self, env, info):
-        super(DataSet, self).__init__(env, info)
-
     def project(self, *fields):
         """
         Applies a Project transformation on a Tuple DataSet.
@@ -201,14 +179,7 @@ class DataSet(ReduceSet):
         :return: The projected DataSet.
 
         """
-        child = OperationInfo()
-        child_set = DataSet(self._env, child)
-        child.identifier = _Identifier.PROJECTION
-        child.parent = self._info
-        child.keys = fields
-        self._info.children.append(child)
-        self._env._sets.append(child)
-        return child_set
+        return self.map(lambda x: tuple([x[key] for key in fields]))
 
     def group_by(self, *keys):
         """
@@ -223,6 +194,9 @@ class DataSet(ReduceSet):
         :param keys: One or more field positions on which the DataSet will be grouped.
         :return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
         """
+        return self.map(lambda x: x)._group_by(keys)
+
+    def _group_by(self, keys):
         child = OperationInfo()
         child_chain = []
         child_set = UnsortedGrouping(self._env, child, child_chain)
@@ -251,9 +225,8 @@ class DataSet(ReduceSet):
         other_set._info.children.append(child)
         child_set = CoGroupOperatorWhere(self._env, child)
         child.identifier = _Identifier.COGROUP
-        child.parent = self._info
-        child.other = other_set._info
-        self._info.children.append(child)
+        child.parent_set = self
+        child.other_set = other_set
         return child_set
 
     def cross(self, other_set):
@@ -323,14 +296,13 @@ class DataSet(ReduceSet):
         child.identifier = _Identifier.FILTER
         child.parent = self._info
         child.operator = operator
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
         child.name = "PythonFilter"
-        child.types = deduct_output_type(self._info)
+        child.types = _createArrayTypeInfo()
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
-    def flat_map(self, operator, types):
+    def flat_map(self, operator):
         """
         Applies a FlatMap transformation on a DataSet.
 
@@ -338,7 +310,6 @@ class DataSet(ReduceSet):
         Each FlatMapFunction call can return any number of elements including none.
 
         :param operator: The FlatMapFunction that is called for each element of the DataSet.
-        :param types: The type of the resulting DataSet.
         :return:A FlatMapOperator that represents the transformed DataSet.
         """
         if isinstance(operator, TYPES.FunctionType):
@@ -350,8 +321,7 @@ class DataSet(ReduceSet):
         child.identifier = _Identifier.FLATMAP
         child.parent = self._info
         child.operator = operator
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child.types = types
+        child.types = _createArrayTypeInfo()
         child.name = "PythonFlatMap"
         self._info.children.append(child)
         self._env._sets.append(child)
@@ -398,14 +368,11 @@ class DataSet(ReduceSet):
         child = OperationInfo()
         child_set = JoinOperatorWhere(self._env, child)
         child.identifier = identifier
-        child.parent = self._info
-        child.other = other_set._info
-        self._info.children.append(child)
-        other_set._info.children.append(child)
-        self._env._sets.append(child)
+        child.parent_set = self
+        child.other_set = other_set
         return child_set
 
-    def map(self, operator, types):
+    def map(self, operator):
         """
         Applies a Map transformation on a DataSet.
 
@@ -413,7 +380,6 @@ class DataSet(ReduceSet):
         Each MapFunction call returns exactly one element.
 
         :param operator: The MapFunction that is called for each element of the DataSet.
-        :param types: The type of the resulting DataSet
         :return:A MapOperator that represents the transformed DataSet
         """
         if isinstance(operator, TYPES.FunctionType):
@@ -425,14 +391,13 @@ class DataSet(ReduceSet):
         child.identifier = _Identifier.MAP
         child.parent = self._info
         child.operator = operator
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child.types = types
+        child.types = _createArrayTypeInfo()
         child.name = "PythonMap"
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
-    def map_partition(self, operator, types):
+    def map_partition(self, operator):
         """
         Applies a MapPartition transformation on a DataSet.
 
@@ -444,7 +409,6 @@ class DataSet(ReduceSet):
         sees is non-deterministic and depends on the degree of parallelism of the operation.
 
         :param operator: The MapFunction that is called for each element of the DataSet.
-        :param types: The type of the resulting DataSet
         :return:A MapOperator that represents the transformed DataSet
         """
         if isinstance(operator, TYPES.FunctionType):
@@ -456,8 +420,7 @@ class DataSet(ReduceSet):
         child.identifier = _Identifier.MAPPARTITION
         child.parent = self._info
         child.operator = operator
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child.types = types
+        child.types = _createArrayTypeInfo()
         child.name = "PythonMapPartition"
         self._info.children.append(child)
         self._env._sets.append(child)
@@ -506,7 +469,10 @@ class Grouping(object):
         info.id = env._counter
         env._counter += 1
 
-    def reduce_group(self, operator, types, combinable=False):
+    def _finalize(self):
+        pass
+
+    def reduce_group(self, operator, combinable=False):
         """
         Applies a GroupReduce transformation.
 
@@ -516,21 +482,21 @@ class Grouping(object):
         emit any number of output elements including none.
 
         :param operator: The GroupReduceFunction that is applied on the DataSet.
-        :param types: The type of the resulting DataSet.
         :return:A GroupReduceOperator that represents the reduced DataSet.
         """
+        self._finalize()
         if isinstance(operator, TYPES.FunctionType):
             f = operator
             operator = GroupReduceFunction()
             operator.reduce = f
-        operator._set_grouping_keys(self._child_chain[0].keys)
         child = OperationInfo()
         child_set = OperatorSet(self._env, child)
         child.identifier = _Identifier.GROUPREDUCE
         child.parent = self._info
         child.operator = operator
-        child.types = types
+        child.types = _createArrayTypeInfo()
         child.name = "PythonGroupReduce"
+        child.key1 = self._child_chain[0].keys
         self._info.children.append(child)
         self._env._sets.append(child)
 
@@ -573,26 +539,96 @@ class UnsortedGrouping(Grouping):
         :param operator:The ReduceFunction that is applied on the DataSet.
         :return:A ReduceOperator that represents the reduced DataSet.
         """
-        operator._set_grouping_keys(self._child_chain[0].keys)
-        for i in self._child_chain:
-            self._env._sets.append(i)
+        self._finalize()
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = ReduceFunction()
+            operator.reduce = f
         child = OperationInfo()
         child_set = OperatorSet(self._env, child)
         child.identifier = _Identifier.REDUCE
         child.parent = self._info
         child.operator = operator
         child.name = "PythonReduce"
-        child.types = deduct_output_type(self._info)
+        child.types = _createArrayTypeInfo()
+        child.key1 = self._child_chain[0].keys
         self._info.children.append(child)
         self._env._sets.append(child)
 
         return child_set
 
+    def _finalize(self):
+        grouping = self._child_chain[0]
+        keys = grouping.keys
+        f = None
+        if isinstance(keys[0], TYPES.FunctionType):
+            f = lambda x: (keys[0](x),)
+        if isinstance(keys[0], KeySelectorFunction):
+            f = lambda x: (keys[0].get_key(x),)
+        if f is None:
+            f = lambda x: tuple([x[key] for key in keys])
+
+        grouping.parent.operator.map = lambda x: (f(x), x)
+        grouping.parent.types = _createKeyValueTypeInfo(len(keys))
+        grouping.keys = tuple([i for i in range(len(grouping.keys))])
+
 
 class SortedGrouping(Grouping):
     def __init__(self, env, info, child_chain):
         super(SortedGrouping, self).__init__(env, info, child_chain)
 
+    def _finalize(self):
+        grouping = self._child_chain[0]
+        sortings = self._child_chain[1:]
+
+        #list of used index keys to prevent duplicates and determine final index
+        index_keys = set()
+
+        if not isinstance(grouping.keys[0], (TYPES.FunctionType, KeySelectorFunction)):
+            index_keys = index_keys.union(set(grouping.keys))
+
+        #list of sorts using indices
+        index_sorts = []
+        #list of sorts using functions
+        ksl_sorts = []
+        for s in sortings:
+            if not isinstance(s.field, (TYPES.FunctionType, KeySelectorFunction)):
+                index_keys.add(s.field)
+                index_sorts.append(s)
+            else:
+                ksl_sorts.append(s)
+
+        used_keys = sorted(index_keys)
+        #all data gathered
+
+        #construct list of extractor lambdas
+        lambdas = []
+        i = 0
+        for key in used_keys:
+            lambdas.append(lambda x, k=key: x[k])
+            i += 1
+        if isinstance(grouping.keys[0], (TYPES.FunctionType, KeySelectorFunction)):
+            lambdas.append(grouping.keys[0])
+        for ksl_op in ksl_sorts:
+            lambdas.append(ksl_op.field)
+
+        grouping.parent.operator.map = lambda x: (tuple([l(x) for l in lambdas]), x)
+        grouping.parent.types = _createKeyValueTypeInfo(len(lambdas))
+        #modify keys
+        ksl_offset = len(used_keys)
+        if not isinstance(grouping.keys[0], (TYPES.FunctionType, KeySelectorFunction)):
+            grouping.keys = tuple([used_keys.index(key) for key in grouping.keys])
+        else:
+            grouping.keys = (ksl_offset,)
+            ksl_offset += 1
+
+        for iop in index_sorts:
+            iop.field = used_keys.index(iop.field)
+
+        for kop in ksl_sorts:
+            kop.field = ksl_offset
+            ksl_offset += 1
+
 
 class CoGroupOperatorWhere(object):
     def __init__(self, env, info):
@@ -609,6 +645,18 @@ class CoGroupOperatorWhere(object):
         :param fields: The indexes of the Tuple fields of the first co-grouped DataSet that should be used as keys.
         :return: An incomplete CoGroup transformation.
         """
+        f = None
+        if isinstance(fields[0], TYPES.FunctionType):
+            f = lambda x: (fields[0](x),)
+        if isinstance(fields[0], KeySelectorFunction):
+            f = lambda x: (fields[0].get_key(x),)
+        if f is None:
+            f = lambda x: tuple([x[key] for key in fields])
+
+        new_parent_set = self._info.parent_set.map(lambda x: (f(x), x))
+        new_parent_set._info.types = _createKeyValueTypeInfo(len(fields))
+        self._info.parent = new_parent_set._info
+        self._info.parent.children.append(self._info)
         self._info.key1 = fields
         return CoGroupOperatorTo(self._env, self._info)
 
@@ -628,6 +676,18 @@ class CoGroupOperatorTo(object):
         :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys.
         :return: An incomplete CoGroup transformation.
         """
+        f = None
+        if isinstance(fields[0], TYPES.FunctionType):
+            f = lambda x: (fields[0](x),)
+        if isinstance(fields[0], KeySelectorFunction):
+            f = lambda x: (fields[0].get_key(x),)
+        if f is None:
+            f = lambda x: tuple([x[key] for key in fields])
+
+        new_other_set = self._info.other_set.map(lambda x: (f(x), x))
+        new_other_set._info.types = _createKeyValueTypeInfo(len(fields))
+        self._info.other = new_other_set._info
+        self._info.other.children.append(self._info)
         self._info.key2 = fields
         return CoGroupOperatorUsing(self._env, self._info)
 
@@ -637,7 +697,7 @@ class CoGroupOperatorUsing(object):
         self._env = env
         self._info = info
 
-    def using(self, operator, types):
+    def using(self, operator):
         """
         Finalizes a CoGroup transformation.
 
@@ -645,7 +705,6 @@ class CoGroupOperatorUsing(object):
         Each CoGroupFunction call returns an arbitrary number of elements.
 
         :param operator: The CoGroupFunction that is called for all groups of elements with identical keys.
-        :param types: The type of the resulting DataSet.
         :return:A CoGroupOperator that represents the co-grouped result DataSet.
         """
         if isinstance(operator, TYPES.FunctionType):
@@ -653,11 +712,12 @@ class CoGroupOperatorUsing(object):
             operator = CoGroupFunction()
             operator.co_group = f
         new_set = OperatorSet(self._env, self._info)
+        self._info.key1 = tuple([x for x in range(len(self._info.key1))])
+        self._info.key2 = tuple([x for x in range(len(self._info.key2))])
         operator._keys1 = self._info.key1
         operator._keys2 = self._info.key2
         self._info.operator = operator
-        self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info.types = types
+        self._info.types = _createArrayTypeInfo()
         self._info.name = "PythonCoGroup"
         self._env._sets.append(self._info)
         return new_set
@@ -679,7 +739,19 @@ class JoinOperatorWhere(object):
         :return:An incomplete Join transformation.
 
         """
-        self._info.key1 = fields
+        f = None
+        if isinstance(fields[0], TYPES.FunctionType):
+            f = lambda x: (fields[0](x),)
+        if isinstance(fields[0], KeySelectorFunction):
+            f = lambda x: (fields[0].get_key(x),)
+        if f is None:
+            f = lambda x: tuple([x[key] for key in fields])
+
+        new_parent_set = self._info.parent_set.map(lambda x: (f(x), x))
+        new_parent_set._info.types = _createKeyValueTypeInfo(len(fields))
+        self._info.parent = new_parent_set._info
+        self._info.parent.children.append(self._info)
+        self._info.key1 = tuple([x for x in range(len(fields))])
         return JoinOperatorTo(self._env, self._info)
 
 
@@ -698,81 +770,115 @@ class JoinOperatorTo(object):
         :param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys.
         :return:An incomplete Join Transformation.
         """
-        self._info.key2 = fields
+        f = None
+        if isinstance(fields[0], TYPES.FunctionType):
+            f = lambda x: (fields[0](x),)
+        if isinstance(fields[0], KeySelectorFunction):
+            f = lambda x: (fields[0].get_key(x),)
+        if f is None:
+            f = lambda x: tuple([x[key] for key in fields])
+
+        new_other_set = self._info.other_set.map(lambda x: (f(x), x))
+        new_other_set._info.types = _createKeyValueTypeInfo(len(fields))
+        self._info.other = new_other_set._info
+        self._info.other.children.append(self._info)
+        self._info.key2 = tuple([x for x in range(len(fields))])
+        self._env._sets.append(self._info)
         return JoinOperator(self._env, self._info)
 
 
-class JoinOperatorProjection(DataSet):
+class Projector(DataSet):
     def __init__(self, env, info):
-        super(JoinOperatorProjection, self).__init__(env, info)
+        super(Projector, self).__init__(env, info)
 
     def project_first(self, *fields):
         """
-        Initiates a ProjectJoin transformation.
+        Initiates a Project transformation.
 
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
+        Projects the first input.
+        If the first input is a Tuple DataSet, fields can be selected by their index.
+        If the first input is not a Tuple DataSet, no parameters should be passed.
 
         :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
+        :return: An incomplete Projection.
         """
-        self._info.projections.append(("first", fields))
+        for field in fields:
+            self._info.projections.append((0, field))
+        self._info.operator.map = lambda x : tuple([x[side][index] for side, index in self._info.projections])
         return self
 
     def project_second(self, *fields):
         """
-        Initiates a ProjectJoin transformation.
+        Initiates a Project transformation.
 
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
+        Projects the second input.
+        If the second input is a Tuple DataSet, fields can be selected by their index.
+        If the second input is not a Tuple DataSet, no parameters should be passed.
 
         :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
+        :return: An incomplete Projection.
         """
-        self._info.projections.append(("second", fields))
+        for field in fields:
+            self._info.projections.append((1, field))
+        self._info.operator.map = lambda x : tuple([x[side][index] for side, index in self._info.projections])
         return self
 
 
-class JoinOperator(DataSet):
-    def __init__(self, env, info):
-        super(JoinOperator, self).__init__(env, info)
+class Projectable:
+    def __init__(self):
+        pass
 
     def project_first(self, *fields):
         """
-        Initiates a ProjectJoin transformation.
+        Initiates a Project transformation.
 
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
+        Projects the first input.
+        If the first input is a Tuple DataSet, fields can be selected by their index.
+        If the first input is not a Tuple DataSet, no parameters should be passed.
 
         :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
+        :return: An incomplete Projection.
         """
-        return JoinOperatorProjection(self._env, self._info).project_first(*fields)
+        return Projectable._createProjector(self._env, self._info).project_first(*fields)
 
     def project_second(self, *fields):
         """
-        Initiates a ProjectJoin transformation.
+        Initiates a Project transformation.
 
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
+        Projects the second input.
+        If the second input is a Tuple DataSet, fields can be selected by their index.
+        If the second input is not a Tuple DataSet, no parameters should be passed.
 
         :param fields: The indexes of the selected fields.
-        :return: An incomplete JoinProjection.
+        :return: An incomplete Projection.
         """
-        return JoinOperatorProjection(self._env, self._info).project_second(*fields)
+        return Projectable._createProjector(self._env, self._info).project_second(*fields)
+
+    @staticmethod
+    def _createProjector(env, info):
+        child = OperationInfo()
+        child_set = Projector(env, child)
+        child.identifier = _Identifier.MAP
+        child.operator = MapFunction()
+        child.parent = info
+        child.types = _createArrayTypeInfo()
+        child.name = "Projector"
+        info.children.append(child)
+        env._sets.append(child)
+        return child_set
+
 
-    def using(self, operator, types):
+class JoinOperator(DataSet, Projectable):
+    def __init__(self, env, info):
+        super(JoinOperator, self).__init__(env, info)
+
+    def using(self, operator):
         """
         Finalizes a Join transformation.
 
         Applies a JoinFunction to each pair of joined elements. Each JoinFunction call returns exactly one element.
 
         :param operator:The JoinFunction that is called for each pair of joined elements.
-        :param types:
         :return:A Set that represents the joined result DataSet.
         """
         if isinstance(operator, TYPES.FunctionType):
@@ -780,84 +886,23 @@ class JoinOperator(DataSet):
             operator = JoinFunction()
             operator.join = f
         self._info.operator = operator
-        self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info.types = types
+        self._info.types = _createArrayTypeInfo()
         self._info.name = "PythonJoin"
-        self._env._sets.append(self._info)
+        self._info.uses_udf = True
         return OperatorSet(self._env, self._info)
 
 
-class CrossOperatorProjection(DataSet):
-    def __init__(self, env, info):
-        super(CrossOperatorProjection, self).__init__(env, info)
-
-    def project_first(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        self._info.projections.append(("first", fields))
-        return self
-
-    def project_second(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        self._info.projections.append(("second", fields))
-        return self
-
-
-class CrossOperator(DataSet):
+class CrossOperator(DataSet, Projectable):
     def __init__(self, env, info):
         super(CrossOperator, self).__init__(env, info)
 
-    def project_first(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the first join input.
-        If the first join input is a Tuple DataSet, fields can be selected by their index.
-        If the first join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        return CrossOperatorProjection(self._env, self._info).project_first(*fields)
-
-    def project_second(self, *fields):
-        """
-        Initiates a ProjectCross transformation.
-
-        Projects the second join input.
-        If the second join input is a Tuple DataSet, fields can be selected by their index.
-        If the second join input is not a Tuple DataSet, no parameters should be passed.
-
-        :param fields: The indexes of the selected fields.
-        :return: An incomplete CrossProjection.
-        """
-        return CrossOperatorProjection(self._env, self._info).project_second(*fields)
-
-    def using(self, operator, types):
+    def using(self, operator):
         """
         Finalizes a Cross transformation.
 
         Applies a CrossFunction to each pair of crossed elements. Each CrossFunction call returns exactly one element.
 
         :param operator:The CrossFunction that is called for each pair of crossed elements.
-        :param types: The type of the resulting DataSet.
         :return:A Set that represents the crossed result DataSet.
         """
         if isinstance(operator, TYPES.FunctionType):
@@ -865,7 +910,7 @@ class CrossOperator(DataSet):
             operator = CrossFunction()
             operator.cross = f
         self._info.operator = operator
-        self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info.types = types
+        self._info.types = _createArrayTypeInfo()
         self._info.name = "PythonCross"
+        self._info.uses_udf = True
         return OperatorSet(self._env, self._info)
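
As an aside on the Projector/Projectable change above, a minimal standalone sketch (plain Python, no Flink runtime; the sample joined_pair is invented) of how project_first()/project_second() are now realized as a single map over (first, second) pairs driven by the (side, index) projection list:

    # projection list as built by project_first(0).project_second(1, 2)
    projections = [(0, 0), (1, 1), (1, 2)]

    # the map function the Projector installs: pick fields from either side of the pair
    project = lambda x: tuple([x[side][index] for side, index in projections])

    joined_pair = (("a", 1), ("x", "y", "z"))  # (element of first input, element of second input)
    print(project(joined_pair))                # ('a', 'y', 'z')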


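Similarly, a small hedged sketch (plain Python again, records is made-up sample data) of the rewrite UnsortedGrouping._finalize() performs: group_by(...) now prepends an identity map whose map function is swapped for a key extractor, so downstream operators receive (key-tuple, value) pairs and the grouping keys are rewritten to positions inside that key tuple:

    def make_key_extractor(keys):
        # key-selector function vs. plain tuple-field positions, mirroring _finalize()
        if callable(keys[0]):
            return lambda x: (keys[0](x),)
        return lambda x: tuple([x[key] for key in keys])

    records = [("a", 1), ("b", 2), ("a", 3)]
    extract = make_key_extractor((0,))
    keyed = [(extract(r), r) for r in records]   # what the injected map now emits
    print(keyed)  # [(('a',), ('a', 1)), (('b',), ('b', 2)), (('a',), ('a', 3))]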