flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/3] flink git commit: [FLINK-3057] Bidirectional plan connection
Date Wed, 20 Jan 2016 06:25:41 GMT
[FLINK-3057] Bidirectional plan connection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4d9469
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4d9469
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4d9469

Branch: refs/heads/master
Commit: fc4d9469d9d3cf17f877b2c49f345ed0001322c1
Parents: 0ac5d40
Author: zentol <chesnay@apache.org>
Authored: Sun Nov 22 16:23:29 2015 +0100
Committer: zentol <s.motsu@web.de>
Committed: Wed Jan 20 06:03:38 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   | 126 +++---
 .../flink/python/api/PythonPlanBinder.java      |  69 +--
 .../python/api/functions/PythonCoGroup.java     |   2 +-
 .../api/functions/PythonCombineIdentity.java    |   2 +-
 .../api/functions/PythonMapPartition.java       |   2 +-
 .../python/api/streaming/PythonStreamer.java    | 374 ----------------
 .../flink/python/api/streaming/Receiver.java    | 384 -----------------
 .../flink/python/api/streaming/Sender.java      | 427 -------------------
 .../python/api/streaming/StreamPrinter.java     |  55 ---
 .../api/streaming/data/PythonReceiver.java      | 267 ++++++++++++
 .../python/api/streaming/data/PythonSender.java | 420 ++++++++++++++++++
 .../api/streaming/data/PythonStreamer.java      | 375 ++++++++++++++++
 .../api/streaming/plan/PythonPlanReceiver.java  | 107 +++++
 .../api/streaming/plan/PythonPlanSender.java    | 116 +++++
 .../api/streaming/plan/PythonPlanStreamer.java  |  98 +++++
 .../api/streaming/util/StreamPrinter.java       |  55 +++
 .../python/api/flink/connection/Connection.py   |  30 +-
 .../python/api/flink/connection/Iterator.py     |  40 ++
 .../flink/python/api/flink/plan/Environment.py  |  22 +-
 19 files changed, 1588 insertions(+), 1383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 0ccf568..5cf3621 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.python.api.PythonPlanBinder.Operation;
