flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-3165] [py] Windows OS support
Date Tue, 19 Jan 2016 17:09:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 018f1fee9 -> 0ac5d4020


[FLINK-3165] [py] Windows OS support

This closes #1454


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac5d402
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac5d402
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac5d402

Branch: refs/heads/master
Commit: 0ac5d4020d9aa441d7c8d439280d4ea2d01bcade
Parents: 018f1fe
Author: zentol <chesnay@apache.org>
Authored: Tue Dec 15 09:38:04 2015 +0100
Committer: zentol <s.motsu@web.de>
Committed: Tue Jan 19 18:09:36 2016 +0100

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/pyflink2.bat  | 25 +++++++++
 flink-dist/src/main/flink-bin/bin/pyflink3.bat  | 25 +++++++++
 .../flink/python/api/PythonPlanBinder.java      | 58 +++++++++++---------
 .../python/api/flink/connection/Connection.py   | 40 ++++++++++++--
 .../python/api/flink/functions/Function.py      |  1 +
 .../flink/python/api/flink/plan/Environment.py  |  8 +++
 6 files changed, 125 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-dist/src/main/flink-bin/bin/pyflink2.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.bat b/flink-dist/src/main/flink-bin/bin/pyflink2.bat
new file mode 100644
index 0000000..9d94a69
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink2.bat
@@ -0,0 +1,25 @@
+::###############################################################################
+::  Licensed to the Apache Software Foundation (ASF) under one
+::  or more contributor license agreements.  See the NOTICE file
+::  distributed with this work for additional information
+::  regarding copyright ownership.  The ASF licenses this file
+::  to you under the Apache License, Version 2.0 (the
+::  "License"); you may not use this file except in compliance
+::  with the License.  You may obtain a copy of the License at
+::
+::      http://www.apache.org/licenses/LICENSE-2.0
+::
+::  Unless required by applicable law or agreed to in writing, software
+::  distributed under the License is distributed on an "AS IS" BASIS,
+::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+::  See the License for the specific language governing permissions and
+:: limitations under the License.
+::###############################################################################
+
+@echo off
+setlocal EnableDelayedExpansion
+
+SET bin=%~dp0
+SET FLINK_ROOT_DIR=%bin%..
+
+%FLINK_ROOT_DIR%\bin\flink run -v %FLINK_ROOT_DIR%\lib\flink-python*.jar 2 %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-dist/src/main/flink-bin/bin/pyflink3.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.bat b/flink-dist/src/main/flink-bin/bin/pyflink3.bat
new file mode 100644
index 0000000..43c9384
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink3.bat
@@ -0,0 +1,25 @@
+::###############################################################################
+::  Licensed to the Apache Software Foundation (ASF) under one
+::  or more contributor license agreements.  See the NOTICE file
+::  distributed with this work for additional information
+::  regarding copyright ownership.  The ASF licenses this file
+::  to you under the Apache License, Version 2.0 (the
+::  "License"); you may not use this file except in compliance
+::  with the License.  You may obtain a copy of the License at
+::
+::      http://www.apache.org/licenses/LICENSE-2.0
+::
+::  Unless required by applicable law or agreed to in writing, software
+::  distributed under the License is distributed on an "AS IS" BASIS,
+::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+::  See the License for the specific language governing permissions and
+:: limitations under the License.
+::###############################################################################
+
+@echo off
+setlocal EnableDelayedExpansion
+
+SET bin=%~dp0
+SET FLINK_ROOT_DIR=%bin%..
+
+%FLINK_ROOT_DIR%\bin\flink run -v %FLINK_ROOT_DIR%\lib\flink-python*.jar 3 %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a27a589..7c74054 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -12,11 +12,13 @@
  */
 package org.apache.flink.python.api;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Random;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
@@ -63,7 +65,7 @@ public class PythonPlanBinder {
 	public static final String ARGUMENT_PYTHON_3 = "3";
 
 	public static final String FLINK_PYTHON_DC_ID = "flink";
-	public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
+	public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py";
 
 	public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
 	public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
@@ -72,8 +74,10 @@ public class PythonPlanBinder {
 	public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY,
"python");
 	public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY,
"python3");
 
-	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir")
+ "/flink_plan";
-	private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
+	private static final Random r = new Random();
+
+	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir")
+ File.separator + "flink_plan";
+	private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" +
File.separator + "python";
 	private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
 	private static String FULL_PATH;
 
