flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [4/5] flink git commit: [FLINK-6229] [py] Rework configuration of PythonPlanBinder/Operators
Date Thu, 06 Apr 2017 08:58:10 GMT
[FLINK-6229] [py] Rework configuration of PythonPlanBinder/Operators

- unify python2/python3 configuration
- explicitly pass on a configuration to each operator
- port all configuration options to ConfigOptions
- [FLINK-5516] Make all paths explicitly configurable
- [FLINK-6230] Make mmap size configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdcebfda
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdcebfda
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdcebfda

Branch: refs/heads/master
Commit: bdcebfda06846a1e21bb6a4678909d503ebc6333
Parents: 940d16c
Author: zentol <chesnay@apache.org>
Authored: Fri Mar 31 11:55:18 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Thu Apr 6 10:57:11 2017 +0200

----------------------------------------------------------------------
 docs/dev/batch/python.md                        |  10 +-
 flink-dist/src/main/flink-bin/bin/pyflink.bat   |  25 ++++
 flink-dist/src/main/flink-bin/bin/pyflink.sh    |  25 ++++
 flink-dist/src/main/flink-bin/bin/pyflink2.bat  |  25 ----
 flink-dist/src/main/flink-bin/bin/pyflink2.sh   |  25 ----
 flink-dist/src/main/flink-bin/bin/pyflink3.bat  |  25 ----
 flink-dist/src/main/flink-bin/bin/pyflink3.sh   |  26 ----
 .../apache/flink/python/api/PythonOptions.java  |  74 ++++++++++
 .../flink/python/api/PythonPlanBinder.java      | 143 +++++++++++--------
 .../python/api/functions/PythonCoGroup.java     |   4 +-
 .../api/functions/PythonMapPartition.java       |   4 +-
 .../streaming/data/PythonDualInputSender.java   |   6 +
 .../streaming/data/PythonDualInputStreamer.java |   5 +-
 .../api/streaming/data/PythonReceiver.java      |  29 ++--
 .../python/api/streaming/data/PythonSender.java |  36 ++---
 .../streaming/data/PythonSingleInputSender.java |   6 +
 .../data/PythonSingleInputStreamer.java         |   9 +-
 .../api/streaming/data/PythonStreamer.java      |  39 ++---
 .../api/streaming/plan/PythonPlanStreamer.java  |  12 +-
 .../python/api/flink/connection/Connection.py   |  21 ++-
 .../api/flink/functions/CoGroupFunction.py      |   4 +-
 .../python/api/flink/functions/Function.py      |   4 +-
 .../api/flink/functions/GroupReduceFunction.py  |   4 +-
 .../api/flink/functions/ReduceFunction.py       |   4 +-
 .../flink/python/api/flink/plan/Environment.py  |   5 +-
 .../flink/python/api/PythonPlanBinderTest.java  |  12 +-
 26 files changed, 322 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/docs/dev/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md
index 09a4fa8..c4c2671 100644
--- a/docs/dev/batch/python.md
+++ b/docs/dev/batch/python.md
@@ -149,8 +149,7 @@ Apart from setting up Flink, no additional work is required. The python package
 
 The Python API was tested on Linux/Windows systems that have Python 2.7 or 3.4 installed.
 
-By default Flink will start python processes by calling "python" or "python3", depending on which start-script
-was used. By setting the "python.binary.python[2/3]" key in the flink-conf.yaml you can modify this behaviour to use a binary of your choice.
+By default Flink will start python processes by calling "python". By setting the "python.binary.path" key in the flink-conf.yaml you can modify this behaviour to use a binary of your choice.
 
 {% top %}
 
@@ -624,12 +623,11 @@ Executing Plans
 ---------------
 
 To run the plan with Flink, go to your Flink distribution, and run the pyflink.sh script from the /bin folder.
-use pyflink2.sh for python 2.7, and pyflink3.sh for python 3.4. The script containing the plan has to be passed
-as the first argument, followed by a number of additional python packages, and finally, separated by - additional
-arguments that will be fed to the script.
+The script containing the plan has to be passed as the first argument, followed by a number of additional python
+packages, and finally, separated by - additional arguments that will be fed to the script.
 
 {% highlight python %}