-import org.apache.flink.python.api.streaming.Receiver;
+import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 
 public class PythonOperationInfo {
 	public int parentID; //DataSet that an operation is applied on
@@ -48,88 +48,88 @@ public class PythonOperationInfo {
 	public boolean toError;
 	public String name;
 
-	public PythonOperationInfo(Receiver receiver, Operation identifier) throws IOException {
+	public PythonOperationInfo(PythonPlanStreamer streamer, Operation identifier) throws IOException {
 		Object tmpType;
 		switch (identifier) {
 			case SOURCE_CSV:
-				setID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
-				fieldDelimiter = (String) receiver.getRecord();
-				lineDelimiter = (String) receiver.getRecord();
-				tmpType = (Tuple) receiver.getRecord();
+				setID = (Integer) streamer.getRecord(true);
+				path = (String) streamer.getRecord();
+				fieldDelimiter = (String) streamer.getRecord();
+				lineDelimiter = (String) streamer.getRecord();
+				tmpType = (Tuple) streamer.getRecord();
 				types = tmpType == null ? null : getForObject(tmpType);
 				return;
 			case SOURCE_TEXT:
-				setID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
+				setID = (Integer) streamer.getRecord(true);
+				path = (String) streamer.getRecord();
 				return;
 			case SOURCE_VALUE:
-				setID = (Integer) receiver.getRecord(true);
-				int valueCount = (Integer) receiver.getRecord(true);
+				setID = (Integer) streamer.getRecord(true);
+				int valueCount = (Integer) streamer.getRecord(true);
 				values = new Object[valueCount];
 				for (int x = 0; x < valueCount; x++) {
-					values[x] = receiver.getRecord();
+					values[x] = streamer.getRecord();
 				}
 				return;
 			case SOURCE_SEQ:
-				setID = (Integer) receiver.getRecord(true);
-				from = (Long) receiver.getRecord();
-				to = (Long) receiver.getRecord();
+				setID = (Integer) streamer.getRecord(true);
+				from = (Long) streamer.getRecord();
+				to = (Long) streamer.getRecord();
 				return;
 			case SINK_CSV:
-				parentID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
-				fieldDelimiter = (String) receiver.getRecord();
-				lineDelimiter = (String) receiver.getRecord();
-				writeMode = ((Integer) receiver.getRecord(true)) == 1
+				parentID = (Integer) streamer.getRecord(true);
+				path = (String) streamer.getRecord();
+				fieldDelimiter = (String) streamer.getRecord();
+				lineDelimiter = (String) streamer.getRecord();
+				writeMode = ((Integer) streamer.getRecord(true)) == 1
 						? WriteMode.OVERWRITE
 						: WriteMode.NO_OVERWRITE;
 				return;
 			case SINK_TEXT:
-				parentID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
-				writeMode = ((Integer) receiver.getRecord(true)) == 1
+				parentID = (Integer) streamer.getRecord(true);
+				path = (String) streamer.getRecord();
+				writeMode = ((Integer) streamer.getRecord(true)) == 1
 						? WriteMode.OVERWRITE
 						: WriteMode.NO_OVERWRITE;
 				return;
 			case SINK_PRINT:
-				parentID = (Integer) receiver.getRecord(true);
-				toError = (Boolean) receiver.getRecord();
+				parentID = (Integer) streamer.getRecord(true);
+				toError = (Boolean) streamer.getRecord();
 				return;
 			case BROADCAST:
-				parentID = (Integer) receiver.getRecord(true);
-				otherID = (Integer) receiver.getRecord(true);
-				name = (String) receiver.getRecord();
+				parentID = (Integer) streamer.getRecord(true);
+				otherID = (Integer) streamer.getRecord(true);
+				name = (String) streamer.getRecord();
 				return;
 		}
-		setID = (Integer) receiver.getRecord(true);
-		parentID = (Integer) receiver.getRecord(true);
+		setID = (Integer) streamer.getRecord(true);
+		parentID = (Integer) streamer.getRecord(true);
 		switch (identifier) {
 			case AGGREGATE:
-				count = (Integer) receiver.getRecord(true);
+				count = (Integer) streamer.getRecord(true);
 				aggregates = new AggregationEntry[count];
 				for (int x = 0; x < count; x++) {
-					int encodedAgg = (Integer) receiver.getRecord(true);
-					int field = (Integer) receiver.getRecord(true);
+					int encodedAgg = (Integer) streamer.getRecord(true);
+					int field = (Integer) streamer.getRecord(true);
 					aggregates[x] = new AggregationEntry(encodedAgg, field);
 				}
 				return;
 			case FIRST:
-				count = (Integer) receiver.getRecord(true);
+				count = (Integer) streamer.getRecord(true);
 				return;
 			case DISTINCT:
 			case GROUPBY:
 			case PARTITION_HASH:
-				keys = normalizeKeys(receiver.getRecord(true));
+				keys = normalizeKeys(streamer.getRecord(true));
 				return;
 			case PROJECTION:
-				fields = toIntArray(receiver.getRecord(true));
+				fields = toIntArray(streamer.getRecord(true));
 				return;
 			case REBALANCE:
 				return;
 			case SORT:
-				field = (Integer) receiver.getRecord(true);
-				int encodedOrder = (Integer) receiver.getRecord(true);
+				field = (Integer) streamer.getRecord(true);
+				int encodedOrder = (Integer) streamer.getRecord(true);
 				switch (encodedOrder) {
 					case 0:
 						order = Order.NONE;
@@ -149,62 +149,62 @@ public class PythonOperationInfo {
 				}
 				return;
 			case UNION:
-				otherID = (Integer) receiver.getRecord(true);
+				otherID = (Integer) streamer.getRecord(true);
 				return;
 			case COGROUP:
-				otherID = (Integer) receiver.getRecord(true);
-				keys1 = normalizeKeys(receiver.getRecord(true));
-				keys2 = normalizeKeys(receiver.getRecord(true));
-				tmpType = receiver.getRecord();
+				otherID = (Integer) streamer.getRecord(true);
+				keys1 = normalizeKeys(streamer.getRecord(true));
+				keys2 = normalizeKeys(streamer.getRecord(true));
+				tmpType = streamer.getRecord();
 				types = tmpType == null ? null : getForObject(tmpType);
-				name = (String) receiver.getRecord();
+				name = (String) streamer.getRecord();
 				return;
 			case CROSS:
 			case CROSS_H:
 			case CROSS_T:
-				otherID = (Integer) receiver.getRecord(true);
-				tmpType = receiver.getRecord();
+				otherID = (Integer) streamer.getRecord(true);
+				tmpType = streamer.getRecord();
 				types = tmpType == null ? null : getForObject(tmpType);
-				int cProjectCount = (Integer) receiver.getRecord(true);
+				int cProjectCount = (Integer) streamer.getRecord(true);
 				projections = new ProjectionEntry[cProjectCount];
 				for (int x = 0; x < cProjectCount; x++) {
-					String side = (String) receiver.getRecord();
-					int[] keys = toIntArray((Tuple) receiver.getRecord(true));
+					String side = (String) streamer.getRecord();
+					int[] keys = toIntArray((Tuple) streamer.getRecord(true));
 					projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
 				}
-				name = (String) receiver.getRecord();
+				name = (String) streamer.getRecord();
 				return;
 			case REDUCE:
 			case GROUPREDUCE:
-				tmpType = receiver.getRecord();
+				tmpType = streamer.getRecord();
 				types = tmpType == null ? null : getForObject(tmpType);
-				combine = (Boolean) receiver.getRecord();
-				name = (String) receiver.getRecord();
+				combine = (Boolean) streamer.getRecord();
+				name = (String) streamer.getRecord();
 				return;
 			case JOIN:
 			case JOIN_H:
 			case JOIN_T:
-				keys1 = normalizeKeys(receiver.getRecord(true));
-				keys2 = normalizeKeys(receiver.getRecord(true));
-				otherID = (Integer) receiver.getRecord(true);
-				tmpType = receiver.getRecord();
+				keys1 = normalizeKeys(streamer.getRecord(true));
+				keys2 = normalizeKeys(streamer.getRecord(true));
+				otherID = (Integer) streamer.getRecord(true);
+				tmpType = streamer.getRecord();
 				types = tmpType == null ? null : getForObject(tmpType);
-				int jProjectCount = (Integer) receiver.getRecord(true);
+				int jProjectCount = (Integer) streamer.getRecord(true);
 				projections = new ProjectionEntry[jProjectCount];
 				for (int x = 0; x < jProjectCount; x++) {
-					String side = (String) receiver.getRecord();
-					int[] keys = toIntArray((Tuple) receiver.getRecord(true));
+					String side = (String) streamer.getRecord();
+					int[] keys = toIntArray((Tuple) streamer.getRecord(true));
 					projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
 				}
-				name = (String) receiver.getRecord();
+				name = (String) streamer.getRecord();
 				return;
 			case MAPPARTITION:
 			case FLATMAP:
 			case MAP:
 			case FILTER:
-				tmpType = receiver.getRecord();
+				tmpType = streamer.getRecord();
 				types = tmpType == null ? null : getForObject(tmpType);
-				name = (String) receiver.getRecord();
+				name = (String) streamer.getRecord();
 				return;
 			default:
 				throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier);

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 7c74054..d178dcb 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -19,6 +19,7 @@ import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Random;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
@@ -49,8 +50,7 @@ import org.apache.flink.python.api.PythonOperationInfo.ProjectionEntry;
 import org.apache.flink.python.api.functions.PythonCoGroup;
 import org.apache.flink.python.api.functions.PythonCombineIdentity;
 import org.apache.flink.python.api.functions.PythonMapPartition;
-import org.apache.flink.python.api.streaming.Receiver;
-import org.apache.flink.python.api.streaming.StreamPrinter;
+import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,15 +76,13 @@ public class PythonPlanBinder {
 
 	private static final Random r = new Random();
 
-	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
+	public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
 	private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
 	private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
 	private static String FULL_PATH;
 
 	public static StringBuilder arguments = new StringBuilder();
 
-	private Process process;
-
 	public static boolean usePython3 = false;
 
 	private static String FLINK_HDFS_PATH = "hdfs:/tmp";
@@ -94,7 +92,7 @@ public class PythonPlanBinder {
 
 	private HashMap<Integer, Object> sets = new HashMap();
 	public ExecutionEnvironment env;
-	private Receiver receiver;
+	private PythonPlanStreamer streamer;
 
 	public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
 
@@ -145,7 +143,8 @@ public class PythonPlanBinder {
 			}
 
 			distributeFiles(tmpPath, env);
-			env.execute();
+			JobExecutionResult jer = env.execute();
+			sendResult(jer);
 			close();
 		} catch (Exception e) {
 			close();
@@ -205,41 +204,13 @@ public class PythonPlanBinder {
 		for (String arg : args) {
 			arguments.append(" ").append(arg);
 		}
-		String mappedFilePath = FLINK_TMP_DATA_DIR + "/output" + r.nextInt();
-		receiver = new Receiver(null);
-		receiver.open(mappedFilePath);
-
-		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
-
-		try {
-			Runtime.getRuntime().exec(pythonBinaryPath);
-		} catch (IOException ex) {
-			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
-		}
-		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tempPath + FLINK_PYTHON_PLAN_NAME + arguments.toString());
-
-		new StreamPrinter(process.getInputStream()).start();
-		new StreamPrinter(process.getErrorStream()).start();
-
-		try {
-			Thread.sleep(2000);
-		} catch (InterruptedException ex) {
-		}
-
-		try {
-			int value = process.exitValue();
-			if (value != 0) {
-				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-			}
-			if (value == 0) {
-				throw new RuntimeException("Plan file exited prematurely without an error.");
-			}
-		} catch (IllegalThreadStateException ise) {//Process still running
-		}
+		streamer = new PythonPlanStreamer();
+		streamer.open(tempPath, arguments.toString());
+	}
 
-		process.getOutputStream().write("plan\n".getBytes());
-		process.getOutputStream().write((mappedFilePath + "\n").getBytes());
-		process.getOutputStream().flush();
+	private void sendResult(JobExecutionResult jer) throws IOException {
+		long runtime = jer.getNetRuntime();
+		streamer.sendRecord(runtime);
 	}
 
 	private void close() {
@@ -252,18 +223,12 @@ public class PythonPlanBinder {
 			FileSystem local = FileSystem.getLocalFileSystem();
 			local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
 			local.delete(new Path(FLINK_TMP_DATA_DIR), true);
-			receiver.close();
+			streamer.close();
 		} catch (NullPointerException npe) {
 		} catch (IOException ioe) {
 			LOG.error("PythonAPI file cleanup failed. " + ioe.getMessage());
 		} catch (URISyntaxException use) { // can't occur
 		}
-		try {
-			process.exitValue();
-		} catch (NullPointerException npe) { //exception occurred before process was started
-		} catch (IllegalThreadStateException ise) { //process still active
-			process.destroy();
-		}
 	}
 
 	//====Plan==========================================================================================================
@@ -285,7 +250,7 @@ public class PythonPlanBinder {
 
 	private void receiveParameters() throws IOException {
 		for (int x = 0; x < 4; x++) {
-			Tuple value = (Tuple) receiver.getRecord(true);
+			Tuple value = (Tuple) streamer.getRecord(true);
 			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
 				case DOP:
 					Integer dop = (Integer) value.getField(1);
@@ -321,9 +286,9 @@ public class PythonPlanBinder {
 	}
 
 	private void receiveOperations() throws IOException {
-		Integer operationCount = (Integer) receiver.getRecord(true);
+		Integer operationCount = (Integer) streamer.getRecord(true);
 		for (int x = 0; x < operationCount; x++) {
-			String identifier = (String) receiver.getRecord();
+			String identifier = (String) streamer.getRecord();
 			Operation op = null;
 			try {
 				op = Operation.valueOf(identifier.toUpperCase());
@@ -435,7 +400,7 @@ public class PythonPlanBinder {
 	 * @throws IOException
 	 */
 	private PythonOperationInfo createOperationInfo(Operation operationIdentifier) throws IOException {
-		return new PythonOperationInfo(receiver, operationIdentifier);
+		return new PythonOperationInfo(streamer, operationIdentifier);
 	}
 
 	private void createCsvSource(PythonOperationInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 4d74878..2349aa9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -14,7 +14,7 @@ package org.apache.flink.python.api.functions;
 
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.util.Collector;
 import java.io.IOException;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
index 9b3189b..f80d975 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
@@ -13,7 +13,7 @@
 package org.apache.flink.python.api.functions;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.util.Collector;
 import java.io.IOException;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 1578e18..50b2cf4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -17,7 +17,7 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.util.Collector;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
deleted file mode 100644
index 9e4f479..0000000
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
-import static org.apache.flink.python.api.PythonPlanBinder.DEBUG;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
-import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This streamer is used by functions to send/receive data to/from an external python process.
- */
-public class PythonStreamer implements Serializable {
-	protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
-	private static final int SIGNAL_BUFFER_REQUEST = 0;
-	private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
-	private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
-	private static final int SIGNAL_FINISHED = -1;
-	private static final int SIGNAL_ERROR = -2;
-	private static final byte SIGNAL_LAST = 32;
-
-	private final int id;
-	private final boolean usePython3;
-	private final boolean debug;
-	private final String planArguments;
-
-	private String inputFilePath;
-	private String outputFilePath;
-
-	private final byte[] buffer = new byte[4];
-
-	private Process process;
-	private Thread shutdownThread;
-	protected ServerSocket server;
-	protected Socket socket;
-	protected InputStream in;
-	protected OutputStream out;
-	protected int port;
-
-	protected Sender sender;
-	protected Receiver receiver;
-
-	protected StringBuilder msg = new StringBuilder();
-
-	protected final AbstractRichFunction function;
-
-	public PythonStreamer(AbstractRichFunction function, int id) {
-		this.id = id;
-		this.usePython3 = PythonPlanBinder.usePython3;
-		this.debug = DEBUG;
-		planArguments = PythonPlanBinder.arguments.toString();
-		sender = new Sender(function);
-		receiver = new Receiver(function);
-		this.function = function;
-	}
-
-	/**
-	 * Starts the python script.
-	 *
-	 * @throws IOException
-	 */
-	public void open() throws IOException {
-		server = new ServerSocket(0);
-		startPython();
-	}
-
-	private void startPython() throws IOException {
-		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-		this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
-
-		sender.open(inputFilePath);
-		receiver.open(outputFilePath);
-
-		String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
-		String planPath = path + FLINK_PYTHON_PLAN_NAME;
-
-		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
-
-		try {
-			Runtime.getRuntime().exec(pythonBinaryPath);
-		} catch (IOException ex) {
-			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
-		}
-
-		if (debug) {
-			socket.setSoTimeout(0);
-			LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
-					+ " Run python " + planPath + planArguments);
-		} else {
-			process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
-			new StreamPrinter(process.getInputStream()).start();
-			new StreamPrinter(process.getErrorStream(), true, msg).start();
-		}
-
-		shutdownThread = new Thread() {
-			@Override
-			public void run() {
-				try {
-					destroyProcess();
-				} catch (IOException ex) {
-				}
-			}
-		};
-
-		Runtime.getRuntime().addShutdownHook(shutdownThread);
-
-		OutputStream processOutput = process.getOutputStream();
-		processOutput.write("operator\n".getBytes());
-		processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
-		processOutput.write((id + "\n").getBytes());
-		processOutput.write((inputFilePath + "\n").getBytes());
-		processOutput.write((outputFilePath + "\n").getBytes());
-		processOutput.flush();
-
-		try { // wait a bit to catch syntax errors
-			Thread.sleep(2000);
-		} catch (InterruptedException ex) {
-		}
-		if (!debug) {
-			try {
-				process.exitValue();
-				throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-			} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
-			}
-		}
-
-		socket = server.accept();
-		in = socket.getInputStream();
-		out = socket.getOutputStream();
-	}
-
-	/**
-	 * Closes this streamer.
-	 *
-	 * @throws IOException
-	 */
-	public void close() throws IOException {
-		try {
-		socket.close();
-		sender.close();
-		receiver.close();
-		} catch (Exception e) {
-			LOG.error("Exception occurred while closing Streamer. :" + e.getMessage());
-		}
-		if (!debug) {
-			destroyProcess();
-		}
-		if (shutdownThread != null) {
-			Runtime.getRuntime().removeShutdownHook(shutdownThread);
-		}
-	}
-
-	private void destroyProcess() throws IOException {
-		try {
-			process.exitValue();
-		} catch (IllegalThreadStateException ise) { //process still active
-			if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
-				int pid;
-				try {
-					Field f = process.getClass().getDeclaredField("pid");
-					f.setAccessible(true);
-					pid = f.getInt(process);
-				} catch (Throwable e) {
-					process.destroy();
-					return;
-				}
-				String[] args = new String[]{"kill", "-9", "" + pid};
-				Runtime.getRuntime().exec(args);
-			} else {
-				process.destroy();
-			}
-		}
-	}
-	
-		private void sendWriteNotification(int size, boolean hasNext) throws IOException {
-		byte[] tmp = new byte[5];
-		putInt(tmp, 0, size);
-		tmp[4] = hasNext ? 0 : SIGNAL_LAST;
-		out.write(tmp, 0, 5);
-		out.flush();
-	}
-
-	private void sendReadConfirmation() throws IOException {
-		out.write(new byte[1], 0, 1);
-		out.flush();
-	}
-
-	private void checkForError() {
-		if (getInt(buffer, 0) == -2) {
-			try { //wait before terminating to ensure that the complete error message is printed
-				Thread.sleep(2000);
-			} catch (InterruptedException ex) {
-			}
-			throw new RuntimeException(
-					"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-		}
-	}
-
-	/**
-	 * Sends all broadcast-variables encoded in the configuration to the external process.
-	 *
-	 * @param config configuration object containing broadcast-variable count and names
-	 * @throws IOException
-	 */
-	public final void sendBroadCastVariables(Configuration config) throws IOException {
-		try {
-			int broadcastCount = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
-
-			String[] names = new String[broadcastCount];
-
-			for (int x = 0; x < names.length; x++) {
-				names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
-			}
-
-			in.read(buffer, 0, 4);
-			checkForError();
-			int size = sender.sendRecord(broadcastCount);
-			sendWriteNotification(size, false);
-
-			for (String name : names) {
-				Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
-
-				in.read(buffer, 0, 4);
-				checkForError();
-				size = sender.sendRecord(name);
-				sendWriteNotification(size, false);
-
-				while (bcv.hasNext() || sender.hasRemaining(0)) {
-					in.read(buffer, 0, 4);
-					checkForError();
-					size = sender.sendBuffer(bcv, 0);
-					sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
-				}
-				sender.reset();
-			}
-		} catch (SocketTimeoutException ste) {
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-		}
-	}
-
-	/**
-	 * Sends all values contained in the iterator to the external process and collects all results.
-	 *
-	 * @param i iterator
-	 * @param c collector
-	 * @throws IOException
-	 */
-	public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
-		try {
-			int size;
-			if (i.hasNext()) {
-				while (true) {
-					in.read(buffer, 0, 4);
-					int sig = getInt(buffer, 0);
-					switch (sig) {
-						case SIGNAL_BUFFER_REQUEST:
-							if (i.hasNext() || sender.hasRemaining(0)) {
-								size = sender.sendBuffer(i, 0);
-								sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
-							} else {
-								throw new RuntimeException("External process requested data even though none is available.");
-							}
-							break;
-						case SIGNAL_FINISHED:
-							return;
-						case SIGNAL_ERROR:
-							try { //wait before terminating to ensure that the complete error message is printed
-								Thread.sleep(2000);
-							} catch (InterruptedException ex) {
-							}
-							throw new RuntimeException(
-									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
-						default:
-							receiver.collectBuffer(c, sig);
-							sendReadConfirmation();
-							break;
-					}
-				}
-			}
-		} catch (SocketTimeoutException ste) {
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-		}
-	}
-
-	/**
-	 * Sends all values contained in both iterators to the external process and collects all results.
-	 *
-	 * @param i1 iterator
-	 * @param i2 iterator
-	 * @param c collector
-	 * @throws IOException
-	 */
-	public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
-		try {
-			int size;
-			if (i1.hasNext() || i2.hasNext()) {
-				while (true) {
-					in.read(buffer, 0, 4);
-					int sig = getInt(buffer, 0);
-					switch (sig) {
-						case SIGNAL_BUFFER_REQUEST_G0:
-							if (i1.hasNext() || sender.hasRemaining(0)) {
-								size = sender.sendBuffer(i1, 0);
-								sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
-							}
-							break;
-						case SIGNAL_BUFFER_REQUEST_G1:
-							if (i2.hasNext() || sender.hasRemaining(1)) {
-								size = sender.sendBuffer(i2, 1);
-								sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
-							}
-							break;
-						case SIGNAL_FINISHED:
-							return;
-						case SIGNAL_ERROR:
-							try { //wait before terminating to ensure that the complete error message is printed
-								Thread.sleep(2000);
-							} catch (InterruptedException ex) {
-							}
-							throw new RuntimeException(
-									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
-						default:
-							receiver.collectBuffer(c, sig);
-							sendReadConfirmation();
-							break;
-					}
-				}
-			}
-		} catch (SocketTimeoutException ste) {
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-		}
-	}
-
-	protected final static int getInt(byte[] array, int offset) {
-		return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 | (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
-	}
-
-	protected final static void putInt(byte[] array, int offset, int value) {
-		array[offset] = (byte) (value >> 24);
-		array[offset + 1] = (byte) (value >> 16);
-		array[offset + 2] = (byte) (value >> 8);
-		array[offset + 3] = (byte) (value);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
deleted file mode 100644
index a706053..0000000
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_BOOLEAN;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTE;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTES;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_DOUBLE;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_FLOAT;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_INTEGER;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_LONG;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_NULL;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_SHORT;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_STRING;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_TUPLE;
-import org.apache.flink.python.api.types.CustomTypeWrapper;
-import org.apache.flink.util.Collector;
-
-/**
- * General-purpose class to read data from memory-mapped files.
- */
-public class Receiver implements Serializable {
-	private static final long serialVersionUID = -2474088929850009968L;
-
-	private final AbstractRichFunction function;
-
-	private File inputFile;
-	private RandomAccessFile inputRAF;
-	private FileChannel inputChannel;
-	private MappedByteBuffer fileBuffer;
-
-	private Deserializer<?> deserializer = null;
-
-	public Receiver(AbstractRichFunction function) {
-		this.function = function;
-	}
-
-	//=====Setup========================================================================================================
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
-	}
-
-	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
-
-		inputFile = new File(inputFilePath);
-		if (inputFile.exists()) {
-			inputFile.delete();
-		}
-		inputFile.createNewFile();
-		inputRAF = new RandomAccessFile(inputFilePath, "rw");
-		inputRAF.setLength(MAPPED_FILE_SIZE);
-		inputRAF.seek(MAPPED_FILE_SIZE - 1);
-		inputRAF.writeByte(0);
-		inputRAF.seek(0);
-		inputChannel = inputRAF.getChannel();
-		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-	}
-
-	public void close() throws IOException {
-		closeMappedFile();
-	}
-
-	private void closeMappedFile() throws IOException {
-		inputChannel.close();
-		inputRAF.close();
-	}
-
-	//=====Record-API===================================================================================================
-	/**
-	 * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using
-	 * collectRecord(). These records do not necessarily have to be of the same type. This method requires external
-	 * synchronization.
-	 *
-	 * @throws IOException
-	 */
-	private void loadBuffer() throws IOException {
-		int count = 0;
-		while (fileBuffer.get(0) == 0 && count < 10) {
-			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException ie) {
-			}
-			fileBuffer.load();
-			count++;
-		}
-		if (fileBuffer.get(0) == 0) {
-			throw new RuntimeException("External process not responding.");
-		}
-		fileBuffer.position(1);
-	}
-
-	/**
-	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-	 * similar. The PlanBinder requires a method that can return any kind of object.
-	 *
-	 * @return read record
-	 * @throws IOException
-	 */
-	public Object getRecord() throws IOException {
-		return getRecord(false);
-	}
-
-	/**
-	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-	 * similar. The PlanBinder requires a method that can return any kind of object.
-	 *
-	 * @param normalized flag indicating whether certain types should be normalized
-	 * @return read record
-	 * @throws IOException
-	 */
-	public Object getRecord(boolean normalized) throws IOException {
-		if (fileBuffer.position() == 0) {
-			loadBuffer();
-		}
-		return receiveField(normalized);
-	}
-
-	/**
-	 * Reads a single primitive value or tuple from the buffer.
-	 *
-	 * @return primitive value or tuple
-	 * @throws IOException
-	 */
-	private Object receiveField(boolean normalized) throws IOException {
-		byte type = fileBuffer.get();
-		switch (type) {
-			case TYPE_TUPLE:
-				int tupleSize = fileBuffer.get();
-				Tuple tuple = createTuple(tupleSize);
-				for (int x = 0; x < tupleSize; x++) {
-					tuple.setField(receiveField(normalized), x);
-				}
-				return tuple;
-			case TYPE_BOOLEAN:
-				return fileBuffer.get() == 1;
-			case TYPE_BYTE:
-				return fileBuffer.get();
-			case TYPE_SHORT:
-				if (normalized) {
-					return (int) fileBuffer.getShort();
-				} else {
-					return fileBuffer.getShort();
-				}
-			case TYPE_INTEGER:
-				return fileBuffer.getInt();
-			case TYPE_LONG:
-				if (normalized) {
-					return new Long(fileBuffer.getLong()).intValue();
-				} else {
-					return fileBuffer.getLong();
-				}
-			case TYPE_FLOAT:
-				if (normalized) {
-					return (double) fileBuffer.getFloat();
-				} else {
-					return fileBuffer.getFloat();
-				}
-			case TYPE_DOUBLE:
-				return fileBuffer.getDouble();
-			case TYPE_STRING:
-				int stringSize = fileBuffer.getInt();
-				byte[] buffer = new byte[stringSize];
-				fileBuffer.get(buffer);
-				return new String(buffer);
-			case TYPE_BYTES:
-				int bytessize = fileBuffer.getInt();
-				byte[] bytebuffer = new byte[bytessize];
-				fileBuffer.get(bytebuffer);
-				return bytebuffer;
-			case TYPE_NULL:
-				return null;
-			default:
-				return new CustomTypeDeserializer(type).deserialize();
-		}
-	}
-
-	//=====Buffered-API=================================================================================================
-	/**
-	 * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
-	 * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
-	 * The user must guarantee that the buffer was completely written before calling this method.
-	 *
-	 * @param c Collector to collect records
-	 * @param bufferSize size of the buffer
-	 * @throws IOException
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void collectBuffer(Collector c, int bufferSize) throws IOException {
-		fileBuffer.position(0);
-
-		if (deserializer == null) {
-			byte type = fileBuffer.get();
-			deserializer = getDeserializer(type);
-		}
-		while (fileBuffer.position() < bufferSize) {
-			c.collect(deserializer.deserialize());
-		}
-	}
-
-	//=====Deserializer=================================================================================================
-	private Deserializer<?> getDeserializer(byte type) {
-		switch (type) {
-			case TYPE_TUPLE:
-				return new TupleDeserializer();
-			case TYPE_BOOLEAN:
-				return new BooleanDeserializer();
-			case TYPE_BYTE:
-				return new ByteDeserializer();
-			case TYPE_BYTES:
-				return new BytesDeserializer();
-			case TYPE_SHORT:
-				return new ShortDeserializer();
-			case TYPE_INTEGER:
-				return new IntDeserializer();
-			case TYPE_LONG:
-				return new LongDeserializer();
-			case TYPE_STRING:
-				return new StringDeserializer();
-			case TYPE_FLOAT:
-				return new FloatDeserializer();
-			case TYPE_DOUBLE:
-				return new DoubleDeserializer();
-			case TYPE_NULL:
-				return new NullDeserializer();
-			default:
-				return new CustomTypeDeserializer(type);
-
-		}
-	}
-
-	private interface Deserializer<T> {
-		public T deserialize();
-	}
-
-	private class CustomTypeDeserializer implements Deserializer<CustomTypeWrapper> {
-		private final byte type;
-
-		public CustomTypeDeserializer(byte type) {
-			this.type = type;
-		}
-
-		@Override
-		public CustomTypeWrapper deserialize() {
-			int size = fileBuffer.getInt();
-			byte[] data = new byte[size];
-			fileBuffer.get(data);
-			return new CustomTypeWrapper(type, data);
-		}
-	}
-
-	private class BooleanDeserializer implements Deserializer<Boolean> {
-		@Override
-		public Boolean deserialize() {
-			return fileBuffer.get() == 1;
-		}
-	}
-
-	private class ByteDeserializer implements Deserializer<Byte> {
-		@Override
-		public Byte deserialize() {
-			return fileBuffer.get();
-		}
-	}
-
-	private class ShortDeserializer implements Deserializer<Short> {
-		@Override
-		public Short deserialize() {
-			return fileBuffer.getShort();
-		}
-	}
-
-	private class IntDeserializer implements Deserializer<Integer> {
-		@Override
-		public Integer deserialize() {
-			return fileBuffer.getInt();
-		}
-	}
-
-	private class LongDeserializer implements Deserializer<Long> {
-		@Override
-		public Long deserialize() {
-			return fileBuffer.getLong();
-		}
-	}
-
-	private class FloatDeserializer implements Deserializer<Float> {
-		@Override
-		public Float deserialize() {
-			return fileBuffer.getFloat();
-		}
-	}
-
-	private class DoubleDeserializer implements Deserializer<Double> {
-		@Override
-		public Double deserialize() {
-			return fileBuffer.getDouble();
-		}
-	}
-
-	private class StringDeserializer implements Deserializer<String> {
-		private int size;
-
-		@Override
-		public String deserialize() {
-			size = fileBuffer.getInt();
-			byte[] buffer = new byte[size];
-			fileBuffer.get(buffer);
-			return new String(buffer);
-		}
-	}
-
-	private class NullDeserializer implements Deserializer<Object> {
-		@Override
-		public Object deserialize() {
-			return null;
-		}
-	}
-
-	private class BytesDeserializer implements Deserializer<byte[]> {
-		@Override
-		public byte[] deserialize() {
-			int length = fileBuffer.getInt();
-			byte[] result = new byte[length];
-			fileBuffer.get(result);
-			return result;
-		}
-
-	}
-
-	private class TupleDeserializer implements Deserializer<Tuple> {
-		Deserializer<?>[] deserializer = null;
-		Tuple reuse;
-
-		public TupleDeserializer() {
-			int size = fileBuffer.getInt();
-			reuse = createTuple(size);
-			deserializer = new Deserializer[size];
-			for (int x = 0; x < deserializer.length; x++) {
-				deserializer[x] = getDeserializer(fileBuffer.get());
-			}
-		}
-
-		@Override
-		public Tuple deserialize() {
-			for (int x = 0; x < deserializer.length; x++) {
-				reuse.setField(deserializer[x].deserialize(), x);
-			}
-			return reuse;
-		}
-	}
-
-	public static Tuple createTuple(int size) {
-		try {
-			return Tuple.getTupleClass(size).newInstance();
-		} catch (InstantiationException e) {
-			throw new RuntimeException(e);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException(e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
deleted file mode 100644
index 2db1441..0000000
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
-import org.apache.flink.python.api.types.CustomTypeWrapper;
-
-/**
- * General-purpose class to write data to memory-mapped files.
- */
-public class Sender implements Serializable {
-	public static final byte TYPE_TUPLE = (byte) 11;
-	public static final byte TYPE_BOOLEAN = (byte) 10;
-	public static final byte TYPE_BYTE = (byte) 9;
-	public static final byte TYPE_SHORT = (byte) 8;
-	public static final byte TYPE_INTEGER = (byte) 7;
-	public static final byte TYPE_LONG = (byte) 6;
-	public static final byte TYPE_DOUBLE = (byte) 4;
-	public static final byte TYPE_FLOAT = (byte) 5;
-	public static final byte TYPE_CHAR = (byte) 3;
-	public static final byte TYPE_STRING = (byte) 2;
-	public static final byte TYPE_BYTES = (byte) 1;
-	public static final byte TYPE_NULL = (byte) 0;
-
-	private final AbstractRichFunction function;
-
-	private File outputFile;
-	private RandomAccessFile outputRAF;
-	private FileChannel outputChannel;
-	private MappedByteBuffer fileBuffer;
-
-	private final ByteBuffer[] saved = new ByteBuffer[2];
-
-	private final Serializer[] serializer = new Serializer[2];
-
-	public Sender(AbstractRichFunction function) {
-		this.function = function;
-	}
-
-	//=====Setup========================================================================================================
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
-	}
-
-	private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
-
-		outputFile = new File(outputFilePath);
-		if (outputFile.exists()) {
-			outputFile.delete();
-		}
-		outputFile.createNewFile();
-		outputRAF = new RandomAccessFile(outputFilePath, "rw");
-		outputRAF.setLength(MAPPED_FILE_SIZE);
-		outputRAF.seek(MAPPED_FILE_SIZE - 1);
-		outputRAF.writeByte(0);
-		outputRAF.seek(0);
-		outputChannel = outputRAF.getChannel();
-		fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-	}
-
-	public void close() throws IOException {
-		closeMappedFile();
-	}
-
-	private void closeMappedFile() throws IOException {
-		outputChannel.close();
-		outputRAF.close();
-	}
-
-	/**
-	 * Resets this object to the post-configuration state.
-	 */
-	public void reset() {
-		serializer[0] = null;
-		serializer[1] = null;
-		fileBuffer.clear();
-	}
-
-	//=====Serialization================================================================================================
-	/**
-	 * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
-	 * must guarantee that the file may be written to before calling this method. This method essentially reserves the
-	 * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
-	 * absolutely necessary.
-	 *
-	 * @param value record to send
-	 * @return size of the written buffer
-	 * @throws IOException
-	 */
-	public int sendRecord(Object value) throws IOException {
-		fileBuffer.clear();
-		int group = 0;
-
-		serializer[group] = getSerializer(value);
-		ByteBuffer bb = serializer[group].serialize(value);
-		if (bb.remaining() > MAPPED_FILE_SIZE) {
-			throw new RuntimeException("Serialized object does not fit into a single buffer.");
-		}
-		fileBuffer.put(bb);
-
-		int size = fileBuffer.position();
-
-		reset();
-		return size;
-	}
-
-	public boolean hasRemaining(int group) {
-		return saved[group] != null;
-	}
-
-	/**
-	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
-	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
-	 * guarantee that the file may be written to before calling this method.
-	 *
-	 * @param i iterator containing records
-	 * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
-	 * @return size of the written buffer
-	 * @throws IOException
-	 */
-	public int sendBuffer(Iterator i, int group) throws IOException {
-		fileBuffer.clear();
-
-		Object value;
-		ByteBuffer bb;
-		if (serializer[group] == null) {
-			value = i.next();
-			serializer[group] = getSerializer(value);
-			bb = serializer[group].serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
-				throw new RuntimeException("Serialized object does not fit into a single buffer.");
-			}
-			fileBuffer.put(bb);
-
-		}
-		if (saved[group] != null) {
-			fileBuffer.put(saved[group]);
-			saved[group] = null;
-		}
-		while (i.hasNext() && saved[group] == null) {
-			value = i.next();
-			bb = serializer[group].serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
-				throw new RuntimeException("Serialized object does not fit into a single buffer.");
-			}
-			if (bb.remaining() <= fileBuffer.remaining()) {
-				fileBuffer.put(bb);
-			} else {
-				saved[group] = bb;
-			}
-		}
-
-		int size = fileBuffer.position();
-		return size;
-	}
-
-	private enum SupportedTypes {
-		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
-	}
-
-	//=====Serializer===================================================================================================
-	private Serializer getSerializer(Object value) throws IOException {
-		String className = value.getClass().getSimpleName().toUpperCase();
-		if (className.startsWith("TUPLE")) {
-			className = "TUPLE";
-		}
-		if (className.startsWith("BYTE[]")) {
-			className = "BYTES";
-		}
-		SupportedTypes type = SupportedTypes.valueOf(className);
-		switch (type) {
-			case TUPLE:
-				fileBuffer.put(TYPE_TUPLE);
-				fileBuffer.putInt(((Tuple) value).getArity());
-				return new TupleSerializer((Tuple) value);
-			case BOOLEAN:
-				fileBuffer.put(TYPE_BOOLEAN);
-				return new BooleanSerializer();
-			case BYTE:
-				fileBuffer.put(TYPE_BYTE);
-				return new ByteSerializer();
-			case BYTES:
-				fileBuffer.put(TYPE_BYTES);
-				return new BytesSerializer();
-			case CHARACTER:
-				fileBuffer.put(TYPE_CHAR);
-				return new CharSerializer();
-			case SHORT:
-				fileBuffer.put(TYPE_SHORT);
-				return new ShortSerializer();
-			case INTEGER:
-				fileBuffer.put(TYPE_INTEGER);
-				return new IntSerializer();
-			case LONG:
-				fileBuffer.put(TYPE_LONG);
-				return new LongSerializer();
-			case STRING:
-				fileBuffer.put(TYPE_STRING);
-				return new StringSerializer();
-			case FLOAT:
-				fileBuffer.put(TYPE_FLOAT);
-				return new FloatSerializer();
-			case DOUBLE:
-				fileBuffer.put(TYPE_DOUBLE);
-				return new DoubleSerializer();
-			case NULL:
-				fileBuffer.put(TYPE_NULL);
-				return new NullSerializer();
-			case CUSTOMTYPEWRAPPER:
-				fileBuffer.put(((CustomTypeWrapper) value).getType());
-				return new CustomTypeSerializer();
-			default:
-				throw new IllegalArgumentException("Unknown Type encountered: " + type);
-		}
-	}
-
-	private abstract class Serializer<T> {
-		protected ByteBuffer buffer;
-
-		public Serializer(int capacity) {
-			buffer = ByteBuffer.allocate(capacity);
-		}
-
-		public ByteBuffer serialize(T value) {
-			buffer.clear();
-			serializeInternal(value);
-			buffer.flip();
-			return buffer;
-		}
-
-		public abstract void serializeInternal(T value);
-	}
-
-	private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> {
-		public CustomTypeSerializer() {
-			super(0);
-		}
-		@Override
-		public void serializeInternal(CustomTypeWrapper value) {
-			byte[] bytes = value.getData();
-			buffer = ByteBuffer.wrap(bytes);
-			buffer.position(bytes.length);
-		}
-	}
-
-	private class ByteSerializer extends Serializer<Byte> {
-		public ByteSerializer() {
-			super(1);
-		}
-
-		@Override
-		public void serializeInternal(Byte value) {
-			buffer.put(value);
-		}
-	}
-
-	private class BooleanSerializer extends Serializer<Boolean> {
-		public BooleanSerializer() {
-			super(1);
-		}
-
-		@Override
-		public void serializeInternal(Boolean value) {
-			buffer.put(value ? (byte) 1 : (byte) 0);
-		}
-	}
-
-	private class CharSerializer extends Serializer<Character> {
-		public CharSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Character value) {
-			buffer.put((value + "").getBytes());
-		}
-	}
-
-	private class ShortSerializer extends Serializer<Short> {
-		public ShortSerializer() {
-			super(2);
-		}
-
-		@Override
-		public void serializeInternal(Short value) {
-			buffer.putShort(value);
-		}
-	}
-
-	private class IntSerializer extends Serializer<Integer> {
-		public IntSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Integer value) {
-			buffer.putInt(value);
-		}
-	}
-
-	private class LongSerializer extends Serializer<Long> {
-		public LongSerializer() {
-			super(8);
-		}
-
-		@Override
-		public void serializeInternal(Long value) {
-			buffer.putLong(value);
-		}
-	}
-
-	private class StringSerializer extends Serializer<String> {
-		public StringSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(String value) {
-			byte[] bytes = value.getBytes();
-			buffer = ByteBuffer.allocate(bytes.length + 4);
-			buffer.putInt(bytes.length);
-			buffer.put(bytes);
-		}
-	}
-
-	private class FloatSerializer extends Serializer<Float> {
-		public FloatSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Float value) {
-			buffer.putFloat(value);
-		}
-	}
-
-	private class DoubleSerializer extends Serializer<Double> {
-		public DoubleSerializer() {
-			super(8);
-		}
-
-		@Override
-		public void serializeInternal(Double value) {
-			buffer.putDouble(value);
-		}
-	}
-
-	private class NullSerializer extends Serializer<Object> {
-		public NullSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(Object value) {
-		}
-	}
-
-	private class BytesSerializer extends Serializer<byte[]> {
-		public BytesSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(byte[] value) {
-			buffer = ByteBuffer.allocate(4 + value.length);
-			buffer.putInt(value.length);
-			buffer.put(value);
-		}
-	}
-
-	private class TupleSerializer extends Serializer<Tuple> {
-		private final Serializer[] serializer;
-		private final List<ByteBuffer> buffers;
-
-		public TupleSerializer(Tuple value) throws IOException {
-			super(0);
-			serializer = new Serializer[value.getArity()];
-			buffers = new ArrayList();
-			for (int x = 0; x < serializer.length; x++) {
-				serializer[x] = getSerializer(value.getField(x));
-			}
-		}
-
-		@Override
-		public void serializeInternal(Tuple value) {
-			int length = 0;
-			for (int x = 0; x < serializer.length; x++) {
-				serializer[x].buffer.clear();
-				serializer[x].serializeInternal(value.getField(x));
-				length += serializer[x].buffer.position();
-				buffers.add(serializer[x].buffer);
-			}
-			buffer = ByteBuffer.allocate(length);
-			for (ByteBuffer b : buffers) {
-				b.flip();
-				buffer.put(b);
-			}
-			buffers.clear();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java
deleted file mode 100644
index e42364b..0000000
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Simple utility class to print all contents of an inputstream to stdout.
- */
-public class StreamPrinter extends Thread {
-	private final BufferedReader reader;
-	private final boolean wrapInException;
-	private StringBuilder msg;
-
-	public StreamPrinter(InputStream stream) {
-		this(stream, false, null);
-	}
-
-	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
-		this.reader = new BufferedReader(new InputStreamReader(stream));
-		this.wrapInException = wrapInException;
-		this.msg = msg;
-	}
-
-	@Override
-	public void run() {
-		String line;
-		try {
-			if (wrapInException) {
-				while ((line = reader.readLine()) != null) {
-					msg.append("\n" + line);
-				}
-			} else {
-				while ((line = reader.readLine()) != null) {
-					System.out.println(line);
-					System.out.flush();
-				}
-			}
-		} catch (IOException ex) {
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
new file mode 100644
index 0000000..9ed047c
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BOOLEAN;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTE;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTES;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_DOUBLE;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_FLOAT;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_INTEGER;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_LONG;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_NULL;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_SHORT;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_STRING;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_TUPLE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+import org.apache.flink.util.Collector;
+
+/**
+ * General-purpose class to read data from memory-mapped files.
+ */
+public class PythonReceiver implements Serializable {
+	private static final long serialVersionUID = -2474088929850009968L;
+
+	// Memory-mapped file shared with the python process: python writes records,
+	// this class reads them. Synchronization happens outside this class.
+	private File inputFile;
+	private RandomAccessFile inputRAF;
+	private FileChannel inputChannel;
+	private MappedByteBuffer fileBuffer;
+
+	// Created lazily from the type byte at the start of the first buffer; all
+	// subsequent records are assumed to be of that same type.
+	private Deserializer<?> deserializer = null;
+
+	//=====Setup========================================================================================================
+	/**
+	 * Creates the file at the given path and maps it into memory for reading.
+	 *
+	 * @param path path of the memory-mapped file
+	 * @throws IOException
+	 */
+	public void open(String path) throws IOException {
+		setupMappedFile(path);
+	}
+
+	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
+		// Ensure the shared tmp directory exists before creating the file in it.
+		File x = new File(FLINK_TMP_DATA_DIR);
+		x.mkdirs();
+
+		// Recreate the file from scratch so no stale data is mapped.
+		inputFile = new File(inputFilePath);
+		if (inputFile.exists()) {
+			inputFile.delete();
+		}
+		inputFile.createNewFile();
+		// Grow the file to its full size and touch the last byte so the entire
+		// MAPPED_FILE_SIZE region is backed before mapping it.
+		inputRAF = new RandomAccessFile(inputFilePath, "rw");
+		inputRAF.setLength(MAPPED_FILE_SIZE);
+		inputRAF.seek(MAPPED_FILE_SIZE - 1);
+		inputRAF.writeByte(0);
+		inputRAF.seek(0);
+		inputChannel = inputRAF.getChannel();
+		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+	}
+
+	/**
+	 * Closes the mapped channel and the underlying file.
+	 *
+	 * @throws IOException
+	 */
+	public void close() throws IOException {
+		closeMappedFile();
+	}
+
+	private void closeMappedFile() throws IOException {
+		inputChannel.close();
+		inputRAF.close();
+	}
+
+	/**
+	 * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
+	 * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
+	 * The user must guarantee that the buffer was completely written before calling this method.
+	 *
+	 * @param c Collector to collect records
+	 * @param bufferSize size of the buffer
+	 * @throws IOException
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public void collectBuffer(Collector c, int bufferSize) throws IOException {
+		fileBuffer.position(0);
+
+		// The very first buffer starts with a single type byte; it is consumed
+		// exactly once and fixes the deserializer for the whole stream.
+		if (deserializer == null) {
+			byte type = fileBuffer.get();
+			deserializer = getDeserializer(type);
+		}
+		// Records are densely packed; deserialize until bufferSize is consumed.
+		while (fileBuffer.position() < bufferSize) {
+			c.collect(deserializer.deserialize());
+		}
+	}
+
+	//=====Deserializer=================================================================================================
+	// Maps a protocol type tag (see PythonSender.TYPE_*) to a deserializer.
+	// Any unknown tag is treated as a custom type, identified by the tag itself.
+	private Deserializer<?> getDeserializer(byte type) {
+		switch (type) {
+			case TYPE_TUPLE:
+				return new TupleDeserializer();
+			case TYPE_BOOLEAN:
+				return new BooleanDeserializer();
+			case TYPE_BYTE:
+				return new ByteDeserializer();
+			case TYPE_BYTES:
+				return new BytesDeserializer();
+			case TYPE_SHORT:
+				return new ShortDeserializer();
+			case TYPE_INTEGER:
+				return new IntDeserializer();
+			case TYPE_LONG:
+				return new LongDeserializer();
+			case TYPE_STRING:
+				return new StringDeserializer();
+			case TYPE_FLOAT:
+				return new FloatDeserializer();
+			case TYPE_DOUBLE:
+				return new DoubleDeserializer();
+			case TYPE_NULL:
+				return new NullDeserializer();
+			default:
+				return new CustomTypeDeserializer(type);
+
+		}
+	}
+
+	// Reads one value from the current fileBuffer position.
+	private interface Deserializer<T> {
+		public T deserialize();
+	}
+
+	// Wire format: 4-byte length, followed by that many payload bytes.
+	private class CustomTypeDeserializer implements Deserializer<CustomTypeWrapper> {
+		private final byte type;
+
+		public CustomTypeDeserializer(byte type) {
+			this.type = type;
+		}
+
+		@Override
+		public CustomTypeWrapper deserialize() {
+			int size = fileBuffer.getInt();
+			byte[] data = new byte[size];
+			fileBuffer.get(data);
+			return new CustomTypeWrapper(type, data);
+		}
+	}
+
+	// Wire format: single byte, 1 = true, anything else = false.
+	private class BooleanDeserializer implements Deserializer<Boolean> {
+		@Override
+		public Boolean deserialize() {
+			return fileBuffer.get() == 1;
+		}
+	}
+
+	private class ByteDeserializer implements Deserializer<Byte> {
+		@Override
+		public Byte deserialize() {
+			return fileBuffer.get();
+		}
+	}
+
+	private class ShortDeserializer implements Deserializer<Short> {
+		@Override
+		public Short deserialize() {
+			return fileBuffer.getShort();
+		}
+	}
+
+	private class IntDeserializer implements Deserializer<Integer> {
+		@Override
+		public Integer deserialize() {
+			return fileBuffer.getInt();
+		}
+	}
+
+	private class LongDeserializer implements Deserializer<Long> {
+		@Override
+		public Long deserialize() {
+			return fileBuffer.getLong();
+		}
+	}
+
+	private class FloatDeserializer implements Deserializer<Float> {
+		@Override
+		public Float deserialize() {
+			return fileBuffer.getFloat();
+		}
+	}
+
+	private class DoubleDeserializer implements Deserializer<Double> {
+		@Override
+		public Double deserialize() {
+			return fileBuffer.getDouble();
+		}
+	}
+
+	// Wire format: 4-byte length, followed by the string bytes.
+	private class StringDeserializer implements Deserializer<String> {
+		private int size;
+
+		@Override
+		public String deserialize() {
+			size = fileBuffer.getInt();
+			byte[] buffer = new byte[size];
+			fileBuffer.get(buffer);
+			// NOTE(review): decodes with the platform default charset — must match
+			// the encoding the python side writes with; confirm (likely UTF-8).
+			return new String(buffer);
+		}
+	}
+
+	// TYPE_NULL carries no payload bytes at all.
+	private class NullDeserializer implements Deserializer<Object> {
+		@Override
+		public Object deserialize() {
+			return null;
+		}
+	}
+
+	// Wire format: 4-byte length, followed by the raw bytes.
+	private class BytesDeserializer implements Deserializer<byte[]> {
+		@Override
+		public byte[] deserialize() {
+			int length = fileBuffer.getInt();
+			byte[] result = new byte[length];
+			fileBuffer.get(result);
+			return result;
+		}
+
+	}
+
+	private class TupleDeserializer implements Deserializer<Tuple> {
+		Deserializer<?>[] deserializer = null;
+		Tuple reuse;
+
+		// The constructor consumes the tuple header from fileBuffer: a 4-byte
+		// arity followed by one type byte per field. It must therefore only be
+		// invoked while the buffer is positioned at that header (see
+		// collectBuffer/getDeserializer).
+		public TupleDeserializer() {
+			int size = fileBuffer.getInt();
+			reuse = createTuple(size);
+			deserializer = new Deserializer[size];
+			for (int x = 0; x < deserializer.length; x++) {
+				deserializer[x] = getDeserializer(fileBuffer.get());
+			}
+		}
+
+		// Fields are packed back-to-back with no per-field tags. The SAME Tuple
+		// instance is returned for every record (object reuse); callers must not
+		// hold on to references across calls.
+		@Override
+		public Tuple deserialize() {
+			for (int x = 0; x < deserializer.length; x++) {
+				reuse.setField(deserializer[x].deserialize(), x);
+			}
+			return reuse;
+		}
+	}
+
+	/**
+	 * Instantiates a Flink tuple of the given arity via reflection.
+	 *
+	 * @param size arity of the tuple
+	 * @return a new, empty Tuple of the requested arity
+	 */
+	public static Tuple createTuple(int size) {
+		try {
+			return Tuple.getTupleClass(size).newInstance();
+		} catch (InstantiationException e) {
+			throw new RuntimeException(e);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException(e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
new file mode 100644
index 0000000..1d17243
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+/**
+ * General-purpose class to write data to memory-mapped files.
+ */
+public class PythonSender implements Serializable {
+	// Type tags of the binary protocol. PythonReceiver switches on these same
+	// constants; presumably they are mirrored on the python side — keep in sync.
+	public static final byte TYPE_TUPLE = (byte) 11;
+	public static final byte TYPE_BOOLEAN = (byte) 10;
+	public static final byte TYPE_BYTE = (byte) 9;
+	public static final byte TYPE_SHORT = (byte) 8;
+	public static final byte TYPE_INTEGER = (byte) 7;
+	public static final byte TYPE_LONG = (byte) 6;
+	public static final byte TYPE_DOUBLE = (byte) 4;
+	public static final byte TYPE_FLOAT = (byte) 5;
+	public static final byte TYPE_CHAR = (byte) 3;
+	public static final byte TYPE_STRING = (byte) 2;
+	public static final byte TYPE_BYTES = (byte) 1;
+	public static final byte TYPE_NULL = (byte) 0;
+
+	// Memory-mapped file shared with the python process: this class writes
+	// records, python reads them. Synchronization happens outside this class.
+	private File outputFile;
+	private RandomAccessFile outputRAF;
+	private FileChannel outputChannel;
+	private MappedByteBuffer fileBuffer;
+
+	// Per-group spill-over: the serialized record that did not fit into the
+	// current buffer, to be written first on the next sendBuffer call.
+	// Two groups exist to support CoGroup-style operations with two inputs.
+	private final ByteBuffer[] saved = new ByteBuffer[2];
+
+	// Per-group serializer, created from the first record of each group.
+	// NOTE(review): raw Serializer type; Serializer<?>[] would be cleaner.
+	private final Serializer[] serializer = new Serializer[2];
+
+	//=====Setup========================================================================================================
+	/**
+	 * Creates the file at the given path and maps it into memory for writing.
+	 *
+	 * @param path path of the memory-mapped file
+	 * @throws IOException
+	 */
+	public void open(String path) throws IOException {
+		setupMappedFile(path);
+	}
+
+	private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
+		// Ensure the shared tmp directory exists before creating the file in it.
+		File x = new File(FLINK_TMP_DATA_DIR);
+		x.mkdirs();
+
+		// Recreate the file from scratch so no stale data is mapped.
+		outputFile = new File(outputFilePath);
+		if (outputFile.exists()) {
+			outputFile.delete();
+		}
+		outputFile.createNewFile();
+		// Grow the file to its full size and touch the last byte so the entire
+		// MAPPED_FILE_SIZE region is backed before mapping it.
+		outputRAF = new RandomAccessFile(outputFilePath, "rw");
+		outputRAF.setLength(MAPPED_FILE_SIZE);
+		outputRAF.seek(MAPPED_FILE_SIZE - 1);
+		outputRAF.writeByte(0);
+		outputRAF.seek(0);
+		outputChannel = outputRAF.getChannel();
+		fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+	}
+
+	/**
+	 * Closes the mapped channel and the underlying file.
+	 *
+	 * @throws IOException
+	 */
+	public void close() throws IOException {
+		closeMappedFile();
+	}
+
+	private void closeMappedFile() throws IOException {
+		outputChannel.close();
+		outputRAF.close();
+	}
+
+	/**
+	 * Resets this object to the post-configuration state.
+	 */
+	public void reset() {
+		// Dropping the serializers forces the next sendBuffer/sendRecord call to
+		// re-emit the type header for the next record stream.
+		serializer[0] = null;
+		serializer[1] = null;
+		fileBuffer.clear();
+	}
+
+	//=====Serialization================================================================================================
+	/**
+	 * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
+	 * must guarantee that the file may be written to before calling this method. This method essentially reserves the
+	 * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
+	 * absolutely necessary.
+	 *
+	 * @param value record to send
+	 * @return size of the written buffer
+	 * @throws IOException
+	 */
+	public int sendRecord(Object value) throws IOException {
+		fileBuffer.clear();
+		int group = 0;
+
+		// getSerializer also writes the type header into fileBuffer (side effect).
+		serializer[group] = getSerializer(value);
+		ByteBuffer bb = serializer[group].serialize(value);
+		if (bb.remaining() > MAPPED_FILE_SIZE) {
+			throw new RuntimeException("Serialized object does not fit into a single buffer.");
+		}
+		fileBuffer.put(bb);
+
+		int size = fileBuffer.position();
+
+		// Single records are self-contained; clear all state for the next call.
+		reset();
+		return size;
+	}
+
+	// True when a spilled record from a previous sendBuffer call is still pending.
+	public boolean hasRemaining(int group) {
+		return saved[group] != null;
+	}
+
+	/**
+	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
+	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
+	 * guarantee that the file may be written to before calling this method.
+	 *
+	 * @param i iterator containing records
+	 * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
+	 * @return size of the written buffer
+	 * @throws IOException
+	 */
+	public int sendBuffer(Iterator i, int group) throws IOException {
+		fileBuffer.clear();
+
+		Object value;
+		ByteBuffer bb;
+		// First call for this group: consume one record to create the serializer
+		// (which writes the type header into fileBuffer) and write that record.
+		if (serializer[group] == null) {
+			value = i.next();
+			serializer[group] = getSerializer(value);
+			bb = serializer[group].serialize(value);
+			if (bb.remaining() > MAPPED_FILE_SIZE) {
+				throw new RuntimeException("Serialized object does not fit into a single buffer.");
+			}
+			fileBuffer.put(bb);
+
+		}
+		// Flush the record spilled over from the previous call, if any.
+		if (saved[group] != null) {
+			fileBuffer.put(saved[group]);
+			saved[group] = null;
+		}
+		// Pack records until the buffer is full; the first record that does not
+		// fit is saved for the next call rather than split across buffers.
+		while (i.hasNext() && saved[group] == null) {
+			value = i.next();
+			bb = serializer[group].serialize(value);
+			if (bb.remaining() > MAPPED_FILE_SIZE) {
+				throw new RuntimeException("Serialized object does not fit into a single buffer.");
+			}
+			if (bb.remaining() <= fileBuffer.remaining()) {
+				fileBuffer.put(bb);
+			} else {
+				saved[group] = bb;
+			}
+		}
+
+		int size = fileBuffer.position();
+		return size;
+	}
+
+	private enum SupportedTypes {
+		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
+	}
+
+	//=====Serializer===================================================================================================
+	// Picks a serializer based on the value's runtime class name, and — as a
+	// side effect — writes the protocol type header into fileBuffer. For tuples
+	// the header additionally contains the arity (the per-field tags are written
+	// by the recursive getSerializer calls in TupleSerializer's constructor).
+	private Serializer getSerializer(Object value) throws IOException {
+		String className = value.getClass().getSimpleName().toUpperCase();
+		// Tuple0..Tuple25 all map to TUPLE; byte[]'s simple name is "byte[]".
+		if (className.startsWith("TUPLE")) {
+			className = "TUPLE";
+		}
+		if (className.startsWith("BYTE[]")) {
+			className = "BYTES";
+		}
+		SupportedTypes type = SupportedTypes.valueOf(className);
+		switch (type) {
+			case TUPLE:
+				fileBuffer.put(TYPE_TUPLE);
+				fileBuffer.putInt(((Tuple) value).getArity());
+				return new TupleSerializer((Tuple) value);
+			case BOOLEAN:
+				fileBuffer.put(TYPE_BOOLEAN);
+				return new BooleanSerializer();
+			case BYTE:
+				fileBuffer.put(TYPE_BYTE);
+				return new ByteSerializer();
+			case BYTES:
+				fileBuffer.put(TYPE_BYTES);
+				return new BytesSerializer();
+			case CHARACTER:
+				// NOTE(review): PythonReceiver.getDeserializer has no TYPE_CHAR
+				// case (tag 3 would fall through to CustomTypeDeserializer there);
+				// presumably chars are only ever read by the python side — confirm.
+				fileBuffer.put(TYPE_CHAR);
+				return new CharSerializer();
+			case SHORT:
+				fileBuffer.put(TYPE_SHORT);
+				return new ShortSerializer();
+			case INTEGER:
+				fileBuffer.put(TYPE_INTEGER);
+				return new IntSerializer();
+			case LONG:
+				fileBuffer.put(TYPE_LONG);
+				return new LongSerializer();
+			case STRING:
+				fileBuffer.put(TYPE_STRING);
+				return new StringSerializer();
+			case FLOAT:
+				fileBuffer.put(TYPE_FLOAT);
+				return new FloatSerializer();
+			case DOUBLE:
+				fileBuffer.put(TYPE_DOUBLE);
+				return new DoubleSerializer();
+			case NULL:
+				fileBuffer.put(TYPE_NULL);
+				return new NullSerializer();
+			case CUSTOMTYPEWRAPPER:
+				// Custom types use their own type byte as the tag.
+				fileBuffer.put(((CustomTypeWrapper) value).getType());
+				return new CustomTypeSerializer();
+			default:
+				throw new IllegalArgumentException("Unknown Type encountered: " + type);
+		}
+	}
+
+	// Base class: serialize() clears the reusable buffer, lets the subclass fill
+	// it, then flips it for reading. Subclasses with variable-length output
+	// replace 'buffer' inside serializeInternal instead of pre-sizing it.
+	private abstract class Serializer<T> {
+		protected ByteBuffer buffer;
+
+		public Serializer(int capacity) {
+			buffer = ByteBuffer.allocate(capacity);
+		}
+
+		public ByteBuffer serialize(T value) {
+			buffer.clear();
+			serializeInternal(value);
+			buffer.flip();
+			return buffer;
+		}
+
+		public abstract void serializeInternal(T value);
+	}
+
+	// Writes the wrapped payload as-is; the tag was already written in the header.
+	// NOTE(review): no length prefix here, yet CustomTypeDeserializer reads a
+	// 4-byte length first — presumably the wrapped data starts with one; confirm.
+	private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> {
+		public CustomTypeSerializer() {
+			super(0);
+		}
+		@Override
+		public void serializeInternal(CustomTypeWrapper value) {
+			byte[] bytes = value.getData();
+			buffer = ByteBuffer.wrap(bytes);
+			buffer.position(bytes.length);
+		}
+	}
+
+	private class ByteSerializer extends Serializer<Byte> {
+		public ByteSerializer() {
+			super(1);
+		}
+
+		@Override
+		public void serializeInternal(Byte value) {
+			buffer.put(value);
+		}
+	}
+
+	private class BooleanSerializer extends Serializer<Boolean> {
+		public BooleanSerializer() {
+			super(1);
+		}
+
+		@Override
+		public void serializeInternal(Boolean value) {
+			buffer.put(value ? (byte) 1 : (byte) 0);
+		}
+	}
+
+	private class CharSerializer extends Serializer<Character> {
+		public CharSerializer() {
+			// 4 bytes is enough for any single char in common encodings.
+			super(4);
+		}
+
+		@Override
+		public void serializeInternal(Character value) {
+			// NOTE(review): encodes with the platform default charset and writes
+			// no length prefix — must match the reader's expectation; confirm.
+			buffer.put((value + "").getBytes());
+		}
+	}
+
+	private class ShortSerializer extends Serializer<Short> {
+		public ShortSerializer() {
+			super(2);
+		}
+
+		@Override
+		public void serializeInternal(Short value) {
+			buffer.putShort(value);
+		}
+	}
+
+	private class IntSerializer extends Serializer<Integer> {
+		public IntSerializer() {
+			super(4);
+		}
+
+		@Override
+		public void serializeInternal(Integer value) {
+			buffer.putInt(value);
+		}
+	}
+
+	private class LongSerializer extends Serializer<Long> {
+		public LongSerializer() {
+			super(8);
+		}
+
+		@Override
+		public void serializeInternal(Long value) {
+			buffer.putLong(value);
+		}
+	}
+
+	// Wire format: 4-byte length, followed by the string bytes. Allocates a
+	// fresh buffer per value since the length is not known up front.
+	private class StringSerializer extends Serializer<String> {
+		public StringSerializer() {
+			super(0);
+		}
+
+		@Override
+		public void serializeInternal(String value) {
+			// NOTE(review): platform default charset — must match the decoding in
+			// PythonReceiver.StringDeserializer and the python side; confirm.
+			byte[] bytes = value.getBytes();
+			buffer = ByteBuffer.allocate(bytes.length + 4);
+			buffer.putInt(bytes.length);
+			buffer.put(bytes);
+		}
+	}
+
+	private class FloatSerializer extends Serializer<Float> {
+		public FloatSerializer() {
+			super(4);
+		}
+
+		@Override
+		public void serializeInternal(Float value) {
+			buffer.putFloat(value);
+		}
+	}
+
+	private class DoubleSerializer extends Serializer<Double> {
+		public DoubleSerializer() {
+			super(8);
+		}
+
+		@Override
+		public void serializeInternal(Double value) {
+			buffer.putDouble(value);
+		}
+	}
+
+	// TYPE_NULL carries no payload bytes at all.
+	private class NullSerializer extends Serializer<Object> {
+		public NullSerializer() {
+			super(0);
+		}
+
+		@Override
+		public void serializeInternal(Object value) {
+		}
+	}
+
+	// Wire format: 4-byte length, followed by the raw bytes.
+	private class BytesSerializer extends Serializer<byte[]> {
+		public BytesSerializer() {
+			super(0);
+		}
+
+		@Override
+		public void serializeInternal(byte[] value) {
+			buffer = ByteBuffer.allocate(4 + value.length);
+			buffer.putInt(value.length);
+			buffer.put(value);
+		}
+	}
+
+	private class TupleSerializer extends Serializer<Tuple> {
+		private final Serializer[] serializer;
+		private final List<ByteBuffer> buffers;
+
+		// The constructor recursively creates one serializer per field; those
+		// getSerializer calls write the per-field type tags into fileBuffer,
+		// completing the tuple header started by the outer getSerializer call.
+		public TupleSerializer(Tuple value) throws IOException {
+			super(0);
+			serializer = new Serializer[value.getArity()];
+			// NOTE(review): raw ArrayList — new ArrayList<ByteBuffer>() would
+			// avoid the unchecked warning.
+			buffers = new ArrayList();
+			for (int x = 0; x < serializer.length; x++) {
+				serializer[x] = getSerializer(value.getField(x));
+			}
+		}
+
+		// Fields are packed back-to-back with no per-field tags, matching
+		// PythonReceiver.TupleDeserializer.
+		@Override
+		public void serializeInternal(Tuple value) {
+			int length = 0;
+			for (int x = 0; x < serializer.length; x++) {
+				serializer[x].buffer.clear();
+				serializer[x].serializeInternal(value.getField(x));
+				length += serializer[x].buffer.position();
+				buffers.add(serializer[x].buffer);
+			}
+			buffer = ByteBuffer.allocate(length);
+			for (ByteBuffer b : buffers) {
+				b.flip();
+				buffer.put(b);
+			}
+			buffers.clear();
+		}
+	}
+}


Mime
View raw message