@@ -84,7 +88,7 @@ public class PythonPlanBinder {
 	public static boolean usePython3 = false;
 
 	private static String FLINK_HDFS_PATH = "hdfs:/tmp";
-	public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data";
+	public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator
+ "flink_data";
 
 	public static boolean DEBUG = false;
 
@@ -102,7 +106,7 @@ public class PythonPlanBinder {
 	 */
 	public static void main(String[] args) throws Exception {
 		if (args.length < 2) {
-			System.out.println("Usage: ./bin/pyflink<2/3>.sh <pathToScript>[ <pathToPackage1>[
<pathToPackageX]][ - <parameter1>[ <parameterX>]]");
+			System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[
<pathToPackageX]][ - <parameter1>[ <parameterX>]]");
 			return;
 		}
 		usePython3 = args[0].equals(ARGUMENT_PYTHON_3);
@@ -114,9 +118,10 @@ public class PythonPlanBinder {
 		FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
 		FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
 		FULL_PATH = FLINK_DIR != null
-				? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH //command-line
-				: FileSystem.getLocalFileSystem().getWorkingDirectory().toString() //testing
-				+ "/src/main/python/org/apache/flink/python/api";
+				//command-line
+				? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH
+				//testing
+				: new Path(FileSystem.getLocalFileSystem().getWorkingDirectory(), "src/main/python/org/apache/flink/python/api").toString();
 	}
 
 	private void runPlan(String[] args) throws Exception {
@@ -130,15 +135,16 @@ public class PythonPlanBinder {
 		}
 
 		try {
-			prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
-			startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
+			String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
+			prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
+			startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
 			receivePlan();
 
 			if (env instanceof LocalEnvironment) {
-				FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink";
+				FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
 			}
 
-			distributeFiles(env);
+			distributeFiles(tmpPath, env);
 			env.execute();
 			close();
 		} catch (Exception e) {
@@ -156,52 +162,52 @@ public class PythonPlanBinder {
 	 * @throws IOException
 	 * @throws URISyntaxException
 	 */
-	private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
+	private void prepareFiles(String tempFilePath, String... filePaths) throws IOException,
URISyntaxException {
 		//Flink python package
-		String tempFilePath = FLINK_PYTHON_FILE_PATH;
 		clearPath(tempFilePath);
 		FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false);
 
 		//plan file		
-		copyFile(filePaths[0], FLINK_PYTHON_PLAN_NAME);
+		copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME);
 
 		//additional files/folders
 		for (int x = 1; x < filePaths.length; x++) {
-			copyFile(filePaths[x], null);
+			copyFile(filePaths[x], tempFilePath, null);
 		}
 	}
 
 	private static void clearPath(String path) throws IOException, URISyntaxException {
-		FileSystem fs = FileSystem.get(new URI(path));
+		FileSystem fs = FileSystem.get(new Path(path).toUri());
 		if (fs.exists(new Path(path))) {
 			fs.delete(new Path(path), true);
 		}
 	}
 
-	private static void copyFile(String path, String name) throws IOException, URISyntaxException
{
+	private static void copyFile(String path, String target, String name) throws IOException,
URISyntaxException {
 		if (path.endsWith("/")) {
 			path = path.substring(0, path.length() - 1);
 		}
 		String identifier = name == null ? path.substring(path.lastIndexOf("/")) : name;
-		String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
+		String tmpFilePath = target + "/" + identifier;
 		clearPath(tmpFilePath);
 		Path p = new Path(path);
 		FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true);
 	}
 
-	private static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException
{
+	private static void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException,
URISyntaxException {
 		clearPath(FLINK_HDFS_PATH);
-		FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true);
+		FileCache.copy(new Path(tmpPath), new Path(FLINK_HDFS_PATH), true);
 		env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
-		clearPath(FLINK_PYTHON_FILE_PATH);
+		clearPath(tmpPath);
 	}
 
-	private void startPython(String[] args) throws IOException {
+	private void startPython(String tempPath, String[] args) throws IOException {
 		for (String arg : args) {
 			arguments.append(" ").append(arg);
 		}
+		String mappedFilePath = FLINK_TMP_DATA_DIR + "/output" + r.nextInt();
 		receiver = new Receiver(null);
-		receiver.open(FLINK_TMP_DATA_DIR + "/output");
+		receiver.open(mappedFilePath);
 
 		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
 
@@ -210,7 +216,7 @@ public class PythonPlanBinder {
 		} catch (IOException ex) {
 			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
-		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH
+ FLINK_PYTHON_PLAN_NAME + arguments.toString());
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tempPath + FLINK_PYTHON_PLAN_NAME
+ arguments.toString());
 
 		new StreamPrinter(process.getInputStream()).start();
 		new StreamPrinter(process.getErrorStream()).start();
@@ -232,7 +238,7 @@ public class PythonPlanBinder {
 		}
 
 		process.getOutputStream().write("plan\n".getBytes());
-		process.getOutputStream().write((FLINK_TMP_DATA_DIR + "/output\n").getBytes());
+		process.getOutputStream().write((mappedFilePath + "\n").getBytes());
 		process.getOutputStream().flush();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
index 988bf25..143cea7 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
@@ -36,10 +36,28 @@ else:
     SIGNAL_WAS_LAST = 32
 
 
+def recv_all(socket, toread):
+    initial = socket.recv(toread)
+    bytes_read = len(initial)
+    if bytes_read == toread:
+        return initial
+    else:
+        bits = [initial]
+        toread = toread - bytes_read
+        while toread:
+            bit = socket.recv(toread)
+            bits.append(bit)
+            toread = toread - len(bit)
+        return b"".join(bits)
+
+
 class OneWayBusyBufferingMappedFileConnection(object):
     def __init__(self, output_path):
         self._output_file = open(output_path, "rb+")
-        self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE,
mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        if hasattr(mmap, 'MAP_SHARED'):
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE,
mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        else:
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE,
None, mmap.ACCESS_WRITE)
 
         self._out = deque()
         self._out_size = 0
@@ -58,13 +76,20 @@ class OneWayBusyBufferingMappedFileConnection(object):
         self._file_output_buffer.seek(0, 0)
         self._file_output_buffer.write(b'\x01')
 
+    def close(self):
+        self._file_output_buffer.close()
+
 
 class BufferingTCPMappedFileConnection(object):
     def __init__(self, input_file, output_file, port):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
-        self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE,
mmap.MAP_SHARED, mmap.ACCESS_READ)
-        self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE,
mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        if hasattr(mmap, 'MAP_SHARED'):
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE,
mmap.MAP_SHARED, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE,
mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        else:
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE,
None, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE,
None, mmap.ACCESS_WRITE)
         self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
         self._socket.connect((SOCKET.gethostbyname("localhost"), port))
 
