From: zentol
To: issues@flink.apache.org
Subject: [GitHub] flink pull request #5333: Review5886
Date: Mon, 22 Jan 2018 14:50:37 +0000 (UTC)

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5333

Review5886

This PR is an extension of #3838 that resolves all issues I found during the review. The change log below is roughly grouped into categories to provide a better overview.
Change log:

General:
- rebase branch to current master
- incremented version to 1.5-SNAPSHOT
- fixed kafka-connector dependency declaration
  - set to provided
  - scala version set to scala.binary.version
  - flink version set to project.version
- applied checkstyle
  - disabled method/parameter name rules for API classes
- assigned flink-streaming-python to 'libraries' Travis profile
- copy streaming-python jar to /opt
- change the name of the final jar to flink-streaming-python (previously flink-python)
- replace maven-jar-plugin with maven-shade-plugin

API:
- PDS#map()/flat_map() (PDS = PythonDataStream) now return PythonSingleOutputStreamOperator
- renamed PDS#print() to PDS#output()
  - print is a keyword in Python 2 (and thus in Jython), so it is not usable as a method name in native Python APIs
- added PythonSingleOutputStreamOperator#name()
- removed env#execute methods that accepted a local-execution argument, as they are redundant given the environment factory methods
- narrow visibility of *DataStream constructors

Moved/Renamed:
- made SerializerMap a top-level class and renamed it to AdapterMap
- moved UtilityFunctions#adapt to AdapterMap class
- renamed UtilityFunctions to InterpreterUtils
- moved PythonObjectInputStream2 to SerializationUtils
- renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream

Jython:
- renamed InterpreterUtils#smartFunctionDeserialization to deserializeFunction
- added generic return type to #deserializeFunction
- #deserializeFunction uses a static initialization flag to detect whether it has to load Jython, instead of waiting for an exception to happen
- removed file cleanup in #initAndExecPythonScript as it is the binders' responsibility

Connectors:
- replaced usage of deprecated serialization schema interfaces
- P(S/D)Schema#(de)serialize now fails with a RuntimeException if schema deserialization fails
- removed Kafka code
  - not really tested, and I'd rather tackle connector support in a follow-up

Functions:
- introduced AbstractPythonUDF class for sharing RichFunction#open()/close() implementations
- PythonOutputSelector now throws FlinkRuntimeException when failing during initialization
- added generic return type to SerializationUtils#deserializeObject
- added new serializers for PyBoolean/-Float/-Integer/-Long/-String
- PyObjectSerializer now properly fails when an exception occurs
  - improved error printing
- PythonCollector now typed to Object and properly converts non-PyObjects
- Jython functions that use a collector now have Object as output type
  - otherwise you would get a ClassCastException if Jython returns something that isn't a PyObject

PythonStreamBinder:
- adjusted to follow PythonPlanBinder structure
- client-like main() exception handling
- replaced Random usage with UUID.randomUUID()
- now loads GlobalConfiguration
- local/distributed tmp dir now configurable
  - introduced PythonOptions
- no longer generate plan.py but instead import it directly via the PythonInterpreter

Environment:
- reworked static environment factory methods from PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
- program main() method now accepts a PythonEnvironmentFactory
- directories are now passed properly to the environment instead of using static fields
- removed PythonEnvironmentConfig
- #registerJythonSerializers now static

Examples:
- move examples to flink-streaming-python
- change examples location in dist to examples/python/streaming
- replace ParameterTool usage with argparse
- pass arguments via run instead of constructor
- remove "if __name__ == '__main__':" block
- remove exception wrapping around source/sink creation
- add WordCount example

Tests:
- removed "if __name__ == '__main__':" blocks from tests since the condition is never fulfilled
- removed Python TestBase class
- removed print statements from tests
- standardized test job names
- cleaned up PythonStreamBinderTest / made it more consistent with PythonPlanBinderTest
- run_all_tests improvements
  - stop after first failure
  - print stacktrace on failure
  - no longer relies on dirname() to get cwd but uses the module file location instead
- added log4j properties file
- added end-to-end test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink review5886

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5333.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5333

----
commit a19c960b0d888926eac21287b1363e9cc3b77ce4
Author: Zohar Mizrahi
Date:   2016-11-15T12:46:36Z

    [FLINK-5886] Python API for streaming applications

commit 82e92c8b4898c224f8ea7ea9333056a971a0a3f4
Author: Zohar Mizrahi
Date:   2017-05-09T06:28:39Z

    [FLINK-5886] Improve 'fibonacci' example

    1) Decrease the number of source samples, so the program finishes earlier
    2) Add support for 'local' and 'cluster' execution modes (determined by input argument):
       'cluster' (default) - pyflink-stream.sh fibonacci.py
       'local' - pyflink-stream.sh fibonacci.py - --local

commit fae67be3b9baf1028dc3d3c99b5d949e65fac813
Author: Zohar Mizrahi
Date:   2017-05-22T22:52:55Z

    [FLINK-5886] Apply fixes following a pull request review

commit b528ec17124528396d235b244511813bd7e099c2
Author: Zohar Mizrahi
Date:   2017-05-23T17:36:00Z

    [FLINK-5886] Apply additional fix following code review

commit 801d60a7b04b9ef6d6899ffc2ba08a73ff123cab
Author: zentol
Date:   2018-01-15T12:03:42Z

    Various refactorings

commit d928e02b62bed798882bc7cb33badb5fab78f71c
Author: zentol
Date:   2018-01-22T12:20:18Z

    remove kafka code

----
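The change log adds dedicated serializers for PyBoolean/-Float/-Integer/-Long/-String and makes serialization problems surface as exceptions (P(S/D)Schema#(de)serialize now fails with a RuntimeException, PyObjectSerializer now fails properly). A plain-Python sketch of that general idea follows, under stated assumptions: the one-byte type tags and big-endian `struct` layouts are hypothetical and are not the wire format used by the PR, and Python's single `int` type is mapped to a 64-bit value like a Java long.

```python
import struct

# Hypothetical one-byte type tags; the PR's actual wire format is not described here.
_BOOL, _FLOAT, _INT, _STR = b"B", b"F", b"I", b"S"


def serialize(value) -> bytes:
    """Pick a dedicated encoding per primitive type; reject anything else."""
    # bool is checked before int because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return _BOOL + struct.pack(">?", value)
    if isinstance(value, float):
        return _FLOAT + struct.pack(">d", value)
    if isinstance(value, int):
        return _INT + struct.pack(">q", value)  # 64-bit, like a Java long
    if isinstance(value, str):
        return _STR + value.encode("utf-8")
    raise RuntimeError("no serializer for type %s" % type(value).__name__)


def deserialize(data: bytes):
    """Decode by tag; corrupt or unknown input raises RuntimeError instead of passing silently."""
    tag, payload = data[:1], data[1:]
    try:
        if tag == _BOOL:
            return struct.unpack(">?", payload)[0]
        if tag == _FLOAT:
            return struct.unpack(">d", payload)[0]
        if tag == _INT:
            return struct.unpack(">q", payload)[0]
        if tag == _STR:
            return payload.decode("utf-8")
    except (struct.error, UnicodeDecodeError) as e:
        raise RuntimeError("corrupt payload for tag %r" % tag) from e
    raise RuntimeError("unknown type tag %r" % tag)
```

Failing loudly on a bad payload keeps a corrupt record from being turned into a wrong value further down the pipeline.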
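AbstractPythonUDF is introduced so that the function wrappers share a single RichFunction#open()/close() implementation. Below is a plain-Python analogue of that template-method structure, offered as a sketch only: `PythonUDFBase`, `MapUDF`, `FlatMapUDF` and the `load_function` callable are hypothetical names, and loading the user function is reduced to calling a zero-argument factory.

```python
from typing import Any, Callable, List, Optional


class PythonUDFBase:
    """Hypothetical base class owning the open()/close() lifecycle shared by all UDF wrappers."""

    def __init__(self, load_function: Callable[[], Callable]):
        self._load_function = load_function
        self.fn: Optional[Callable] = None
        self.closed = False

    def open(self) -> None:
        # Shared setup: materialize the user function once, before any record arrives.
        self.fn = self._load_function()

    def close(self) -> None:
        # Shared teardown: call the user function's own close() if it has one.
        closer = getattr(self.fn, "close", None)
        if callable(closer):
            closer()
        self.closed = True


class MapUDF(PythonUDFBase):
    """Only the per-record logic lives in the subclass."""

    def map(self, value: Any) -> Any:
        return self.fn(value)


class FlatMapUDF(PythonUDFBase):
    def flat_map(self, value: Any, out: List[Any]) -> None:
        out.extend(self.fn(value))
```

Each concrete wrapper then only implements its per-record method, so lifecycle fixes land in one place.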
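Typing PythonCollector to Object lets a Jython function emit plain values that the collector converts, instead of failing with a ClassCastException when the value is not a PyObject. The sketch below models the difference in plain Python with a hypothetical `PyValue` wrapper standing in for PyObject: `StrictCollector` mimics a collector typed to the wrapper (raising TypeError, Python's closest analogue of the cast failure), and `ConvertingCollector` mimics the Object-typed version. All three names are illustrative assumptions.

```python
class PyValue:
    """Hypothetical stand-in for Jython's PyObject wrapper type."""

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return isinstance(other, PyValue) and self.value == other.value

    def __repr__(self):
        return "PyValue(%r)" % (self.value,)


class StrictCollector:
    """Mimics a collector typed to the wrapper: anything else is rejected."""

    def __init__(self):
        self.records = []

    def collect(self, record):
        if not isinstance(record, PyValue):
            raise TypeError("expected PyValue, got %s" % type(record).__name__)
        self.records.append(record)


class ConvertingCollector:
    """Mimics a collector typed to Object: plain values are wrapped on the way in."""

    def __init__(self):
        self.records = []

    def collect(self, record):
        if not isinstance(record, PyValue):
            record = PyValue(record)
        self.records.append(record)
```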
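PythonStreamBinder replaces Random with UUID.randomUUID() and makes the local/distributed tmp directory configurable. Assuming, as those neighbouring items suggest, that the random value names a per-job temporary directory, a Python counterpart could look like the helper below. Python's `uuid.uuid4()` produces the same kind of random (version 4) UUID as Java's UUID.randomUUID(); `make_job_tmp_dir`, its `base_dir` parameter (standing in for whatever PythonOptions exposes) and the `flink_streaming_plan_` prefix are hypothetical.

```python
import os
import tempfile
import uuid
from typing import Optional


def make_job_tmp_dir(base_dir: Optional[str] = None) -> str:
    """Return a unique per-job directory path under a configurable base directory.

    base_dir stands in for a user-configurable option; when it is unset, the
    system temp directory is used. Only the path is built here; creating the
    directory is left to the caller.
    """
    base = base_dir if base_dir is not None else tempfile.gettempdir()
    # Hypothetical prefix; only the random suffix matters for uniqueness.
    return os.path.join(base, "flink_streaming_plan_" + uuid.uuid4().hex)
```

A UUID avoids sharing or seeding a random-number generator and makes name collisions between concurrent jobs practically impossible.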
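The Environment rework moves the static factory methods into a PythonEnvironmentFactory that is handed to the program's main() method, so directories travel with the factory instead of through static fields. A hypothetical plain-Python sketch of that dependency-injection shape follows; `EnvironmentFactory`, `StreamEnvironment` and `get_execution_environment` are illustrative names chosen for this sketch, not a description of the API added by the PR.

```python
class StreamEnvironment:
    """Hypothetical environment that receives its directories explicitly."""

    def __init__(self, local_tmp_dir: str, dfs_tmp_dir: str):
        self.local_tmp_dir = local_tmp_dir
        self.dfs_tmp_dir = dfs_tmp_dir


class EnvironmentFactory:
    """Hypothetical factory holding configuration that previously lived in static fields."""

    def __init__(self, local_tmp_dir: str, dfs_tmp_dir: str):
        self._local_tmp_dir = local_tmp_dir
        self._dfs_tmp_dir = dfs_tmp_dir

    def get_execution_environment(self) -> StreamEnvironment:
        # Every environment created by this factory shares the factory's directories.
        return StreamEnvironment(self._local_tmp_dir, self._dfs_tmp_dir)


def main(factory: EnvironmentFactory) -> StreamEnvironment:
    # The user program asks the injected factory for an environment instead of
    # calling a static method on a global class; returning env keeps it testable.
    env = factory.get_execution_environment()
    return env
```

Because no state is static, two programs given different factories cannot see each other's directories.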
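The examples now parse their arguments with argparse instead of ParameterTool, receive them through run() rather than the constructor, and a WordCount example is added. The sketch below combines these ideas in plain Python without any Flink calls: `WordCountExample`, the `--input` flag and the sample sentence are hypothetical, and the counting uses a local `collections.Counter` rather than a streaming job.

```python
import argparse
from collections import Counter
from typing import Iterable, List, Optional

# Hypothetical built-in input used when no --input file is given.
SAMPLE_TEXT = "to be or not to be"


def word_count(lines: Iterable[str]) -> Counter:
    """Split each line on whitespace, lower-case the tokens, and count them."""
    counts = Counter()
    for line in lines:
        counts.update(word.lower() for word in line.split())
    return counts


class WordCountExample:
    """Arguments arrive through run(), so one instance can be run with different inputs."""

    def run(self, argv: Optional[List[str]] = None) -> Counter:
        parser = argparse.ArgumentParser(description="WordCount (plain-Python sketch)")
        parser.add_argument("--input", help="text file to count; the sample text is used if omitted")
        args = parser.parse_args(argv)
        if args.input:
            with open(args.input, encoding="utf-8") as f:
                return word_count(f)
        return word_count([SAMPLE_TEXT])
```

For example, `WordCountExample().run(["--input", "words.txt"])` counts a file, while `run([])` falls back to the sample text.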
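The tests drop their "if __name__ == '__main__':" blocks because the condition is never true when a file is imported rather than executed as a script; the binder likewise now imports plan.py directly. The sketch below demonstrates the underlying rule with CPython's importlib (the PR runs scripts through Jython's PythonInterpreter, whose mechanics may differ in detail): code loaded under any module name other than `__main__` never enters the guarded block. `load_script_as_module` and `PLAN_SOURCE` are hypothetical.

```python
import importlib.util
import os
import tempfile

# Hypothetical script body containing the kind of guard the PR removed.
PLAN_SOURCE = '''
ran_main_block = False
if __name__ == "__main__":
    ran_main_block = True
'''


def load_script_as_module(source: str, module_name: str = "plan"):
    """Write source to a temporary file and import it under module_name."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, module_name + ".py")
        with open(path, "w", encoding="utf-8") as f:
            f.write(source)
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        # __name__ inside the executed code is module_name, not "__main__".
        spec.loader.exec_module(module)
    return module
```

Loading the same source under the name `__main__` is the only way the guard fires, which is exactly what an importing test runner never does.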