pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [pulsar] branch master updated: Add support for running python functions with wheel file (#2593)
Date Mon, 24 Sep 2018 18:30:34 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 200f97a  Add support for running python functions with wheel file (#2593)
200f97a is described below

commit 200f97a54cea8d68b4a8506804cfef3ea56e74f1
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Mon Sep 24 11:30:29 2018 -0700

    Add support for running python functions with wheel file (#2593)
    
    * Added support for accepting Python Wheel packages
    
    * Preserve filename of the original function submission
    
    * Fix bug
    
    * Fixed bugs
    
    * Fixed bug
    
    * Made process runtime do pip install
    
    * Regenerated the protobuf-generated files
    
    * Remove unnecessary change
    
    * Redid the work in python
    
    * Use pipmain
    
    * Fix indentation error
    
    * pipmain muddles logs so revert to subprocess
    
    * Added licence header
    
    * [build] Fix docker organization parameter
    
    *Motivation*
    
    The Docker organization parameter is missing when building the test image, so the build fails with `-Pdocker`.
    
    *Changes*
    
    Move the docker organization parameter to root pom file.
    
    * Use virtualenv to isolate wheel installations
    
    * Fix bug
    
    * Updated python generated files
    
    * Reverted using virtualenv, now unzip the wheel file and add it to the path
    
    * Removed unused var
    
    * Removed wheel as a runtime
    
    * Removed unzip dependency
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   2 +-
 .../instance/src/main/python/Function_pb2.py       | 119 ++++++++++++++++-----
 .../src/main/python/python_instance_main.py        |   8 +-
 pulsar-functions/instance/src/main/python/util.py  |   2 +
 4 files changed, 101 insertions(+), 30 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 5284b59..b176fe1 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -226,7 +226,7 @@ public class CmdFunctions extends CmdBase {
         protected String jarFile;
         @Parameter(
                 names = "--py",
-                description = "Path to the main Python file for the function (if the function is written in Python)",
+                description = "Path to the main Python file/Python Wheel file for the function (if the function is written in Python)",
                 listConverter = StringConverter.class)
         protected String pyFile;
         @Parameter(names = { "-i",
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index c6dbd99..b09e105 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"\xa9\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04
\x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\
[...]
+  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xd4\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04
\x01(\t\x1 [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1583,
-  serialized_end=1662,
+  serialized_start=1724,
+  serialized_end=1803,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1664,
-  serialized_end=1708,
+  serialized_start=1805,
+  serialized_end=1849,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -116,8 +116,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=473,
-  serialized_end=504,
+  serialized_start=584,
+  serialized_end=615,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
@@ -167,6 +167,44 @@ _RESOURCES = _descriptor.Descriptor(
 )
 
 
+_RETRYDETAILS = _descriptor.Descriptor(
+  name='RetryDetails',
+  full_name='proto.RetryDetails',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='maxMessageRetries', full_name='proto.RetryDetails.maxMessageRetries', index=0,
+      number=1, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='deadLetterTopic', full_name='proto.RetryDetails.deadLetterTopic', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=78,
+  serialized_end=144,
+)
+
+
 _FUNCTIONDETAILS = _descriptor.Descriptor(
   name='FunctionDetails',
   full_name='proto.FunctionDetails',
@@ -272,6 +310,13 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=14,
+      number=15, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -285,8 +330,8 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=79,
-  serialized_end=504,
+  serialized_start=147,
+  serialized_end=615,
 )
 
 
@@ -330,8 +375,8 @@ _CONSUMERSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=506,
-  serialized_end=588,
+  serialized_start=617,
+  serialized_end=699,
 )
 
 
@@ -368,8 +413,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=938,
-  serialized_end=999,
+  serialized_start=1053,
+  serialized_end=1114,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -405,8 +450,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1001,
-  serialized_end=1071,
+  serialized_start=1116,
+  serialized_end=1186,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -471,7 +516,7 @@ _SOURCESPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001')), file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='builtin', full_name='proto.SourceSpec.builtin', index=8,
       number=8, type=9, cpp_type=9, label=1,
@@ -498,8 +543,8 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=591,
-  serialized_end=1071,
+  serialized_start=702,
+  serialized_end=1186,
 )
 
 
@@ -571,8 +616,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1074,
-  serialized_end=1219,
+  serialized_start=1189,
+  serialized_end=1334,
 )
 
 
@@ -590,6 +635,13 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='originalFileName', full_name='proto.PackageLocationMetaData.originalFileName', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -602,8 +654,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1221,
-  serialized_end=1267,
+  serialized_start=1336,
+  serialized_end=1408,
 )
 
 
@@ -654,8 +706,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1270,
-  serialized_end=1431,
+  serialized_start=1411,
+  serialized_end=1572,
 )
 
 
@@ -692,8 +744,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1433,
-  serialized_end=1514,
+  serialized_start=1574,
+  serialized_end=1655,
 )
 
 
@@ -730,8 +782,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1516,
-  serialized_end=1581,
+  serialized_start=1657,
+  serialized_end=1722,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
@@ -739,6 +791,7 @@ _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME
 _FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC
 _FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
 _FUNCTIONDETAILS.fields_by_name['resources'].message_type = _RESOURCES
+_FUNCTIONDETAILS.fields_by_name['retryDetails'].message_type = _RETRYDETAILS
 _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
 _SOURCESPEC_INPUTSPECSENTRY.fields_by_name['value'].message_type = _CONSUMERSPEC
@@ -751,6 +804,7 @@ _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCAT
 _INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
 _ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
 DESCRIPTOR.message_types_by_name['Resources'] = _RESOURCES
+DESCRIPTOR.message_types_by_name['RetryDetails'] = _RETRYDETAILS
 DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
 DESCRIPTOR.message_types_by_name['ConsumerSpec'] = _CONSUMERSPEC
 DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
@@ -770,6 +824,13 @@ Resources = _reflection.GeneratedProtocolMessageType('Resources', (_message.Mess
   ))
 _sym_db.RegisterMessage(Resources)
 
+RetryDetails = _reflection.GeneratedProtocolMessageType('RetryDetails', (_message.Message,), dict(
+  DESCRIPTOR = _RETRYDETAILS,
+  __module__ = 'Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.RetryDetails)
+  ))
+_sym_db.RegisterMessage(RetryDetails)
+
 FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict(
   DESCRIPTOR = _FUNCTIONDETAILS,
   __module__ = 'Function_pb2'
@@ -851,4 +912,6 @@ _SOURCESPEC_INPUTSPECSENTRY.has_options = True
 _SOURCESPEC_INPUTSPECSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 _SOURCESPEC.fields_by_name['topicsToSerDeClassName'].has_options = True
 _SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001'))
+_SOURCESPEC.fields_by_name['topicsPattern'].has_options = True
+_SOURCESPEC.fields_by_name['topicsPattern']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001'))
 # @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 2f5c895..3012e6d 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -28,8 +28,8 @@ import os
 import sys
 import signal
 import time
+import zipfile
 
-from pulsar import Authentication
 import pulsar
 
 import Function_pb2
@@ -75,6 +75,12 @@ def main():
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
   json_format.Parse(args.function_details, function_details)
+
+  if os.path.splitext(str(args.py))[1] == '.whl':
+    zpfile = zipfile.ZipFile(str(args.py), 'r')
+    zpfile.extractall(os.path.dirname(str(args.py)))
+    sys.path.insert(0, os.path.dirname(str(args.py)))
+
   log_file = os.path.join(args.logging_directory,
                           util.getFullyQualifiedFunctionName(function_details.tenant, function_details.namespace, function_details.name),
                           "%s-%s.log" % (args.logging_file, args.instance_id))
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index de39006..c9337d9 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -33,6 +33,8 @@ PULSAR_API_ROOT = 'pulsar'
 PULSAR_FUNCTIONS_API_ROOT = 'functions'
 
 def import_class(from_path, full_class_name):
+  from_path = str(from_path)
+  full_class_name = str(full_class_name)
   kclass = import_class_from_path(from_path, full_class_name)
   if kclass is None:
     our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))


Mime
View raw message