-./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
+./bin/pyflink.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
 {% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink.bat b/flink-dist/src/main/flink-bin/bin/pyflink.bat
new file mode 100644
index 0000000..da180f3
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink.bat
@@ -0,0 +1,25 @@
+::###############################################################################
+::  Licensed to the Apache Software Foundation (ASF) under one
+::  or more contributor license agreements.  See the NOTICE file
+::  distributed with this work for additional information
+::  regarding copyright ownership.  The ASF licenses this file
+::  to you under the Apache License, Version 2.0 (the
+::  "License"); you may not use this file except in compliance
+::  with the License.  You may obtain a copy of the License at
+::
+::      http://www.apache.org/licenses/LICENSE-2.0
+::
+::  Unless required by applicable law or agreed to in writing, software
+::  distributed under the License is distributed on an "AS IS" BASIS,
+::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+::  See the License for the specific language governing permissions and
+:: limitations under the License.
+::###############################################################################
+
+@echo off
+setlocal EnableDelayedExpansion
+
+SET bin=%~dp0
+SET FLINK_ROOT_DIR=%bin%..
+
+"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink.sh b/flink-dist/src/main/flink-bin/bin/pyflink.sh
new file mode 100644
index 0000000..37679d3
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink2.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.bat b/flink-dist/src/main/flink-bin/bin/pyflink2.bat
deleted file mode 100644
index 1122ed4..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink2.bat
+++ /dev/null
@@ -1,25 +0,0 @@
-::###############################################################################
-::  Licensed to the Apache Software Foundation (ASF) under one
-::  or more contributor license agreements.  See the NOTICE file
-::  distributed with this work for additional information
-::  regarding copyright ownership.  The ASF licenses this file
-::  to you under the Apache License, Version 2.0 (the
-::  "License"); you may not use this file except in compliance
-::  with the License.  You may obtain a copy of the License at
-::
-::      http://www.apache.org/licenses/LICENSE-2.0
-::
-::  Unless required by applicable law or agreed to in writing, software
-::  distributed under the License is distributed on an "AS IS" BASIS,
-::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-::  See the License for the specific language governing permissions and
-:: limitations under the License.
-::###############################################################################
-
-@echo off
-setlocal EnableDelayedExpansion
-
-SET bin=%~dp0
-SET FLINK_ROOT_DIR=%bin%..
-
-"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar 2 %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink2.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.sh b/flink-dist/src/main/flink-bin/bin/pyflink2.sh
deleted file mode 100644
index 8a326e9..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink2.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
-"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "2" "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink3.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.bat b/flink-dist/src/main/flink-bin/bin/pyflink3.bat
deleted file mode 100644
index 0294e37..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink3.bat
+++ /dev/null
@@ -1,25 +0,0 @@
-::###############################################################################
-::  Licensed to the Apache Software Foundation (ASF) under one
-::  or more contributor license agreements.  See the NOTICE file
-::  distributed with this work for additional information
-::  regarding copyright ownership.  The ASF licenses this file
-::  to you under the Apache License, Version 2.0 (the
-::  "License"); you may not use this file except in compliance
-::  with the License.  You may obtain a copy of the License at
-::
-::      http://www.apache.org/licenses/LICENSE-2.0
-::
-::  Unless required by applicable law or agreed to in writing, software
-::  distributed under the License is distributed on an "AS IS" BASIS,
-::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-::  See the License for the specific language governing permissions and
-:: limitations under the License.
-::###############################################################################
-
-@echo off
-setlocal EnableDelayedExpansion
-
-SET bin=%~dp0
-SET FLINK_ROOT_DIR=%bin%..
-
-"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar 3 %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink3.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.sh b/flink-dist/src/main/flink-bin/bin/pyflink3.sh
deleted file mode 100644
index d2d1991..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink3.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
-
-"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "3" "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
new file mode 100644
index 0000000..de053a0
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.io.File;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for the Python API.
+ */
+public class PythonOptions {
+
+	/**
+	 * The config parameter defining the path to the python binary to use.
+	 */
+	public static final ConfigOption<String> PYTHON_BINARY_PATH =
+		key("python.binary.path")
+			.defaultValue("python")
+		.withDeprecatedKeys("python.binary.python2", "python.binary.python3");
+
+	/**
+	 * The config parameter defining the size of the memory-mapped files, in kb.
+	 * This value must be large enough to ensure that the largest serialized record can be written completely into
+	 * the file.
+	 * 
+	 * Every task will allocate 2 memory-files, each with this size.
+	 */
+	public static final ConfigOption<Long> MMAP_FILE_SIZE =
+		key("python.mmap.size.kb")
+			.defaultValue(4L);
+
+	/**
+	 * The config parameter defining where temporary plan-related files are stored on the client.
+	 */
+	public static final ConfigOption<String> PLAN_TMP_DIR =
+		key("python.plan.tmp.dir")
+			.noDefaultValue();
+
+	/**
+	 * The config parameter defining where the memory-mapped files will be created.
+	 */
+	public static final ConfigOption<String> DATA_TMP_DIR =
+		key("python.mmap.tmp.dir")
+			.noDefaultValue();
+
+	/**
+	 * The config parameter defining where the flink python library and user supplied files will be uploaded to before
+	 * registering them with the Distributed Cache. This directory must be accessible from all worker nodes.
+	 */
+	public static final ConfigOption<String> DC_TMP_DIR =
+		key("python.dc.tmp.dir")
+			.defaultValue(System.getProperty("java.io.tmpdir") + File.separator + "flink_dc");
+
+	private PythonOptions() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 733a6fb..b6181b4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -18,7 +18,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.operators.CoGroupRawOperator;
@@ -50,7 +49,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Random;
@@ -65,42 +63,30 @@ import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;
 public class PythonPlanBinder {
 	static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
 
-	public static final String ARGUMENT_PYTHON_2 = "2";
-	public static final String ARGUMENT_PYTHON_3 = "3";
-
 	public static final String FLINK_PYTHON_DC_ID = "flink";
 	public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py";
 
-	public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
-	public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
 	public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
 	public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
-	public static String FLINK_PYTHON2_BINARY_PATH =
-		GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON2_BINARY_KEY, "python");
-	public static String FLINK_PYTHON3_BINARY_PATH =
-		GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON3_BINARY_KEY, "python3");
 
 	private static final Random r = new Random();
 
-	public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
+	public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
+
 	private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
-	private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
-	private static String FULL_PATH;
 
-	public static StringBuilder arguments = new StringBuilder();
+	private final Configuration operatorConfig;
 
-	public static boolean usePython3 = false;
+	private final String pythonLibraryPath;
 
-	private static String FLINK_HDFS_PATH = "hdfs:/tmp";
-	public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
+	private final String tmpPlanFilesDir;
+	private String tmpDistributedDir;
 
 	private final SetCache sets = new SetCache();
 	public ExecutionEnvironment env;
 	private int currentEnvironmentID = 0;
 	private PythonPlanStreamer streamer;
 
-	public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
-
 	/**
 	 * Entry point for the execution of a python plan.
 	 *
@@ -112,23 +98,37 @@ public class PythonPlanBinder {
 			System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
 			return;
 		}
-		usePython3 = args[0].equals(ARGUMENT_PYTHON_3);
-		PythonPlanBinder binder = new PythonPlanBinder();
-		binder.runPlan(Arrays.copyOfRange(args, 1, args.length));
-	}
 
-	public PythonPlanBinder() {
-		Configuration conf = GlobalConfiguration.loadConfiguration();
-		FLINK_PYTHON2_BINARY_PATH = conf.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-		FLINK_PYTHON3_BINARY_PATH = conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
-		FULL_PATH = FLINK_DIR != null
+		Configuration globalConfig = GlobalConfiguration.loadConfiguration();
+		PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
+		binder.runPlan(args);
+	}
+
+	public PythonPlanBinder(Configuration globalConfig) {
+		String configuredPlanTmpPath = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
+		tmpPlanFilesDir = configuredPlanTmpPath != null
+			? configuredPlanTmpPath
+			: System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + r.nextInt();
+		
+		tmpDistributedDir = globalConfig.getString(PythonOptions.DC_TMP_DIR);
+		
+		String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
+		pythonLibraryPath = flinkRootDir != null
 				//command-line
-				? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH
+				? flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH
 				//testing
-				: new Path(FileSystem.getLocalFileSystem().getWorkingDirectory(), "src/main/python/org/apache/flink/python/api").toString();
+				: new File(System.getProperty("user.dir"), "src/main/python/org/apache/flink/python/api").getAbsolutePath();
+
+		operatorConfig = new Configuration();
+		operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
+		String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
+		if (configuredTmpDataDir != null) {
+			operatorConfig.setString(PythonOptions.DATA_TMP_DIR, configuredTmpDataDir);
+		}
+		operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE));
 	}
 
-	private void runPlan(String[] args) throws Exception {
+	void runPlan(String[] args) throws Exception {
 		int split = 0;
 		for (int x = 0; x < args.length; x++) {
 			if (args[x].compareTo("-") == 0) {
@@ -137,7 +137,7 @@ public class PythonPlanBinder {
 		}
 
 		try {
-			String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
+			String tmpPath = tmpPlanFilesDir;
 			prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split));
 			startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
 
@@ -145,10 +145,6 @@ public class PythonPlanBinder {
 			while (streamer.preparePlanMode()) {
 				receivePlan();
 
-				if (env instanceof LocalEnvironment) {
-					FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
-				}
-
 				distributeFiles(tmpPath, env);
 				JobExecutionResult jer = env.execute();
 				sendResult(jer);
@@ -167,8 +163,8 @@ public class PythonPlanBinder {
 	//=====Setup========================================================================================================
 
 	/**
-	 * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big
-	 * package, and resolves PYTHONPATH issues.
+	 * Copies all files to a common directory {@link PythonOptions#PLAN_TMP_DIR}). This allows us to distribute it as
+	 * one big package which resolves PYTHONPATH issues.
 	 *
 	 * @param filePaths
 	 * @throws IOException
@@ -177,7 +173,7 @@ public class PythonPlanBinder {
 	private void prepareFiles(String tempFilePath, String... filePaths) throws IOException, URISyntaxException {
 		//Flink python package
 		clearPath(tempFilePath);
-		FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false);
+		FileCache.copy(new Path(pythonLibraryPath), new Path(tmpPlanFilesDir), false);
 
 		//plan file		
 		copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME);
@@ -206,17 +202,21 @@ public class PythonPlanBinder {
 		FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true);
 	}
 
-	private static void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException, URISyntaxException {
-		clearPath(FLINK_HDFS_PATH);
-		FileCache.copy(new Path(tmpPath), new Path(FLINK_HDFS_PATH), true);
-		env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
+	private void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException {
+		clearPath(tmpDistributedDir);
+		FileCache.copy(new Path(tmpPath), new Path(tmpDistributedDir), true);
+		env.registerCachedFile(new Path(tmpDistributedDir).toUri().toString(), FLINK_PYTHON_DC_ID);
 	}
 
 	private void startPython(String tempPath, String[] args) throws IOException {
+		StringBuilder arguments = new StringBuilder();
 		for (String arg : args) {
 			arguments.append(" ").append(arg);
 		}
-		streamer = new PythonPlanStreamer();
+
+		operatorConfig.setString(PLAN_ARGUMENTS_KEY, arguments.toString());
+
+		streamer = new PythonPlanStreamer(operatorConfig);
 		streamer.open(tempPath, arguments.toString());
 	}
 
@@ -227,17 +227,15 @@ public class PythonPlanBinder {
 
 	private void close() {
 		try { //prevent throwing exception so that previous exceptions aren't hidden.
-			FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
-			hdfs.delete(new Path(FLINK_HDFS_PATH), true);
+			FileSystem hdfs = new Path(tmpDistributedDir).getFileSystem();
+			hdfs.delete(new Path(tmpDistributedDir), true);
 
 			FileSystem local = FileSystem.getLocalFileSystem();
-			local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
-			local.delete(new Path(FLINK_TMP_DATA_DIR), true);
+			local.delete(new Path(tmpPlanFilesDir), true);
 			streamer.close();
 		} catch (NullPointerException ignored) {
 		} catch (IOException ioe) {
 			LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage());
-		} catch (URISyntaxException use) { // can't occur
 		}
 	}
 
@@ -271,7 +269,10 @@ public class PythonPlanBinder {
 					env.setParallelism(dop);
 					break;
 				case MODE:
-					FLINK_HDFS_PATH = value.<Boolean>getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
+					if (value.<Boolean>getField(1)) {
+						LOG.info("Local execution specified, using default for {}.", PythonOptions.DC_TMP_DIR);
+						tmpDistributedDir = PythonOptions.DC_TMP_DIR.defaultValue();
+					}
 					break;
 				case RETRY:
 					int retry = value.<Integer>getField(1);
@@ -527,7 +528,7 @@ public class PythonPlanBinder {
 		DataSet<IN2> op2 = sets.getDataSet(info.otherID);
 		Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType());
 		Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType());
-		PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(info.envID, info.setID, type);
+		PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
 		sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(getParallelism(info)));
 	}
 
@@ -552,7 +553,9 @@ public class PythonPlanBinder {
 
 		defaultResult.setParallelism(getParallelism(info));
 		if (info.usesUDF) {
-			sets.add(info.setID, defaultResult.mapPartition(new PythonMapPartition<Tuple2<IN1, IN2>, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name));
+			sets.add(info.setID, defaultResult
+				.mapPartition(new PythonMapPartition<Tuple2<IN1, IN2>, OUT>(operatorConfig, info.envID, info.setID, type))
+				.setParallelism(getParallelism(info)).name(info.name));
 		} else {
 			sets.add(info.setID, defaultResult.name("DefaultCross"));
 		}
@@ -560,12 +563,16 @@ public class PythonPlanBinder {
 
 	private <IN, OUT> void createFilterOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
-		sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name));
+		sets.add(info.setID, op1
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private <IN, OUT> void createFlatMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
-		sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name));
+		sets.add(info.setID, op1
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -581,19 +588,22 @@ public class PythonPlanBinder {
 	private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
 			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
-			.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
 			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
-			.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
 			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
-			.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private <IN1, IN2, OUT> void createJoinOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -602,7 +612,8 @@ public class PythonPlanBinder {
 
 		if (info.usesUDF) {
 			sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))
-				.mapPartition(new PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name));
+				.mapPartition(new PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(operatorConfig, info.envID, info.setID, type))
+				.setParallelism(getParallelism(info)).name(info.name));
 		} else {
 			sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)));
 		}
@@ -629,12 +640,16 @@ public class PythonPlanBinder {
 
 	private <IN, OUT> void createMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
-		sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name));
+		sets.add(info.setID, op1
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private <IN, OUT> void createMapPartitionOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
-		sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name));
+		sets.add(info.setID, op1
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createReduceOperation(PythonOperationInfo info) {
@@ -650,12 +665,14 @@ public class PythonPlanBinder {
 	private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
 			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
-			.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
 			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
-			.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
+			.setParallelism(getParallelism(info)).name(info.name);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index ff5a8d4..a5e3e75 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -35,9 +35,9 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2,
 	private final PythonDualInputStreamer<IN1, IN2, OUT> streamer;
 	private final transient TypeInformation<OUT> typeInformation;
 
-	public PythonCoGroup(int envID, int setID, TypeInformation<OUT> typeInformation) {
+	public PythonCoGroup(Configuration config, int envID, int setID, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonDualInputStreamer<>(this, envID, setID, typeInformation instanceof PrimitiveArrayTypeInfo);
+		streamer = new PythonDualInputStreamer<>(this, config, envID, setID, typeInformation instanceof PrimitiveArrayTypeInfo);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 9142581..207ead9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -35,9 +35,9 @@ public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OU
 	private final PythonSingleInputStreamer<IN, OUT> streamer;
 	private final transient TypeInformation<OUT> typeInformation;
 
-	public PythonMapPartition(int envId, int setId, TypeInformation<OUT> typeInformation) {
+	public PythonMapPartition(Configuration config, int envId, int setId, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonSingleInputStreamer<>(this, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
+		streamer = new PythonSingleInputStreamer<>(this, config, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
index 3b8e423..a16f522 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.io.IOException;
 
 /**
@@ -32,6 +34,10 @@ public class PythonDualInputSender<IN1, IN2> extends PythonSender {
 	private transient Serializer<IN1> serializer1;
 	private transient Serializer<IN2> serializer2;
 
+	protected PythonDualInputSender(Configuration config) {
+		super(config);
+	}
+
 	/**
 	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
 	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
index 2e9ba2c..b7e8a25 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
@@ -35,8 +36,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho
 
 	private static final long serialVersionUID = -607175070491761873L;
 
-	public PythonDualInputStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) {
-		super(function, envID, setID, usesByteArray, new PythonDualInputSender<IN1, IN2>());
+	public PythonDualInputStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray) {
+		super(function, config, envID, setID, usesByteArray, new PythonDualInputSender<IN1, IN2>(config));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index 838a261..c7c1f7a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -20,8 +20,9 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
 import org.apache.flink.util.Collector;
 
 /**
@@ -30,43 +31,40 @@ import org.apache.flink.util.Collector;
 public class PythonReceiver<OUT> implements Serializable {
 	private static final long serialVersionUID = -2474088929850009968L;
 
-	private transient File inputFile;
 	private transient RandomAccessFile inputRAF;
 	private transient FileChannel inputChannel;
 	private transient MappedByteBuffer fileBuffer;
 
+	private final long mappedFileSizeBytes;
+
 	private final boolean readAsByteArray;
 
 	private transient Deserializer<OUT> deserializer;
 
-	public PythonReceiver(boolean usesByteArray) {
+	public PythonReceiver(Configuration config, boolean usesByteArray) {
 		readAsByteArray = usesByteArray;
+		mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
 	}
 
 	//=====Setup========================================================================================================
 
 	@SuppressWarnings("unchecked")
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
+	public void open(File inputFile) throws IOException {
 		deserializer = (Deserializer<OUT>) (readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer());
-	}
 
-	private void setupMappedFile(String inputFilePath) throws IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
+		inputFile.getParentFile().mkdirs();
 
-		inputFile = new File(inputFilePath);
 		if (inputFile.exists()) {
 			inputFile.delete();
 		}
 		inputFile.createNewFile();
-		inputRAF = new RandomAccessFile(inputFilePath, "rw");
-		inputRAF.setLength(MAPPED_FILE_SIZE);
-		inputRAF.seek(MAPPED_FILE_SIZE - 1);
+		inputRAF = new RandomAccessFile(inputFile, "rw");
+		inputRAF.setLength(mappedFileSizeBytes);
+		inputRAF.seek(mappedFileSizeBytes - 1);
 		inputRAF.writeByte(0);
 		inputRAF.seek(0);
 		inputChannel = inputRAF.getChannel();
-		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, mappedFileSizeBytes);
 	}
 
 	public void close() throws IOException {
@@ -76,7 +74,6 @@ public class PythonReceiver<OUT> implements Serializable {
 	private void closeMappedFile() throws IOException {
 		inputChannel.close();
 		inputRAF.close();
-		inputFile.delete();
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index 9ada758..3d13271 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -14,6 +14,8 @@ package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
 
 import java.io.File;
 import java.io.IOException;
@@ -23,9 +25,6 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
-
 /**
  * General-purpose class to write data to memory-mapped files.
  */
@@ -37,32 +36,36 @@ public abstract class PythonSender implements Serializable {
 	public static final byte TYPE_KEY_VALUE = 62;
 	public static final byte TYPE_VALUE_VALUE = 61;
 
-	private transient File outputFile;
 	private transient RandomAccessFile outputRAF;
 	private transient FileChannel outputChannel;
 	private transient MappedByteBuffer fileBuffer;
 
-	//=====Setup========================================================================================================
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
+	private final long mappedFileSizeBytes;
+
+	private final Configuration config;
+
+	protected PythonSender(Configuration config) {
+		this.config = config;
+		mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
 	}
 
-	private void setupMappedFile(String outputFilePath) throws IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
+	//=====Setup========================================================================================================
+	public void open(File outputFile) throws IOException {
+		outputFile.getParentFile().mkdirs();
 
-		outputFile = new File(outputFilePath);
 		if (outputFile.exists()) {
 			outputFile.delete();
 		}
 		outputFile.createNewFile();
-		outputRAF = new RandomAccessFile(outputFilePath, "rw");
-		outputRAF.setLength(MAPPED_FILE_SIZE);
-		outputRAF.seek(MAPPED_FILE_SIZE - 1);
+		outputRAF = new RandomAccessFile(outputFile, "rw");
+
+
+		outputRAF.setLength(mappedFileSizeBytes);
+		outputRAF.seek(mappedFileSizeBytes - 1);
 		outputRAF.writeByte(0);
 		outputRAF.seek(0);
 		outputChannel = outputRAF.getChannel();
-		fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+		fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, mappedFileSizeBytes);
 	}
 
 	public void close() throws IOException {
@@ -72,7 +75,6 @@ public abstract class PythonSender implements Serializable {
 	private void closeMappedFile() throws IOException {
 		outputChannel.close();
 		outputRAF.close();
-		outputFile.delete();
 	}
 
 	//=====IO===========================================================================================================
@@ -92,7 +94,7 @@ public abstract class PythonSender implements Serializable {
 		while (input.hasNext()) {
 			IN value = input.next();
 			ByteBuffer bb = serializer.serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
+			if (bb.remaining() > mappedFileSizeBytes) {
 				throw new RuntimeException("Serialized object does not fit into a single buffer.");
 			}
 			if (bb.remaining() <= fileBuffer.remaining()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
index 42a1799..74d0604 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.io.IOException;
 
 /**
@@ -30,6 +32,10 @@ public class PythonSingleInputSender<IN> extends PythonSender {
 
 	private transient Serializer<IN> serializer;
 
+	protected PythonSingleInputSender(Configuration config) {
+		super(config);
+	}
+
 	/**
 	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
 	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
index d013111..e7f018c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
@@ -33,8 +34,8 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
 
 	private static final long serialVersionUID = -5149905918522069034L;
 
-	public PythonSingleInputStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) {
-		super(function, envID, setID, usesByteArray, new PythonSingleInputSender<IN>());
+	public PythonSingleInputStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray) {
+		super(function, config, envID, setID, usesByteArray, new PythonSingleInputSender<IN>(config));
 	}
 
 	/**
@@ -83,7 +84,9 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
 				}
 			}
 		} catch (SocketTimeoutException ignored) {
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg.get());
+		} catch (Exception e) {
+			throw new RuntimeException("Critical failure. " + msg.get(), e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 006a1b2..97d5780 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -15,7 +15,7 @@ package org.apache.flink.python.api.streaming.data;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
+import org.apache.flink.python.api.PythonOptions;
 import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
 import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -34,13 +35,11 @@ import java.net.SocketTimeoutException;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
 import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
 import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
+import static org.apache.flink.python.api.PythonPlanBinder.PLAN_ARGUMENTS_KEY;
 
 /**
  * This streamer is used by functions to send/receive data to/from an external python process.
@@ -56,10 +55,9 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 	protected static final int SIGNAL_ERROR = -2;
 	protected static final byte SIGNAL_LAST = 32;
 
+	private final Configuration config;
 	private final int envID;
 	private final int setID;
-	private final boolean usePython3;
-	private final String planArguments;
 
 	private transient Process process;
 	private transient Thread shutdownThread;
@@ -79,12 +77,11 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 	protected transient Thread outPrinter;
 	protected transient Thread errorPrinter;
 
-	public PythonStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray, S sender) {
+	public PythonStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray, S sender) {
+		this.config = config;
 		this.envID = envID;
 		this.setID = setID;
-		this.usePython3 = PythonPlanBinder.usePython3;
-		planArguments = PythonPlanBinder.arguments.toString();
-		receiver = new PythonReceiver<>(usesByteArray);
+		this.receiver = new PythonReceiver<>(config, usesByteArray);
 		this.function = function;
 		this.sender = sender;
 	}
@@ -101,16 +98,21 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 	}
 
 	private void startPython() throws IOException {
-		String outputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-		String inputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+		String tmpDir = config.getString(PythonOptions.DATA_TMP_DIR);
+		if (tmpDir == null) {
+			tmpDir = System.getProperty("java.io.tmpdir");
+		}
+		File outputFile = new File(tmpDir, envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_output");
+		File inputFile = new File(tmpDir, envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_input");
 
-		sender.open(inputFilePath);
-		receiver.open(outputFilePath);
+		sender.open(inputFile);
+		receiver.open(outputFile);
 
 		String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
+
 		String planPath = path + FLINK_PYTHON_PLAN_NAME;
 
-		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+		String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
 		try {
 			Runtime.getRuntime().exec(pythonBinaryPath);
@@ -118,7 +120,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
 
-		process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + config.getString(PLAN_ARGUMENTS_KEY, ""));
 		outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
 		outPrinter.start();
 		errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
@@ -143,8 +145,9 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 		processOutput.write(("" + server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n")
 			.getBytes(ConfigConstants.DEFAULT_CHARSET));
-		processOutput.write((inputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
-		processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write(((config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10) + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((inputFile + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((outputFile + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.flush();
 
 		while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 9b62563..9e93dda 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -13,6 +13,8 @@
 package org.apache.flink.python.api.streaming.plan;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,10 +24,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
-import static org.apache.flink.python.api.PythonPlanBinder.usePython3;
 
 /**
  * Generic class to exchange data during the plan phase.
@@ -33,6 +32,7 @@ import static org.apache.flink.python.api.PythonPlanBinder.usePython3;
 public class PythonPlanStreamer {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(PythonPlanStreamer.class);
+	private final Configuration config;
 
 	protected PythonPlanSender sender;
 	protected PythonPlanReceiver receiver;
@@ -40,6 +40,10 @@ public class PythonPlanStreamer {
 	private Process process;
 	private ServerSocket server;
 	private Socket socket;
+
+	public PythonPlanStreamer(Configuration config) {
+		this.config = config;
+	}
 
 	public Object getRecord() throws IOException {
 		return getRecord(false);
@@ -58,7 +62,7 @@ public class PythonPlanStreamer {
 	}
 
 	private void startPython(String tmpPath, String args) throws IOException {
-		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+		String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
 		try {
 			Runtime.getRuntime().exec(pythonBinaryPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
index 293f5e9..09e6b36 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
@@ -23,8 +23,6 @@ import sys
 PY2 = sys.version_info[0] == 2
 PY3 = sys.version_info[0] == 3
 
-MAPPED_FILE_SIZE = 1024 * 1024 * 64
-
 SIGNAL_REQUEST_BUFFER = b"\x00\x00\x00\x00"
 SIGNAL_REQUEST_BUFFER_G0 = b"\xFF\xFF\xFF\xFD"
 SIGNAL_REQUEST_BUFFER_G1 = b"\xFF\xFF\xFF\xFC"
@@ -67,15 +65,16 @@ class PureTCPConnection(object):
 
 
 class BufferingTCPMappedFileConnection(object):
-    def __init__(self, input_file, output_file, port):
+    def __init__(self, input_file, output_file, mmap_size, port):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
+        self._mmap_size = mmap_size
         if hasattr(mmap, 'MAP_SHARED'):
-            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), mmap_size, mmap.MAP_SHARED, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), mmap_size, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
         else:
-            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_READ)
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE)
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), mmap_size, None, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), mmap_size, None, mmap.ACCESS_WRITE)
         self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
         self._socket.connect((SOCKET.gethostbyname("localhost"), port))
 
@@ -92,10 +91,10 @@ class BufferingTCPMappedFileConnection(object):
 
     def write(self, msg):
         length = len(msg)
-        if length > MAPPED_FILE_SIZE:
+        if length > self._mmap_size:
             raise Exception("Serialized object does not fit into a single buffer.")
         tmp = self._out_size + length
-        if tmp > MAPPED_FILE_SIZE:
+        if tmp > self._mmap_size:
             self._write_buffer()
             self.write(msg)
         else:
@@ -150,8 +149,8 @@ class BufferingTCPMappedFileConnection(object):
 
 
 class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
-    def __init__(self, input_file, output_file, port):
-        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, port)
+    def __init__(self, input_file, output_file, mmap_size, port):
+        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, mmap_size, port)
         self._input = [b"", b""]
         self._input_offset = [0, 0]
         self._input_size = [0, 0]

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
index 83f563b..edd2c61 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
@@ -25,8 +25,8 @@ class CoGroupFunction(Function.Function):
         self._keys1 = None
         self._keys2 = None
 
-    def _configure(self, input_file, output_file, port, env, info, subtask_index):
-        self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index):
+        self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, mmap_size, port)
         self._iterator = Iterator.Iterator(self._connection, env, 0)
         self._iterator2 = Iterator.Iterator(self._connection, env, 1)
         self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index 45a0f2e..a70a359 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -32,8 +32,8 @@ class Function(object):
         self.context = None
         self._env = None
 
-    def _configure(self, input_file, output_file, port, env, info, subtask_index):
-        self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index):
+        self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, mmap_size, port)
         self._iterator = Iterator.Iterator(self._connection, env)
         self._collector = Collector.Collector(self._connection, env, info)
         self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector, subtask_index)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 77b53a2..5a353a1 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -25,8 +25,8 @@ class GroupReduceFunction(Function.Function):
     def __init__(self):
         super(GroupReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info, subtask_index):
-        super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index):
+        super(GroupReduceFunction, self)._configure(input_file, output_file, mmap_size, port, env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_group_reduce
         else:

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index 08af276..5acabe0 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -24,8 +24,8 @@ class ReduceFunction(Function.Function):
     def __init__(self):
         super(ReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info, subtask_index):
-        super(ReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index):
+        super(ReduceFunction, self)._configure(input_file, output_file, mmap_size, port, env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_reduce
         else:

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 6e496de..797ae96 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -234,6 +234,7 @@ class Environment(object):
 
                     port = int(sys.stdin.readline().rstrip('\n'))
                     subtask_index = int(sys.stdin.readline().rstrip('\n'))
+                    mmap_size = int(sys.stdin.readline().rstrip('\n'))
                     input_path = sys.stdin.readline().rstrip('\n')
                     output_path = sys.stdin.readline().rstrip('\n')
 
@@ -244,7 +245,7 @@ class Environment(object):
                         if set.id == id:
                             used_set = set
                             operator = set.operator
-                    operator._configure(input_path, output_path, port, self, used_set, subtask_index)
+                    operator._configure(input_path, output_path, mmap_size, port, self, used_set, subtask_index)
                     operator._go()
                     operator._close()
                     sys.stdout.flush()
@@ -252,7 +253,7 @@ class Environment(object):
             except:
                 sys.stdout.flush()
                 sys.stderr.flush()
-                if operator is not None:
+                if operator is not None and operator._connection is not None:
                     operator._connection._socket.send(struct.pack(">i", -2))
                 elif port is not None:
                     socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index d144298..ba8ea78 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -12,6 +12,7 @@
  */
 package org.apache.flink.python.api;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -21,9 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_2;
-import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_3;
-
 public class PythonPlanBinderTest extends JavaProgramTestBase {
 	
 	@Override
@@ -75,12 +73,16 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 		String utils = findUtilsFile();
 		if (isPython2Supported()) {
 			for (String file : findTestFiles()) {
-				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file, utils});
+				Configuration configuration = new Configuration();
+				configuration.setString(PythonOptions.PYTHON_BINARY_PATH, "python");
+				new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
 			}
 		}
 		if (isPython3Supported()) {
 			for (String file : findTestFiles()) {
-				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file, utils});
+				Configuration configuration = new Configuration();
+				configuration.setString(PythonOptions.PYTHON_BINARY_PATH, "python3");
+				new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
 			}
 		}
 	}


Mime
View raw message