flink-commits mailing list archives

From ches...@apache.org
Subject [8/8] flink git commit: [FLINK-2901] Move PythonAPI to flink-libraries
Date Thu, 12 Nov 2015 20:09:23 GMT
[FLINK-2901] Move PythonAPI to flink-libraries

This closes #1257


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/824074aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/824074aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/824074aa

Branch: refs/heads/master
Commit: 824074aac620a940d33cb484f97e16c640d5d6b7
Parents: 2063fa1
Author: zentol <s.motsu@web.de>
Authored: Thu Nov 12 19:04:42 2015 +0100
Committer: zentol <s.motsu@web.de>
Committed: Thu Nov 12 19:04:42 2015 +0100

----------------------------------------------------------------------
 flink-dist/src/main/assemblies/bin.xml          |   2 +-
 flink-libraries/flink-python/pom.xml            |  81 ++
 .../flink/python/api/PythonOperationInfo.java   | 344 +++++++
 .../flink/python/api/PythonPlanBinder.java      | 750 +++++++++++++++
 .../python/api/functions/PythonCoGroup.java     |  78 ++
 .../api/functions/PythonCombineIdentity.java    |  79 ++
 .../api/functions/PythonMapPartition.java       |  70 ++
 .../python/api/streaming/PythonStreamer.java    | 374 ++++++++
 .../flink/python/api/streaming/Receiver.java    | 368 ++++++++
 .../flink/python/api/streaming/Sender.java      | 411 +++++++++
 .../python/api/streaming/StreamPrinter.java     |  55 ++
 .../org/apache/flink/python/api/__init__.py     |  17 +
 .../apache/flink/python/api/flink/__init__.py   |  17 +
 .../python/api/flink/connection/Collector.py    | 157 ++++
 .../python/api/flink/connection/Connection.py   | 166 ++++
 .../python/api/flink/connection/Constants.py    |  31 +
 .../python/api/flink/connection/Iterator.py     | 327 +++++++
 .../python/api/flink/connection/__init__.py     |  17 +
 .../python/api/flink/example/TPCHQuery10.py     | 115 +++
 .../python/api/flink/example/TPCHQuery3.py      | 106 +++
 .../api/flink/example/TriangleEnumeration.py    | 152 ++++
 .../python/api/flink/example/WebLogAnalysis.py  |  87 ++
 .../flink/python/api/flink/example/WordCount.py |  61 ++
 .../flink/python/api/flink/example/__init__.py  |  17 +
 .../api/flink/functions/CoGroupFunction.py      |  53 ++
 .../python/api/flink/functions/CrossFunction.py |  34 +
 .../api/flink/functions/FilterFunction.py       |  38 +
 .../api/flink/functions/FlatMapFunction.py      |  44 +
 .../python/api/flink/functions/Function.py      |  83 ++
 .../api/flink/functions/GroupReduceFunction.py  | 127 +++
 .../python/api/flink/functions/JoinFunction.py  |  33 +
 .../python/api/flink/functions/MapFunction.py   |  36 +
 .../api/flink/functions/MapPartitionFunction.py |  34 +
 .../api/flink/functions/ReduceFunction.py       | 123 +++
 .../api/flink/functions/RuntimeContext.py       |  30 +
 .../python/api/flink/functions/__init__.py      |  17 +
 .../flink/python/api/flink/plan/Constants.py    | 106 +++
 .../flink/python/api/flink/plan/DataSet.py      | 906 +++++++++++++++++++
 .../flink/python/api/flink/plan/Environment.py  | 345 +++++++
 .../flink/python/api/flink/plan/__init__.py     |  17 +
 .../python/api/flink/utilities/__init__.py      |  36 +
 .../python/org/apache/flink/python/api/setup.py |  33 +
 .../flink/python/api/PythonPlanBinderTest.java  |  89 ++
 .../python/org/apache/flink/python/api/data_csv |   2 +
 .../org/apache/flink/python/api/data_text       |   2 +
 .../org/apache/flink/python/api/test_csv.py     |  31 +
 .../org/apache/flink/python/api/test_main.py    | 264 ++++++
 .../org/apache/flink/python/api/test_text.py    |  30 +
 .../flink/python/api/test_type_deduction.py     |  63 ++
 .../org/apache/flink/python/api/test_types.py   |  70 ++
 flink-libraries/pom.xml                         |   1 +
 .../flink-language-binding-generic/pom.xml      |  61 --
 .../api/java/common/OperationInfo.java          | 242 -----
 .../api/java/common/PlanBinder.java             | 582 ------------
 .../api/java/common/streaming/Receiver.java     | 360 --------
 .../api/java/common/streaming/Sender.java       | 411 ---------
 .../java/common/streaming/StreamPrinter.java    |  55 --
 .../api/java/common/streaming/Streamer.java     | 258 ------
 .../flink-language-binding/flink-python/pom.xml |  86 --
 .../api/java/python/PythonPlanBinder.java       | 442 ---------
 .../java/python/functions/PythonCoGroup.java    |  78 --
 .../python/functions/PythonCombineIdentity.java |  79 --
 .../python/functions/PythonMapPartition.java    |  70 --
 .../java/python/streaming/PythonStreamer.java   | 167 ----
 .../languagebinding/api/python/__init__.py      |  17 -
 .../api/python/flink/__init__.py                |  17 -
 .../api/python/flink/connection/Collector.py    | 157 ----
 .../api/python/flink/connection/Connection.py   | 166 ----
 .../api/python/flink/connection/Constants.py    |  31 -
 .../api/python/flink/connection/Iterator.py     | 327 -------
 .../api/python/flink/connection/__init__.py     |  17 -
 .../api/python/flink/example/TPCHQuery10.py     | 115 ---
 .../api/python/flink/example/TPCHQuery3.py      | 106 ---
 .../python/flink/example/TriangleEnumeration.py | 152 ----
 .../api/python/flink/example/WebLogAnalysis.py  |  87 --
 .../api/python/flink/example/WordCount.py       |  61 --
 .../api/python/flink/example/__init__.py        |  17 -
 .../python/flink/functions/CoGroupFunction.py   |  53 --
 .../api/python/flink/functions/CrossFunction.py |  34 -
 .../python/flink/functions/FilterFunction.py    |  38 -
 .../python/flink/functions/FlatMapFunction.py   |  44 -
 .../api/python/flink/functions/Function.py      |  83 --
 .../flink/functions/GroupReduceFunction.py      | 127 ---
 .../api/python/flink/functions/JoinFunction.py  |  33 -
 .../api/python/flink/functions/MapFunction.py   |  36 -
 .../flink/functions/MapPartitionFunction.py     |  34 -
 .../python/flink/functions/ReduceFunction.py    | 123 ---
 .../python/flink/functions/RuntimeContext.py    |  30 -
 .../api/python/flink/functions/__init__.py      |  17 -
 .../api/python/flink/plan/Constants.py          | 106 ---
 .../api/python/flink/plan/DataSet.py            | 906 -------------------
 .../api/python/flink/plan/Environment.py        | 345 -------
 .../api/python/flink/plan/__init__.py           |  17 -
 .../api/python/flink/utilities/__init__.py      |  36 -
 .../flink/languagebinding/api/python/setup.py   |  33 -
 .../api/java/python/PythonPlanBinderTest.java   |  89 --
 .../api/python/flink/test/data_csv              |   2 -
 .../api/python/flink/test/data_text             |   2 -
 .../api/python/flink/test/test_csv.py           |  31 -
 .../api/python/flink/test/test_main.py          | 264 ------
 .../api/python/flink/test/test_text.py          |  30 -
 .../python/flink/test/test_type_deduction.py    |  63 --
 .../api/python/flink/test/test_types.py         |  70 --
 flink-staging/flink-language-binding/pom.xml    |  40 -
 flink-staging/pom.xml                           |   1 -
 pom.xml                                         |   4 +-
 106 files changed, 6530 insertions(+), 6851 deletions(-)
----------------------------------------------------------------------
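
For code that depended on the staging module, the visible change is the package relocation; a minimal before/after sketch (old and new class locations taken from the file listing above):

// Before this commit (flink-staging):
//   import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;
// After this commit (flink-libraries):
import org.apache.flink.python.api.PythonPlanBinder;

public class ImportRelocationExample {
    public static void main(String[] args) {
        // Printing the class name confirms the new package resolves.
        System.out.println(PythonPlanBinder.class.getName());
    }
}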


http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index fdfff9c..602af68 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -170,7 +170,7 @@ under the License.
 
 		<!-- copy python package -->
 		<fileSet>
-			<directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory>
+			<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
 			<outputDirectory>resources/python</outputDirectory>
 			<fileMode>0755</fileMode>
 		</fileSet>

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml
new file mode 100644
index 0000000..747ff40
--- /dev/null
+++ b/flink-libraries/flink-python/pom.xml
@@ -0,0 +1,81 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-python</artifactId>
+    <name>flink-python</name>
+    <packaging>jar</packaging>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-optimizer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
new file mode 100644
index 0000000..0ccf568
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.python.api.PythonPlanBinder.Operation;
+import org.apache.flink.python.api.streaming.Receiver;
+
+public class PythonOperationInfo {
+	public int parentID; //DataSet that an operation is applied on
+	public int otherID; //secondary DataSet
+	public int setID; //ID for new DataSet
+	public String[] keys;
+	public String[] keys1; //join/cogroup keys
+	public String[] keys2; //join/cogroup keys
+	public TypeInformation<?> types; //typeinformation about output type
+	public AggregationEntry[] aggregates;
+	public ProjectionEntry[] projections; //projectFirst/projectSecond
+	public boolean combine;
+	public Object[] values;
+	public int count;
+	public int field;
+	public int[] fields;
+	public Order order;
+	public String path;
+	public String fieldDelimiter;
+	public String lineDelimiter;
+	public long from;
+	public long to;
+	public WriteMode writeMode;
+	public boolean toError;
+	public String name;
+
+	public PythonOperationInfo(Receiver receiver, Operation identifier) throws IOException {
+		Object tmpType;
+		switch (identifier) {
+			case SOURCE_CSV:
+				setID = (Integer) receiver.getRecord(true);
+				path = (String) receiver.getRecord();
+				fieldDelimiter = (String) receiver.getRecord();
+				lineDelimiter = (String) receiver.getRecord();
+				tmpType = (Tuple) receiver.getRecord();
+				types = tmpType == null ? null : getForObject(tmpType);
+				return;
+			case SOURCE_TEXT:
+				setID = (Integer) receiver.getRecord(true);
+				path = (String) receiver.getRecord();
+				return;
+			case SOURCE_VALUE:
+				setID = (Integer) receiver.getRecord(true);
+				int valueCount = (Integer) receiver.getRecord(true);
+				values = new Object[valueCount];
+				for (int x = 0; x < valueCount; x++) {
+					values[x] = receiver.getRecord();
+				}
+				return;
+			case SOURCE_SEQ:
+				setID = (Integer) receiver.getRecord(true);
+				from = (Long) receiver.getRecord();
+				to = (Long) receiver.getRecord();
+				return;
+			case SINK_CSV:
+				parentID = (Integer) receiver.getRecord(true);
+				path = (String) receiver.getRecord();
+				fieldDelimiter = (String) receiver.getRecord();
+				lineDelimiter = (String) receiver.getRecord();
+				writeMode = ((Integer) receiver.getRecord(true)) == 1
+						? WriteMode.OVERWRITE
+						: WriteMode.NO_OVERWRITE;
+				return;
+			case SINK_TEXT:
+				parentID = (Integer) receiver.getRecord(true);
+				path = (String) receiver.getRecord();
+				writeMode = ((Integer) receiver.getRecord(true)) == 1
+						? WriteMode.OVERWRITE
+						: WriteMode.NO_OVERWRITE;
+				return;
+			case SINK_PRINT:
+				parentID = (Integer) receiver.getRecord(true);
+				toError = (Boolean) receiver.getRecord();
+				return;
+			case BROADCAST:
+				parentID = (Integer) receiver.getRecord(true);
+				otherID = (Integer) receiver.getRecord(true);
+				name = (String) receiver.getRecord();
+				return;
+		}
+		setID = (Integer) receiver.getRecord(true);
+		parentID = (Integer) receiver.getRecord(true);
+		switch (identifier) {
+			case AGGREGATE:
+				count = (Integer) receiver.getRecord(true);
+				aggregates = new AggregationEntry[count];
+				for (int x = 0; x < count; x++) {
+					int encodedAgg = (Integer) receiver.getRecord(true);
+					int field = (Integer) receiver.getRecord(true);
+					aggregates[x] = new AggregationEntry(encodedAgg, field);
+				}
+				return;
+			case FIRST:
+				count = (Integer) receiver.getRecord(true);
+				return;
+			case DISTINCT:
+			case GROUPBY:
+			case PARTITION_HASH:
+				keys = normalizeKeys(receiver.getRecord(true));
+				return;
+			case PROJECTION:
+				fields = toIntArray(receiver.getRecord(true));
+				return;
+			case REBALANCE:
+				return;
+			case SORT:
+				field = (Integer) receiver.getRecord(true);
+				int encodedOrder = (Integer) receiver.getRecord(true);
+				switch (encodedOrder) {
+					case 0:
+						order = Order.NONE;
+						break;
+					case 1:
+						order = Order.ASCENDING;
+						break;
+					case 2:
+						order = Order.DESCENDING;
+						break;
+					case 3:
+						order = Order.ANY;
+						break;
+					default:
+						order = Order.NONE;
+						break;
+				}
+				return;
+			case UNION:
+				otherID = (Integer) receiver.getRecord(true);
+				return;
+			case COGROUP:
+				otherID = (Integer) receiver.getRecord(true);
+				keys1 = normalizeKeys(receiver.getRecord(true));
+				keys2 = normalizeKeys(receiver.getRecord(true));
+				tmpType = receiver.getRecord();
+				types = tmpType == null ? null : getForObject(tmpType);
+				name = (String) receiver.getRecord();
+				return;
+			case CROSS:
+			case CROSS_H:
+			case CROSS_T:
+				otherID = (Integer) receiver.getRecord(true);
+				tmpType = receiver.getRecord();
+				types = tmpType == null ? null : getForObject(tmpType);
+				int cProjectCount = (Integer) receiver.getRecord(true);
+				projections = new ProjectionEntry[cProjectCount];
+				for (int x = 0; x < cProjectCount; x++) {
+					String side = (String) receiver.getRecord();
+					int[] keys = toIntArray((Tuple) receiver.getRecord(true));
+					projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
+				}
+				name = (String) receiver.getRecord();
+				return;
+			case REDUCE:
+			case GROUPREDUCE:
+				tmpType = receiver.getRecord();
+				types = tmpType == null ? null : getForObject(tmpType);
+				combine = (Boolean) receiver.getRecord();
+				name = (String) receiver.getRecord();
+				return;
+			case JOIN:
+			case JOIN_H:
+			case JOIN_T:
+				keys1 = normalizeKeys(receiver.getRecord(true));
+				keys2 = normalizeKeys(receiver.getRecord(true));
+				otherID = (Integer) receiver.getRecord(true);
+				tmpType = receiver.getRecord();
+				types = tmpType == null ? null : getForObject(tmpType);
+				int jProjectCount = (Integer) receiver.getRecord(true);
+				projections = new ProjectionEntry[jProjectCount];
+				for (int x = 0; x < jProjectCount; x++) {
+					String side = (String) receiver.getRecord();
+					int[] keys = toIntArray((Tuple) receiver.getRecord(true));
+					projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
+				}
+				name = (String) receiver.getRecord();
+				return;
+			case MAPPARTITION:
+			case FLATMAP:
+			case MAP:
+			case FILTER:
+				tmpType = receiver.getRecord();
+				types = tmpType == null ? null : getForObject(tmpType);
+				name = (String) receiver.getRecord();
+				return;
+			default:
+				throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier);
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("SetID: ").append(setID).append("\n");
+		sb.append("ParentID: ").append(parentID).append("\n");
+		sb.append("OtherID: ").append(otherID).append("\n");
+		sb.append("Name: ").append(name).append("\n");
+		sb.append("Types: ").append(types).append("\n");
+		sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
+		sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
+		sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
+		sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n");
+		sb.append("Projections: ").append(Arrays.toString(projections)).append("\n");
+		sb.append("Combine: ").append(combine).append("\n");
+		sb.append("Count: ").append(count).append("\n");
+		sb.append("Field: ").append(field).append("\n");
+		sb.append("Order: ").append(order.toString()).append("\n");
+		sb.append("Path: ").append(path).append("\n");
+		sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n");
+		sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
+		sb.append("From: ").append(from).append("\n");
+		sb.append("To: ").append(to).append("\n");
+		sb.append("WriteMode: ").append(writeMode).append("\n");
+		sb.append("toError: ").append(toError).append("\n");
+		return sb.toString();
+	}
+
+	public static class AggregationEntry {
+		public Aggregations agg;
+		public int field;
+
+		public AggregationEntry(int encodedAgg, int field) {
+			switch (encodedAgg) {
+				case 0:
+					agg = Aggregations.MAX;
+					break;
+				case 1:
+					agg = Aggregations.MIN;
+					break;
+				case 2:
+					agg = Aggregations.SUM;
+					break;
+			}
+			this.field = field;
+		}
+
+		@Override
+		public String toString() {
+			return agg.toString() + " - " + field;
+		}
+	}
+
+	public static class ProjectionEntry {
+		public ProjectionSide side;
+		public int[] keys;
+
+		public ProjectionEntry(ProjectionSide side, int[] keys) {
+			this.side = side;
+			this.keys = keys;
+		}
+
+		@Override
+		public String toString() {
+			return side + " - " + Arrays.toString(keys);
+		}
+	}
+
+	public enum ProjectionSide {
+		FIRST,
+		SECOND
+	}
+
+	public enum DatasizeHint {
+		NONE,
+		TINY,
+		HUGE
+	}
+
+	//====Utility=======================================================================================================
+	private static String[] normalizeKeys(Object keys) {
+		if (keys instanceof Tuple) {
+			Tuple tupleKeys = (Tuple) keys;
+			if (tupleKeys.getArity() == 0) {
+				return new String[0];
+			}
+			if (tupleKeys.getField(0) instanceof Integer) {
+				String[] stringKeys = new String[tupleKeys.getArity()];
+				for (int x = 0; x < stringKeys.length; x++) {
+					stringKeys[x] = "f" + (Integer) tupleKeys.getField(x);
+				}
+				return stringKeys;
+			}
+			if (tupleKeys.getField(0) instanceof String) {
+				return tupleToStringArray(tupleKeys);
+			}
+			throw new RuntimeException("Key argument contains field that is neither an int nor a String.");
+		}
+		if (keys instanceof int[]) {
+			int[] intKeys = (int[]) keys;
+			String[] stringKeys = new String[intKeys.length];
+			for (int x = 0; x < stringKeys.length; x++) {
+				stringKeys[x] = "f" + intKeys[x];
+			}
+			return stringKeys;
+		}
+		throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
+	}
+
+	private static int[] toIntArray(Object key) {
+		if (key instanceof Tuple) {
+			Tuple tuple = (Tuple) key;
+			int[] keys = new int[tuple.getArity()];
+			for (int y = 0; y < tuple.getArity(); y++) {
+				keys[y] = (Integer) tuple.getField(y);
+			}
+			return keys;
+		}
+		if (key instanceof int[]) {
+			return (int[]) key;
+		}
+		throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
+	}
+
+	private static String[] tupleToStringArray(Tuple tuple) {
+		String[] keys = new String[tuple.getArity()];
+		for (int y = 0; y < tuple.getArity(); y++) {
+			keys[y] = (String) tuple.getField(y);
+		}
+		return keys;
+	}
+}
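
As a worked illustration of the key handling above: integer keys coming from the Python side are rewritten into Flink tuple-field expressions. A standalone sketch mirroring the int[] branch of normalizeKeys (illustrative copy, not part of the commit):

public class KeyNormalizationExample {
    public static void main(String[] args) {
        // Each integer position index becomes an "fN" tuple-field expression,
        // matching what PythonOperationInfo.normalizeKeys produces for int[].
        int[] intKeys = {0, 2};
        String[] stringKeys = new String[intKeys.length];
        for (int x = 0; x < stringKeys.length; x++) {
            stringKeys[x] = "f" + intKeys[x]; // 0 -> "f0", 2 -> "f2"
        }
        System.out.println(java.util.Arrays.toString(stringKeys)); // [f0, f2]
    }
}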

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
new file mode 100644
index 0000000..6c0a375
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -0,0 +1,750 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.PrintingOutputFormat;
+import org.apache.flink.api.java.operators.AggregateOperator;
+import org.apache.flink.api.java.operators.CoGroupRawOperator;
+import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
+import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
+import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.SortedGrouping;
+import org.apache.flink.api.java.operators.UdfOperator;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint;
+import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
+import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
+import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;
+import org.apache.flink.python.api.PythonOperationInfo.ProjectionEntry;
+import org.apache.flink.python.api.functions.PythonCoGroup;
+import org.apache.flink.python.api.functions.PythonCombineIdentity;
+import org.apache.flink.python.api.functions.PythonMapPartition;
+import org.apache.flink.python.api.streaming.Receiver;
+import org.apache.flink.python.api.streaming.StreamPrinter;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class allows the execution of a Flink plan written in python.
+ */
+public class PythonPlanBinder {
+	static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
+
+	public static final String ARGUMENT_PYTHON_2 = "2";
+	public static final String ARGUMENT_PYTHON_3 = "3";
+
+	public static final String FLINK_PYTHON_DC_ID = "flink";
+	public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
+
+	public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
+	public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
+	public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
+	public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
+	public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+	public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+
+	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
+	private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
+	private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
+	private static String FULL_PATH;
+
+	public static StringBuilder arguments = new StringBuilder();
+
+	private Process process;
+
+	public static boolean usePython3 = false;
+
+	private static String FLINK_HDFS_PATH = "hdfs:/tmp";
+	public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data";
+
+	public static boolean DEBUG = false;
+
+	private HashMap<Integer, Object> sets = new HashMap();
+	public ExecutionEnvironment env;
+	private Receiver receiver;
+
+	public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
+
+	/**
+	 * Entry point for the execution of a python plan.
+	 *
+	 * @param args planPath[ package1[ packageX[ - parameter1[ parameterX]]]]
+	 * @throws Exception
+	 */
+	public static void main(String[] args) throws Exception {
+		if (args.length < 2) {
+			System.out.println("Usage: ./bin/pyflink<2/3>.sh <pathToScript>[ <pathToPackage1>[ <pathToPackageX>]][ - <parameter1>[ <parameterX>]]");
+			return;
+		}
+		usePython3 = args[0].equals(ARGUMENT_PYTHON_3);
+		PythonPlanBinder binder = new PythonPlanBinder();
+		binder.runPlan(Arrays.copyOfRange(args, 1, args.length));
+	}
+
+	public PythonPlanBinder() throws IOException {
+		FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+		FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+		FULL_PATH = FLINK_DIR != null
+				? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH //command-line
+				: FileSystem.getLocalFileSystem().getWorkingDirectory().toString() //testing
+				+ "/src/main/python/org/apache/flink/python/api";
+	}
+
+	private void runPlan(String[] args) throws Exception {
+		env = ExecutionEnvironment.getExecutionEnvironment();
+
+		int split = 0;
+		for (int x = 0; x < args.length; x++) {
+			if (args[x].compareTo("-") == 0) {
+				split = x;
+			}
+		}
+
+		try {
+			prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
+			startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
+			receivePlan();
+
+			if (env instanceof LocalEnvironment) {
+				FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink";
+			}
+
+			distributeFiles(env);
+			env.execute();
+			close();
+		} catch (Exception e) {
+			close();
+			throw e;
+		}
+	}
+
+	//=====Setup========================================================================================================
+	/**
+	 * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big
+	 * package, and resolves PYTHONPATH issues.
+	 *
+	 * @param filePaths
+	 * @throws IOException
+	 * @throws URISyntaxException
+	 */
+	private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
+		//Flink python package
+		String tempFilePath = FLINK_PYTHON_FILE_PATH;
+		clearPath(tempFilePath);
+		FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false);
+
+		//plan file		
+		copyFile(filePaths[0], FLINK_PYTHON_PLAN_NAME);
+
+		//additional files/folders
+		for (int x = 1; x < filePaths.length; x++) {
+			copyFile(filePaths[x], null);
+		}
+	}
+
+	private static void clearPath(String path) throws IOException, URISyntaxException {
+		FileSystem fs = FileSystem.get(new URI(path));
+		if (fs.exists(new Path(path))) {
+			fs.delete(new Path(path), true);
+		}
+	}
+
+	private static void copyFile(String path, String name) throws IOException, URISyntaxException {
+		if (path.endsWith("/")) {
+			path = path.substring(0, path.length() - 1);
+		}
+		String identifier = name == null ? path.substring(path.lastIndexOf("/")) : name;
+		String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
+		clearPath(tmpFilePath);
+		Path p = new Path(path);
+		FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true);
+	}
+
+	private static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException {
+		clearPath(FLINK_HDFS_PATH);
+		FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true);
+		env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
+		clearPath(FLINK_PYTHON_FILE_PATH);
+	}
+
+	private void startPython(String[] args) throws IOException {
+		for (String arg : args) {
+			arguments.append(" ").append(arg);
+		}
+		receiver = new Receiver(null);
+		receiver.open(FLINK_TMP_DATA_DIR + "/output");
+
+		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+		try {
+			Runtime.getRuntime().exec(pythonBinaryPath);
+		} catch (IOException ex) {
+			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
+		}
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + arguments.toString());
+
+		new StreamPrinter(process.getInputStream()).start();
+		new StreamPrinter(process.getErrorStream()).start();
+
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException ex) {
+		}
+
+		try {
+			int value = process.exitValue();
+			if (value != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
+		} catch (IllegalThreadStateException ise) {//Process still running
+		}
+
+		process.getOutputStream().write("plan\n".getBytes());
+		process.getOutputStream().write((FLINK_TMP_DATA_DIR + "/output\n").getBytes());
+		process.getOutputStream().flush();
+	}
+
+	private void close() {
+		try { //prevent throwing exception so that previous exceptions aren't hidden.
+			if (!DEBUG) {
+				FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
+				hdfs.delete(new Path(FLINK_HDFS_PATH), true);
+			}
+
+			FileSystem local = FileSystem.getLocalFileSystem();
+			local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
+			local.delete(new Path(FLINK_TMP_DATA_DIR), true);
+			receiver.close();
+		} catch (NullPointerException npe) {
+		} catch (IOException ioe) {
+			LOG.error("PythonAPI file cleanup failed. " + ioe.getMessage());
+		} catch (URISyntaxException use) { // can't occur
+		}
+		try {
+			process.exitValue();
+		} catch (NullPointerException npe) { //exception occurred before process was started
+		} catch (IllegalThreadStateException ise) { //process still active
+			process.destroy();
+		}
+	}
+
+	//====Plan==========================================================================================================
+	private void receivePlan() throws IOException {
+		receiveParameters();
+		receiveOperations();
+	}
+
+	//====Environment===================================================================================================
+	/**
+	 * This enum contains the identifiers for all supported environment parameters.
+	 */
+	private enum Parameters {
+		DOP,
+		MODE,
+		RETRY,
+		DEBUG
+	}
+
+	private void receiveParameters() throws IOException {
+		Integer parameterCount = (Integer) receiver.getRecord(true);
+
+		for (int x = 0; x < parameterCount; x++) {
+			Tuple value = (Tuple) receiver.getRecord(true);
+			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
+				case DOP:
+					Integer dop = (Integer) value.getField(1);
+					env.setParallelism(dop);
+					break;
+				case MODE:
+					FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
+					break;
+				case RETRY:
+					int retry = (Integer) value.getField(1);
+					env.setNumberOfExecutionRetries(retry);
+					break;
+				case DEBUG:
+					DEBUG = (Boolean) value.getField(1);
+					break;
+			}
+		}
+		if (env.getParallelism() < 0) {
+			env.setParallelism(1);
+		}
+	}
+
+	//====Operations====================================================================================================
+	/**
+	 * This enum contains the identifiers for all supported DataSet operations.
+	 */
+	protected enum Operation {
+		SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT,
+		PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
+		REBALANCE, PARTITION_HASH,
+		BROADCAST,
+		COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION
+	}
+
+	private void receiveOperations() throws IOException {
+		Integer operationCount = (Integer) receiver.getRecord(true);
+		for (int x = 0; x < operationCount; x++) {
+			String identifier = (String) receiver.getRecord();
+			Operation op = null;
+			try {
+				op = Operation.valueOf(identifier.toUpperCase());
+			} catch (IllegalArgumentException iae) {
+				throw new IllegalArgumentException("Invalid operation specified: " + identifier);
+			}
+			if (op != null) {
+				switch (op) {
+					case SOURCE_CSV:
+						createCsvSource(createOperationInfo(op));
+						break;
+					case SOURCE_TEXT:
+						createTextSource(createOperationInfo(op));
+						break;
+					case SOURCE_VALUE:
+						createValueSource(createOperationInfo(op));
+						break;
+					case SOURCE_SEQ:
+						createSequenceSource(createOperationInfo(op));
+						break;
+					case SINK_CSV:
+						createCsvSink(createOperationInfo(op));
+						break;
+					case SINK_TEXT:
+						createTextSink(createOperationInfo(op));
+						break;
+					case SINK_PRINT:
+						createPrintSink(createOperationInfo(op));
+						break;
+					case BROADCAST:
+						createBroadcastVariable(createOperationInfo(op));
+						break;
+					case AGGREGATE:
+						createAggregationOperation(createOperationInfo(op));
+						break;
+					case DISTINCT:
+						createDistinctOperation(createOperationInfo(op));
+						break;
+					case FIRST:
+						createFirstOperation(createOperationInfo(op));
+						break;
+					case PARTITION_HASH:
+						createHashPartitionOperation(createOperationInfo(op));
+						break;
+					case PROJECTION:
+						createProjectOperation(createOperationInfo(op));
+						break;
+					case REBALANCE:
+						createRebalanceOperation(createOperationInfo(op));
+						break;
+					case GROUPBY:
+						createGroupOperation(createOperationInfo(op));
+						break;
+					case SORT:
+						createSortOperation(createOperationInfo(op));
+						break;
+					case UNION:
+						createUnionOperation(createOperationInfo(op));
+						break;
+					case COGROUP:
+						createCoGroupOperation(createOperationInfo(op));
+						break;
+					case CROSS:
+						createCrossOperation(NONE, createOperationInfo(op));
+						break;
+					case CROSS_H:
+						createCrossOperation(HUGE, createOperationInfo(op));
+						break;
+					case CROSS_T:
+						createCrossOperation(TINY, createOperationInfo(op));
+						break;
+					case FILTER:
+						createFilterOperation(createOperationInfo(op));
+						break;
+					case FLATMAP:
+						createFlatMapOperation(createOperationInfo(op));
+						break;
+					case GROUPREDUCE:
+						createGroupReduceOperation(createOperationInfo(op));
+						break;
+					case JOIN:
+						createJoinOperation(NONE, createOperationInfo(op));
+						break;
+					case JOIN_H:
+						createJoinOperation(HUGE, createOperationInfo(op));
+						break;
+					case JOIN_T:
+						createJoinOperation(TINY, createOperationInfo(op));
+						break;
+					case MAP:
+						createMapOperation(createOperationInfo(op));
+						break;
+					case MAPPARTITION:
+						createMapPartitionOperation(createOperationInfo(op));
+						break;
+					case REDUCE:
+						createReduceOperation(createOperationInfo(op));
+						break;
+				}
+			}
+		}
+	}
+
+	/**
+	 * This method creates an OperationInfo object based on the operation-identifier passed.
+	 *
+	 * @param operationIdentifier
+	 * @return
+	 * @throws IOException
+	 */
+	private PythonOperationInfo createOperationInfo(Operation operationIdentifier) throws IOException {
+		return new PythonOperationInfo(receiver, operationIdentifier);
+	}
+
+	private void createCsvSource(PythonOperationInfo info) throws IOException {
+		if (!(info.types instanceof CompositeType)) {
+			throw new RuntimeException("The output type of a csv source has to be a tuple or a "
+					+ "pojo type. The derived type is " + info);
+		}
+
+		sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path),
+				info.lineDelimiter, info.fieldDelimiter, (CompositeType) info.types), info.types)
+				.name("CsvSource"));
+	}
+
+	private void createTextSource(PythonOperationInfo info) throws IOException {
+		sets.put(info.setID, env.readTextFile(info.path).name("TextSource"));
+	}
+
+	private void createValueSource(PythonOperationInfo info) throws IOException {
+		sets.put(info.setID, env.fromElements(info.values).name("ValueSource"));
+	}
+
+	private void createSequenceSource(PythonOperationInfo info) throws IOException {
+		sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource"));
+	}
+
+	private void createCsvSink(PythonOperationInfo info) throws IOException {
+		DataSet parent = (DataSet) sets.get(info.parentID);
+		parent.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink");
+	}
+
+	private void createTextSink(PythonOperationInfo info) throws IOException {
+		DataSet parent = (DataSet) sets.get(info.parentID);
+		parent.writeAsText(info.path, info.writeMode).name("TextSink");
+	}
+
+	private void createPrintSink(PythonOperationInfo info) throws IOException {
+		DataSet parent = (DataSet) sets.get(info.parentID);
+		parent.output(new PrintingOutputFormat(info.toError));
+	}
+
+	private void createBroadcastVariable(PythonOperationInfo info) throws IOException {
+		UdfOperator op1 = (UdfOperator) sets.get(info.parentID);
+		DataSet op2 = (DataSet) sets.get(info.otherID);
+
+		op1.withBroadcastSet(op2, info.name);
+		Configuration c = ((UdfOperator) op1).getParameters();
+
+		if (c == null) {
+			c = new Configuration();
+		}
+
+		int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
+		c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1);
+		c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, info.name);
+
+		op1.withParameters(c);
+	}
+
+	private void createAggregationOperation(PythonOperationInfo info) throws IOException {
+		DataSet op = (DataSet) sets.get(info.parentID);
+		AggregateOperator ao = op.aggregate(info.aggregates[0].agg, info.aggregates[0].field);
+
+		for (int x = 1; x < info.count; x++) {
+			ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field);
+		}
+
+		sets.put(info.setID, ao.name("Aggregation"));
+	}
+
+	private void createDistinctOperation(PythonOperationInfo info) throws IOException {
+		DataSet op = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, info.keys.length == 0 ? op.distinct() : op.distinct(info.keys).name("Distinct"));
+	}
+
+	private void createFirstOperation(PythonOperationInfo info) throws IOException {
+		DataSet op = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op.first(info.count).name("First"));
+	}
+
+	private void createGroupOperation(PythonOperationInfo info) throws IOException {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op1.groupBy(info.keys));
+	}
+
+	private void createHashPartitionOperation(PythonOperationInfo info) throws IOException {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op1.partitionByHash(info.keys));
+	}
+
+	private void createProjectOperation(PythonOperationInfo info) throws IOException {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op1.project(info.fields).name("Projection"));
+	}
+
+	private void createRebalanceOperation(PythonOperationInfo info) throws IOException {
+		DataSet op = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op.rebalance().name("Rebalance"));
+	}
+
+	private void createSortOperation(PythonOperationInfo info) throws IOException {
+		Grouping op1 = (Grouping) sets.get(info.parentID);
+		if (op1 instanceof UnsortedGrouping) {
+			sets.put(info.setID, ((UnsortedGrouping) op1).sortGroup(info.field, info.order));
+			return;
+		}
+		if (op1 instanceof SortedGrouping) {
+			sets.put(info.setID, ((SortedGrouping) op1).sortGroup(info.field, info.order));
+		}
+	}
+
+	private void createUnionOperation(PythonOperationInfo info) throws IOException {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		DataSet op2 = (DataSet) sets.get(info.otherID);
+		sets.put(info.setID, op1.union(op2).name("Union"));
+	}
+
+	private void createCoGroupOperation(PythonOperationInfo info) {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		DataSet op2 = (DataSet) sets.get(info.otherID);
+		sets.put(info.setID, new CoGroupRawOperator(
+				op1,
+				op2,
+				new Keys.ExpressionKeys(info.keys1, op1.getType()),
+				new Keys.ExpressionKeys(info.keys2, op2.getType()),
+				new PythonCoGroup(info.setID, info.types),
+				info.types, info.name));
+	}
+
+	private void createCrossOperation(DatasizeHint mode, PythonOperationInfo info) {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		DataSet op2 = (DataSet) sets.get(info.otherID);
+
+		DefaultCross defaultResult;
+		switch (mode) {
+			case NONE:
+				defaultResult = op1.cross(op2);
+				break;
+			case HUGE:
+				defaultResult = op1.crossWithHuge(op2);
+				break;
+			case TINY:
+				defaultResult = op1.crossWithTiny(op2);
+				break;
+			default:
+				throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
+		}
+		if (info.types != null && (info.projections == null || info.projections.length == 0)) {
+			sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		} else if (info.projections.length == 0) {
+			sets.put(info.setID, defaultResult.name("DefaultCross"));
+		} else {
+			ProjectCross project = null;
+			for (ProjectionEntry pe : info.projections) {
+				switch (pe.side) {
+					case FIRST:
+						project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
+						break;
+					case SECOND:
+						project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
+						break;
+				}
+			}
+			sets.put(info.setID, project.name("ProjectCross"));
+		}
+	}
+
+	private void createFilterOperation(PythonOperationInfo info) {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+	}
+
+	private void createFlatMapOperation(PythonOperationInfo info) {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+	}
+
+	private void createGroupReduceOperation(PythonOperationInfo info) {
+		Object op1 = sets.get(info.parentID);
+		if (op1 instanceof DataSet) {
+			sets.put(info.setID, applyGroupReduceOperation((DataSet) op1, info));
+			return;
+		}
+		if (op1 instanceof UnsortedGrouping) {
+			sets.put(info.setID, applyGroupReduceOperation((UnsortedGrouping) op1, info));
+			return;
+		}
+		if (op1 instanceof SortedGrouping) {
+			sets.put(info.setID, applyGroupReduceOperation((SortedGrouping) op1, info));
+		}
+	}
+
+	private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
+		if (info.combine) {
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
+					.setCombinable(true).name("PythonCombine")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		} else {
+			return op1.reduceGroup(new PythonCombineIdentity())
+					.setCombinable(false).name("PythonGroupReducePreStep")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		}
+	}
+
+	private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
+		if (info.combine) {
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
+					.setCombinable(true).name("PythonCombine")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		} else {
+			return op1.reduceGroup(new PythonCombineIdentity())
+					.setCombinable(false).name("PythonGroupReducePreStep")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		}
+	}
+
+	private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
+		if (info.combine) {
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
+					.setCombinable(true).name("PythonCombine")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		} else {
+			return op1.reduceGroup(new PythonCombineIdentity())
+					.setCombinable(false).name("PythonGroupReducePreStep")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		}
+	}
+
+	private void createJoinOperation(DatasizeHint mode, PythonOperationInfo info) {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		DataSet op2 = (DataSet) sets.get(info.otherID);
+
+		if (info.types != null && (info.projections == null || info.projections.length == 0)) {
+			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode).name("PythonJoinPreStep")
+					.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		} else {
+			DefaultJoin defaultResult = createDefaultJoin(op1, op2, info.keys1, info.keys2, mode);
+			if (info.projections.length == 0) {
+				sets.put(info.setID, defaultResult.name("DefaultJoin"));
+			} else {
+				ProjectJoin project = null;
+				for (ProjectionEntry pe : info.projections) {
+					switch (pe.side) {
+						case FIRST:
+							project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
+							break;
+						case SECOND:
+							project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
+							break;
+					}
+				}
+				sets.put(info.setID, project.name("ProjectJoin"));
+			}
+		}
+	}
+
+	private DefaultJoin createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode) {
+		switch (mode) {
+			case NONE:
+				return op1.join(op2).where(firstKeys).equalTo(secondKeys);
+			case HUGE:
+				return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys);
+			case TINY:
+				return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys);
+			default:
+				throw new IllegalArgumentException("Invalid join mode specified.");
+		}
+	}
+
+	private void createMapOperation(PythonOperationInfo info) {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+	}
+
+	private void createMapPartitionOperation(PythonOperationInfo info) {
+		DataSet op1 = (DataSet) sets.get(info.parentID);
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+	}
+
+	private void createReduceOperation(PythonOperationInfo info) {
+		Object op1 = sets.get(info.parentID);
+		if (op1 instanceof DataSet) {
+			sets.put(info.setID, applyReduceOperation((DataSet) op1, info));
+			return;
+		}
+		if (op1 instanceof UnsortedGrouping) {
+			sets.put(info.setID, applyReduceOperation((UnsortedGrouping) op1, info));
+		}
+	}
+
+	private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
+		return op1.reduceGroup(new PythonCombineIdentity())
+				.setCombinable(false).name("PythonReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types))
+				.name(info.name);
+	}
+
+	private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
+		if (info.combine) {
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
+					.setCombinable(true).name("PythonCombine")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		} else {
+			return op1.reduceGroup(new PythonCombineIdentity())
+					.setCombinable(false).name("PythonReducePreStep")
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
+					.name(info.name);
+		}
+	}
+}
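
The argument layout expected by main() above can also be exercised directly; a hedged sketch, equivalent to what the pyflink shell scripts pass in (all paths are placeholders):

public class RunPythonPlanExample {
    public static void main(String[] args) throws Exception {
        // args[0] selects the Python version ("2" or "3"); then come the plan
        // script, optional package paths, and, after "-", the parameters that
        // are forwarded to the plan, exactly as parsed by runPlan().
        org.apache.flink.python.api.PythonPlanBinder.main(new String[] {
            "2",                 // use the python2 binary
            "/path/to/plan.py",  // placeholder plan script
            "-",                 // separator before plan parameters
            "hdfs:///input",     // placeholder plan parameter
            "hdfs:///output"     // placeholder plan parameter
        });
    }
}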

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
new file mode 100644
index 0000000..4d74878
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.functions;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.util.Collector;
+import java.io.IOException;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * CoGroupFunction that uses a python script.
+ *
+ * @param <IN1>
+ * @param <IN2>
+ * @param <OUT>
+ */
+public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> implements ResultTypeQueryable {
+	private final PythonStreamer streamer;
+	private transient final TypeInformation<OUT> typeInformation;
+
+	public PythonCoGroup(int id, TypeInformation<OUT> typeInformation) {
+		this.typeInformation = typeInformation;
+		streamer = new PythonStreamer(this, id);
+	}
+
+	/**
+	 * Opens this function.
+	 *
+	 * @param config configuration
+	 * @throws IOException
+	 */
+	@Override
+	public void open(Configuration config) throws IOException {
+		streamer.open();
+		streamer.sendBroadCastVariables(config);
+	}
+
+	/**
+	 * Calls the external python function.
+	 *
+	 * @param first
+	 * @param second
+	 * @param out collector
+	 * @throws IOException
+	 */
+	@Override
+	public final void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception {
+		streamer.streamBufferWithGroups(first.iterator(), second.iterator(), out);
+	}
+
+	/**
+	 * Closes this function.
+	 *
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		streamer.close();
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return typeInformation;
+	}
+}
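
Because the Python UDF is opaque to Flink's type extraction, the produced type has to be declared up front via ResultTypeQueryable; a minimal sketch (the String output type is a placeholder assumption):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.python.api.functions.PythonCoGroup;

public class CoGroupTypeExample {
    public static void main(String[] args) {
        // The declared TypeInformation stands in for what reflection cannot
        // derive from the Python side of the operator.
        TypeInformation<String> outType = BasicTypeInfo.STRING_TYPE_INFO;
        PythonCoGroup<String, String, String> coGroup = new PythonCoGroup<>(1, outType);
        System.out.println(coGroup.getProducedType()); // String
    }
}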

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
new file mode 100644
index 0000000..9b3189b
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.functions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.util.Collector;
+import java.io.IOException;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+
+/**
+ * Multi-purpose class, used for combine operations backed by a python script, and as an identity
+ * pre-processing step for GroupReduce operations.
+ *
+ * @param <IN> input/output type
+ */
+public class PythonCombineIdentity<IN> extends RichGroupReduceFunction<IN, IN> {
+	private PythonStreamer streamer;
+
+	public PythonCombineIdentity() {
+		streamer = null;
+	}
+
+	public PythonCombineIdentity(int id) {
+		streamer = new PythonStreamer(this, id);
+	}
+
+	@Override
+	public void open(Configuration config) throws IOException {
+		if (streamer != null) {
+			streamer.open();
+			streamer.sendBroadCastVariables(config);
+		}
+	}
+
+	/**
+	 * Passes all values through unchanged; serves as the identity pre-processing step for GroupReduce operations.
+	 *
+	 * @param values function input
+	 * @param out collector
+	 * @throws IOException
+	 */
+	@Override
+	public final void reduce(Iterable<IN> values, Collector<IN> out) throws Exception {
+		for (IN value : values) {
+			out.collect(value);
+		}
+	}
+
+	/**
+	 * Calls the external python function.
+	 *
+	 * @param values function input
+	 * @param out collector
+	 * @throws IOException
+	 */
+	@Override
+	public final void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
+		streamer.streamBufferWithoutGroups(values.iterator(), out);
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (streamer != null) {
+			streamer.close();
+			streamer = null;
+		}
+	}
+}
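
The two constructors above are the important detail: created without an operator id, the function is a pure identity pass-through whose open() and close() are no-ops and whose reduce() merely forwards values; created with an id, its combine() streams each group through the external python process. A short sketch (CombineSketch and the id 0 are invented for illustration; the variant with an id already wires up a PythonStreamer, so running it needs the PythonPlanBinder runtime):

    import org.apache.flink.python.api.functions.PythonCombineIdentity;

    public class CombineSketch {
        // no id: identity pass-through, no python process is started in open()
        PythonCombineIdentity<byte[]> identityOnly = new PythonCombineIdentity<byte[]>();
        // with an id: combine() is delegated to the external python process
        PythonCombineIdentity<byte[]> withCombiner = new PythonCombineIdentity<byte[]>(0);
    }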

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
new file mode 100644
index 0000000..1578e18
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.functions;
+
+import java.io.IOException;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.util.Collector;
+
+/**
+ * Multi-purpose class, usable by all operations that use a python script with a single input source
+ * and possibly differing in-/output types.
+ *
+ * @param <IN> input type
+ * @param <OUT> output type
+ */
+public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> implements ResultTypeQueryable<OUT> {
+	private final PythonStreamer streamer;
+	private final transient TypeInformation<OUT> typeInformation;
+
+	public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) {
+		this.typeInformation = typeInformation;
+		streamer = new PythonStreamer(this, id);
+	}
+
+	/**
+	 * Opens this function.
+	 *
+	 * @param config configuration
+	 * @throws IOException
+	 */
+	@Override
+	public void open(Configuration config) throws IOException {
+		streamer.open();
+		streamer.sendBroadCastVariables(config);
+	}
+
+	@Override
+	public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
+		streamer.streamBufferWithoutGroups(values.iterator(), out);
+	}
+
+	/**
+	 * Closes this function.
+	 *
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		streamer.close();
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return typeInformation;
+	}
+}
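
Because PythonMapPartition implements ResultTypeQueryable, it can be handed straight to DataSet.mapPartition() and the declared output type is picked up without further hints. A wiring sketch (MapPartitionSketch, the id 7, the sample input, and the String-to-String typing are invented for illustration; running it needs the python runtime that PythonPlanBinder prepares):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.python.api.functions.PythonMapPartition;

    public class MapPartitionSketch {
        static DataSet<String> apply(ExecutionEnvironment env) {
            DataSet<String> input = env.fromElements("a", "b", "c");
            // the declared output type is discovered via getProducedType()
            return input.mapPartition(
                    new PythonMapPartition<String, String>(7, BasicTypeInfo.STRING_TYPE_INFO));
        }
    }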

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
new file mode 100644
index 0000000..9e4f479
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonPlanBinder;
+import static org.apache.flink.python.api.PythonPlanBinder.DEBUG;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
+import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This streamer is used by functions to send/receive data to/from an external python process.
+ */
+public class PythonStreamer implements Serializable {
+	protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
+	private static final int SIGNAL_BUFFER_REQUEST = 0;
+	private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
+	private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
+	private static final int SIGNAL_FINISHED = -1;
+	private static final int SIGNAL_ERROR = -2;
+	private static final byte SIGNAL_LAST = 32;
+
+	private final int id;
+	private final boolean usePython3;
+	private final boolean debug;
+	private final String planArguments;
+
+	private String inputFilePath;
+	private String outputFilePath;
+
+	private final byte[] buffer = new byte[4];
+
+	private Process process;
+	private Thread shutdownThread;
+	protected ServerSocket server;
+	protected Socket socket;
+	protected InputStream in;
+	protected OutputStream out;
+	protected int port;
+
+	protected Sender sender;
+	protected Receiver receiver;
+
+	protected StringBuilder msg = new StringBuilder();
+
+	protected final AbstractRichFunction function;
+
+	public PythonStreamer(AbstractRichFunction function, int id) {
+		this.id = id;
+		this.usePython3 = PythonPlanBinder.usePython3;
+		this.debug = DEBUG;
+		planArguments = PythonPlanBinder.arguments.toString();
+		sender = new Sender(function);
+		receiver = new Receiver(function);
+		this.function = function;
+	}
+
+	/**
+	 * Starts the python script.
+	 *
+	 * @throws IOException
+	 */
+	public void open() throws IOException {
+		server = new ServerSocket(0);
+		startPython();
+	}
+
+	private void startPython() throws IOException {
+		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + "_" + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_output";
+		this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + "_" + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_input";
+
+		sender.open(inputFilePath);
+		receiver.open(outputFilePath);
+
+		String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
+		String planPath = path + FLINK_PYTHON_PLAN_NAME;
+
+		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+		try {
+			Runtime.getRuntime().exec(pythonBinaryPath);
+		} catch (IOException ex) {
+			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.", ex);
+		}
+
+		if (debug) {
+			server.setSoTimeout(0);
+			LOG.info("Waiting for Python process: " + function.getRuntimeContext().getTaskName()
+					+ ". Run: python " + planPath + planArguments);
+		} else {
+			process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
+			new StreamPrinter(process.getInputStream()).start();
+			new StreamPrinter(process.getErrorStream(), true, msg).start();
+		}
+
+		shutdownThread = new Thread() {
+			@Override
+			public void run() {
+				try {
+					destroyProcess();
+				} catch (IOException ex) {
+				}
+			}
+		};
+
+		Runtime.getRuntime().addShutdownHook(shutdownThread);
+
+		OutputStream processOutput = process.getOutputStream();
+		processOutput.write("operator\n".getBytes());
+		processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
+		processOutput.write((id + "\n").getBytes());
+		processOutput.write((inputFilePath + "\n").getBytes());
+		processOutput.write((outputFilePath + "\n").getBytes());
+		processOutput.flush();
+
+		try { // wait a bit to catch syntax errors
+			Thread.sleep(2000);
+		} catch (InterruptedException ex) {
+		}
+		if (!debug) {
+			try {
+				process.exitValue();
+				throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
+			} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
+			}
+		}
+
+		socket = server.accept();
+		in = socket.getInputStream();
+		out = socket.getOutputStream();
+	}
+
+	/**
+	 * Closes this streamer.
+	 *
+	 * @throws IOException
+	 */
+	public void close() throws IOException {
+		try {
+			socket.close();
+			sender.close();
+			receiver.close();
+		} catch (Exception e) {
+			LOG.error("Exception occurred while closing Streamer: " + e.getMessage());
+		}
+		if (!debug) {
+			destroyProcess();
+		}
+		if (shutdownThread != null) {
+			Runtime.getRuntime().removeShutdownHook(shutdownThread);
+		}
+	}
+
+	private void destroyProcess() throws IOException {
+		try {
+			process.exitValue();
+		} catch (IllegalThreadStateException ise) { //process still active
+			if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
+				int pid;
+				try {
+					Field f = process.getClass().getDeclaredField("pid");
+					f.setAccessible(true);
+					pid = f.getInt(process);
+				} catch (Throwable e) {
+					process.destroy();
+					return;
+				}
+				String[] args = new String[]{"kill", "-9", "" + pid};
+				Runtime.getRuntime().exec(args);
+			} else {
+				process.destroy();
+			}
+		}
+	}
+
+	private void sendWriteNotification(int size, boolean hasNext) throws IOException {
+		byte[] tmp = new byte[5];
+		putInt(tmp, 0, size);
+		tmp[4] = hasNext ? 0 : SIGNAL_LAST;
+		out.write(tmp, 0, 5);
+		out.flush();
+	}
+
+	private void sendReadConfirmation() throws IOException {
+		out.write(new byte[1], 0, 1);
+		out.flush();
+	}
+
+	private void checkForError() {
+		if (getInt(buffer, 0) == SIGNAL_ERROR) {
+			try { //wait before terminating to ensure that the complete error message is printed
+				Thread.sleep(2000);
+			} catch (InterruptedException ex) {
+			}
+			throw new RuntimeException(
+					"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
+		}
+	}
+
+	/**
+	 * Sends all broadcast-variables encoded in the configuration to the external process.
+	 *
+	 * @param config configuration object containing broadcast-variable count and names
+	 * @throws IOException
+	 */
+	public final void sendBroadCastVariables(Configuration config) throws IOException {
+		try {
+			int broadcastCount = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
+
+			String[] names = new String[broadcastCount];
+
+			for (int x = 0; x < names.length; x++) {
+				names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
+			}
+
+			in.read(buffer, 0, 4);
+			checkForError();
+			int size = sender.sendRecord(broadcastCount);
+			sendWriteNotification(size, false);
+
+			for (String name : names) {
+				Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
+
+				in.read(buffer, 0, 4);
+				checkForError();
+				size = sender.sendRecord(name);
+				sendWriteNotification(size, false);
+
+				while (bcv.hasNext() || sender.hasRemaining(0)) {
+					in.read(buffer, 0, 4);
+					checkForError();
+					size = sender.sendBuffer(bcv, 0);
+					sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
+				}
+				sender.reset();
+			}
+		} catch (SocketTimeoutException ste) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+
+	/**
+	 * Sends all values contained in the iterator to the external process and collects all results.
+	 *
+	 * @param i iterator
+	 * @param c collector
+	 * @throws IOException
+	 */
+	public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
+		try {
+			int size;
+			if (i.hasNext()) {
+				while (true) {
+					in.read(buffer, 0, 4);
+					int sig = getInt(buffer, 0);
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST:
+							if (i.hasNext() || sender.hasRemaining(0)) {
+								size = sender.sendBuffer(i, 0);
+								sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+							} else {
+								throw new RuntimeException("External process requested data even though none is available.");
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try { //wait before terminating to ensure that the complete error message is printed
+								Thread.sleep(2000);
+							} catch (InterruptedException ex) {
+							}
+							throw new RuntimeException(
+									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ste) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+
+	/**
+	 * Sends all values contained in both iterators to the external process and collects all results.
+	 *
+	 * @param i1 iterator
+	 * @param i2 iterator
+	 * @param c collector
+	 * @throws IOException
+	 */
+	public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
+		try {
+			int size;
+			if (i1.hasNext() || i2.hasNext()) {
+				while (true) {
+					in.read(buffer, 0, 4);
+					int sig = getInt(buffer, 0);
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST_G0:
+							if (i1.hasNext() || sender.hasRemaining(0)) {
+								size = sender.sendBuffer(i1, 0);
+								sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
+							}
+							break;
+						case SIGNAL_BUFFER_REQUEST_G1:
+							if (i2.hasNext() || sender.hasRemaining(1)) {
+								size = sender.sendBuffer(i2, 1);
+								sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try { //wait before terminating to ensure that the complete error message is printed
+								Thread.sleep(2000);
+							} catch (InterruptedException ex) {
+							}
+							throw new RuntimeException(
+									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ste) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+
+	protected static final int getInt(byte[] array, int offset) {
+		return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 | (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
+	}
+
+	protected static final void putInt(byte[] array, int offset, int value) {
+		array[offset] = (byte) (value >> 24);
+		array[offset + 1] = (byte) (value >> 16);
+		array[offset + 2] = (byte) (value >> 8);
+		array[offset + 3] = (byte) (value);
+	}
+
+}
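
The framing used on the socket above is easy to miss: every control message begins with a big-endian 4-byte integer (see getInt()/putInt()), and a write notification appends a fifth byte that is 0 while further buffers follow and SIGNAL_LAST (32) for the final one. A self-contained sketch of that codec (ProtocolSketch is an invented name; the constant mirrors the one defined in PythonStreamer):

    public class ProtocolSketch {
        static final byte SIGNAL_LAST = 32; // matches PythonStreamer.SIGNAL_LAST

        // encode a write notification: big-endian 4-byte size plus a hasNext flag byte
        static byte[] writeNotification(int size, boolean hasNext) {
            byte[] frame = new byte[5];
            frame[0] = (byte) (size >> 24);
            frame[1] = (byte) (size >> 16);
            frame[2] = (byte) (size >> 8);
            frame[3] = (byte) size;
            frame[4] = hasNext ? 0 : SIGNAL_LAST;
            return frame;
        }

        // decode the leading integer, exactly as getInt() does
        static int readInt(byte[] b) {
            return (b[0] << 24) | (b[1] & 0xff) << 16 | (b[2] & 0xff) << 8 | (b[3] & 0xff);
        }

        public static void main(String[] args) {
            byte[] frame = writeNotification(1 << 20, false);
            System.out.println(readInt(frame));          // prints 1048576
            System.out.println(frame[4] == SIGNAL_LAST); // prints true
        }
    }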

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
new file mode 100644
index 0000000..07698d3
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_BOOLEAN;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTE;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTES;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_DOUBLE;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_FLOAT;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_INTEGER;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_LONG;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_NULL;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_SHORT;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_STRING;
+import static org.apache.flink.python.api.streaming.Sender.TYPE_TUPLE;
+import org.apache.flink.util.Collector;
+
+/**
+ * General-purpose class to read data from memory-mapped files.
+ */
+public class Receiver implements Serializable {
+	private static final long serialVersionUID = -2474088929850009968L;
+
+	private final AbstractRichFunction function;
+
+	private File inputFile;
+	private RandomAccessFile inputRAF;
+	private FileChannel inputChannel;
+	private MappedByteBuffer fileBuffer;
+
+	private Deserializer<?> deserializer = null;
+
+	public Receiver(AbstractRichFunction function) {
+		this.function = function;
+	}
+
+	//=====Setup========================================================================================================
+	public void open(String path) throws IOException {
+		setupMappedFile(path);
+	}
+
+	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
+		File x = new File(FLINK_TMP_DATA_DIR);
+		x.mkdirs();
+
+		inputFile = new File(inputFilePath);
+		if (inputFile.exists()) {
+			inputFile.delete();
+		}
+		inputFile.createNewFile();
+		inputRAF = new RandomAccessFile(inputFilePath, "rw");
+		inputRAF.setLength(MAPPED_FILE_SIZE);
+		inputRAF.seek(MAPPED_FILE_SIZE - 1);
+		inputRAF.writeByte(0);
+		inputRAF.seek(0);
+		inputChannel = inputRAF.getChannel();
+		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+	}
+
+	public void close() throws IOException {
+		closeMappedFile();
+	}
+
+	private void closeMappedFile() throws IOException {
+		inputChannel.close();
+		inputRAF.close();
+	}
+
+	//=====Record-API===================================================================================================
+	/**
+	 * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using
+	 * getRecord(). These records do not necessarily have to be of the same type. This method requires external
+	 * synchronization.
+	 *
+	 * @throws IOException
+	 */
+	private void loadBuffer() throws IOException {
+		int count = 0;
+		while (fileBuffer.get(0) == 0 && count < 10) {
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException ie) {
+			}
+			fileBuffer.load();
+			count++;
+		}
+		if (fileBuffer.get(0) == 0) {
+			throw new RuntimeException("External process not responding.");
+		}
+		fileBuffer.position(1);
+	}
+
+	/**
+	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
+	 * similar. The PlanBinder requires a method that can return any kind of object.
+	 *
+	 * @return read record
+	 * @throws IOException
+	 */
+	public Object getRecord() throws IOException {
+		return getRecord(false);
+	}
+
+	/**
+	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
+	 * similar. The PlanBinder requires a method that can return any kind of object.
+	 *
+	 * @param normalized flag indicating whether certain types should be normalized
+	 * @return read record
+	 * @throws IOException
+	 */
+	public Object getRecord(boolean normalized) throws IOException {
+		if (fileBuffer.position() == 0) {
+			loadBuffer();
+		}
+		return receiveField(normalized);
+	}
+
+	/**
+	 * Reads a single primitive value or tuple from the buffer.
+	 *
+	 * @return primitive value or tuple
+	 * @throws IOException
+	 */
+	private Object receiveField(boolean normalized) throws IOException {
+		byte type = fileBuffer.get();
+		switch (type) {
+			case TYPE_TUPLE:
+				int tupleSize = fileBuffer.get();
+				Tuple tuple = createTuple(tupleSize);
+				for (int x = 0; x < tupleSize; x++) {
+					tuple.setField(receiveField(normalized), x);
+				}
+				return tuple;
+			case TYPE_BOOLEAN:
+				return fileBuffer.get() == 1;
+			case TYPE_BYTE:
+				return fileBuffer.get();
+			case TYPE_SHORT:
+				if (normalized) {
+					return (int) fileBuffer.getShort();
+				} else {
+					return fileBuffer.getShort();
+				}
+			case TYPE_INTEGER:
+				return fileBuffer.getInt();
+			case TYPE_LONG:
+				if (normalized) {
+					return (int) fileBuffer.getLong();
+				} else {
+					return fileBuffer.getLong();
+				}
+			case TYPE_FLOAT:
+				if (normalized) {
+					return (double) fileBuffer.getFloat();
+				} else {
+					return fileBuffer.getFloat();
+				}
+			case TYPE_DOUBLE:
+				return fileBuffer.getDouble();
+			case TYPE_STRING:
+				int stringSize = fileBuffer.getInt();
+				byte[] buffer = new byte[stringSize];
+				fileBuffer.get(buffer);
+				return new String(buffer);
+			case TYPE_BYTES:
+				int bytesSize = fileBuffer.getInt();
+				byte[] bytes = new byte[bytesSize];
+				fileBuffer.get(bytes);
+				return bytes;
+			case TYPE_NULL:
+				return null;
+			default:
+				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
+		}
+	}
+
+	//=====Buffered-API=================================================================================================
+	/**
+	 * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
+	 * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
+	 * The user must guarantee that the buffer was completely written before calling this method.
+	 *
+	 * @param c Collector to collect records
+	 * @param bufferSize size of the buffer
+	 * @throws IOException
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public void collectBuffer(Collector c, int bufferSize) throws IOException {
+		fileBuffer.position(0);
+
+		if (deserializer == null) {
+			byte type = fileBuffer.get();
+			deserializer = getDeserializer(type);
+		}
+		while (fileBuffer.position() < bufferSize) {
+			c.collect(deserializer.deserialize());
+		}
+	}
+
+	//=====Deserializer=================================================================================================
+	private Deserializer<?> getDeserializer(byte type) {
+		switch (type) {
+			case TYPE_TUPLE:
+				return new TupleDeserializer();
+			case TYPE_BOOLEAN:
+				return new BooleanDeserializer();
+			case TYPE_BYTE:
+				return new ByteDeserializer();
+			case TYPE_BYTES:
+				return new BytesDeserializer();
+			case TYPE_SHORT:
+				return new ShortDeserializer();
+			case TYPE_INTEGER:
+				return new IntDeserializer();
+			case TYPE_LONG:
+				return new LongDeserializer();
+			case TYPE_STRING:
+				return new StringDeserializer();
+			case TYPE_FLOAT:
+				return new FloatDeserializer();
+			case TYPE_DOUBLE:
+				return new DoubleDeserializer();
+			case TYPE_NULL:
+				return new NullDeserializer();
+			default:
+				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
+
+		}
+	}
+
+	private interface Deserializer<T> {
+		T deserialize();
+	}
+
+	private class BooleanDeserializer implements Deserializer<Boolean> {
+		@Override
+		public Boolean deserialize() {
+			return fileBuffer.get() == 1;
+		}
+	}
+
+	private class ByteDeserializer implements Deserializer<Byte> {
+		@Override
+		public Byte deserialize() {
+			return fileBuffer.get();
+		}
+	}
+
+	private class ShortDeserializer implements Deserializer<Short> {
+		@Override
+		public Short deserialize() {
+			return fileBuffer.getShort();
+		}
+	}
+
+	private class IntDeserializer implements Deserializer<Integer> {
+		@Override
+		public Integer deserialize() {
+			return fileBuffer.getInt();
+		}
+	}
+
+	private class LongDeserializer implements Deserializer<Long> {
+		@Override
+		public Long deserialize() {
+			return fileBuffer.getLong();
+		}
+	}
+
+	private class FloatDeserializer implements Deserializer<Float> {
+		@Override
+		public Float deserialize() {
+			return fileBuffer.getFloat();
+		}
+	}
+
+	private class DoubleDeserializer implements Deserializer<Double> {
+		@Override
+		public Double deserialize() {
+			return fileBuffer.getDouble();
+		}
+	}
+
+	private class StringDeserializer implements Deserializer<String> {
+		private int size;
+
+		@Override
+		public String deserialize() {
+			size = fileBuffer.getInt();
+			byte[] buffer = new byte[size];
+			fileBuffer.get(buffer);
+			return new String(buffer);
+		}
+	}
+
+	private class NullDeserializer implements Deserializer<Object> {
+		@Override
+		public Object deserialize() {
+			return null;
+		}
+	}
+
+	private class BytesDeserializer implements Deserializer<byte[]> {
+		@Override
+		public byte[] deserialize() {
+			int length = fileBuffer.getInt();
+			byte[] result = new byte[length];
+			fileBuffer.get(result);
+			return result;
+		}
+	}
+
+	private class TupleDeserializer implements Deserializer<Tuple> {
+		private final Deserializer<?>[] deserializer;
+		private final Tuple reuse;
+
+		public TupleDeserializer() {
+			int size = fileBuffer.getInt();
+			reuse = createTuple(size);
+			deserializer = new Deserializer[size];
+			for (int x = 0; x < deserializer.length; x++) {
+				deserializer[x] = getDeserializer(fileBuffer.get());
+			}
+		}
+
+		@Override
+		public Tuple deserialize() {
+			for (int x = 0; x < deserializer.length; x++) {
+				reuse.setField(deserializer[x].deserialize(), x);
+			}
+			return reuse;
+		}
+	}
+
+	public static Tuple createTuple(int size) {
+		try {
+			return Tuple.getTupleClass(size).newInstance();
+		} catch (InstantiationException e) {
+			throw new RuntimeException(e);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException(e);
+		}
+	}
+}
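
createTuple() is the one public helper in this class: it resolves the Flink tuple class by arity via Tuple.getTupleClass() and instantiates it reflectively. A runnable sketch (TupleSketch and the field values are invented for illustration):

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.python.api.streaming.Receiver;

    public class TupleSketch {
        public static void main(String[] args) {
            // Tuple.getTupleClass(2) resolves to Tuple2; fields are set by position
            Tuple t = Receiver.createTuple(2);
            t.setField("hello", 0);
            t.setField(42, 1);
            System.out.println(t); // prints (hello,42)
        }
    }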