@@ -76,6 +101,9 @@ class BufferingTCPMappedFileConnection(object):
         self._input_size = 0
         self._was_last = False
 
+    def close(self):
+        self._socket.close()
+
     def write(self, msg):
         length = len(msg)
         if length > MAPPED_FILE_SIZE:
@@ -94,7 +122,7 @@ class BufferingTCPMappedFileConnection(object):
         self._socket.send(pack(">i", self._out_size))
         self._out.clear()
         self._out_size = 0
-        self._socket.recv(1, SOCKET.MSG_WAITALL)
+        recv_all(self._socket, 1)
 
     def read(self, des_size, ignored=None):
         if self._input_size == self._input_offset:
@@ -107,7 +135,7 @@ class BufferingTCPMappedFileConnection(object):
         self._socket.send(SIGNAL_REQUEST_BUFFER)
         self._file_input_buffer.seek(0, 0)
         self._input_offset = 0
-        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
+        meta_size = recv_all(self._socket, 5)
         self._input_size = unpack(">I", meta_size[:4])[0]
         self._was_last = meta_size[4] == SIGNAL_WAS_LAST
         self._input = self._file_input_buffer.read(self._input_size)
@@ -149,7 +177,7 @@ class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
             self._socket.send(SIGNAL_REQUEST_BUFFER_G0)
         self._file_input_buffer.seek(0, 0)
         self._input_offset[group] = 0
-        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
+        meta_size = recv_all(self._socket, 5)
         self._input_size[group] = unpack(">I", meta_size[:4])[0]
         self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST
         self._input[group] = self._file_input_buffer.read(self._input_size[group])

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index 4bf8b3a..f874a25 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -59,6 +59,7 @@ class Function(object):
 
     def _close(self):
         self._collector._close()
+        self._connection.close()
 
     def _go(self):
         self._receive_broadcast_variables()

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 169e31b..865487d 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -21,6 +21,7 @@ from flink.plan.DataSet import DataSet
 from flink.plan.Constants import _Identifier
 from flink.plan.OperationInfo import OperationInfo
 from flink.utilities import Switch
+import socket as SOCKET
 import copy
 import sys
 from struct import pack
@@ -160,6 +161,7 @@ class Environment(object):
             self._collector = Collector.TypedCollector(self._connection, self)
             self._send_plan()
             self._connection._write_buffer()
+            self._connection.close()
         else:
             import struct
             operator = None
@@ -178,6 +180,7 @@ class Environment(object):
                         operator = set.combineop
                 operator._configure(input_path, output_path, port, self)
                 operator._go()
+                operator._close()
                 sys.stdout.flush()
                 sys.stderr.flush()
             except:
@@ -185,6 +188,11 @@ class Environment(object):
                 sys.stderr.flush()
                 if operator is not None:
                     operator._connection._socket.send(struct.pack(">i", -2))
+                else:
+                    socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
+                    socket.connect((SOCKET.gethostbyname("localhost"), port))
+                    socket.send(struct.pack(">i", -2))
+                    socket.close()
                 raise
 
     def _optimize_plan(self):


Mime
View raw message