pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #1700: adding sink spec
Date Tue, 01 May 2018 06:51:44 GMT
sijie closed pull request #1700: adding sink spec
URL: https://github.com/apache/incubator-pulsar/pull/1700
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub no longer shows
its diff once the request has been merged, so the full diff is reproduced below:

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e2f0a7172a..293fadee41 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -62,6 +62,7 @@
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType;
@@ -553,6 +554,16 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
             }
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
+            // Setup sink
+            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+            if (functionConfig.getOutput() != null) {
+                sinkSpecBuilder.setTopic(functionConfig.getOutput());
+            }
+            if (functionConfig.getOutputSerdeClassName() != null) {
+                sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
+            }
+            functionDetailsBuilder.setSink(sinkSpecBuilder);
+
             if (functionConfig.getTenant() != null) {
                 functionDetailsBuilder.setTenant(functionConfig.getTenant());
             }
@@ -565,12 +576,6 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
             if (functionConfig.getClassName() != null) {
                 functionDetailsBuilder.setClassName(functionConfig.getClassName());
             }
-            if (functionConfig.getOutput() != null) {
-                functionDetailsBuilder.setOutput(functionConfig.getOutput());
-            }
-            if (functionConfig.getOutputSerdeClassName() != null) {
-                functionDetailsBuilder.setOutputSerdeClassName(functionConfig.getOutputSerdeClassName());
-            }
             if (functionConfig.getLogTopic() != null) {
                 functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
             }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 16a04f49ae..563e8e34f4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -137,12 +137,12 @@ public String getCurrentMessageTopicName() {
 
     @Override
     public String getOutputTopic() {
-        return config.getFunctionDetails().getOutput();
+        return config.getFunctionDetails().getSink().getTopic();
     }
 
     @Override
     public String getOutputSerdeClassName() {
-        return config.getFunctionDetails().getOutputSerdeClassName();
+        return config.getFunctionDetails().getSink().getSerDeClassName();
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e181204189..c1466d0f85 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -310,7 +310,7 @@ private void processResult(Record srcRecord,
             throw result.getSystemException();
         } else {
             stats.incrementSuccessfullyProcessed(endTime - startTime);
-            if (result.getResult() != null && instanceConfig.getFunctionDetails().getOutput() != null) {
+            if (result.getResult() != null && instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
                 byte[] output;
                 try {
                     output = outputSerDe.serialize(result.getResult());
@@ -415,12 +415,12 @@ private static void addSystemMetrics(String metricName, double value, InstanceCo
 
     private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
         if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class`
-            if (instanceConfig.getFunctionDetails().getOutputSerdeClassName() == null
-                    || instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty()
-                    || instanceConfig.getFunctionDetails().getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
+            if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() == null
+                    || instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()
+                    || instanceConfig.getFunctionDetails().getSink().getSerDeClassName().equals(DefaultSerDe.class.getName())) {
                 outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArgs[1]);
             } else {
-                this.outputSerDe = InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(), clsLoader, typeArgs[1]);
+                this.outputSerDe = InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getSink().getSerDeClassName(), clsLoader, typeArgs[1]);
             }
             Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
             if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index d51ae02f01..8e149b0388 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -70,7 +70,7 @@ public void close() {
             try {
                 producer.close();
             } catch (PulsarClientException e) {
-                log.warn("Fail to close producer for processor {}", functionDetails.getOutput(), e);
+                log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e);
             }
         }
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index 30b1630717..930161ed99 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -72,7 +72,7 @@ public void close() {
             try {
                 producer.close();
             } catch (PulsarClientException e) {
-                log.warn("Fail to close producer for processor {}", functionDetails.getOutput(), e);
+                log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e);
             }
         }
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index a2a5d8bc8a..33b699a63c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -128,9 +128,9 @@ public void postReceiveMessage(Record record) {}
 
     @Override
     public void setupOutput(SerDe outputSerDe) throws Exception {
-        String outputTopic = functionDetails.getOutput();
+        String outputTopic = functionDetails.getSink().getTopic();
         if (outputTopic != null
-                && !functionDetails.getOutput().isEmpty()
+                && !outputTopic.isEmpty()
                 && outputSerDe != null) {
             log.info("Starting producer for output topic {}", outputTopic);
             initializeOutputProducer(outputTopic);
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index b9883830e7..d1a249620b 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xf9\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04
\x01(\t\x12\x1c\n\x14outputSerdeClassName\x18\x05 \x01(\t\x12\x0e\n\x06output\x18\x06 \x01(\t\x12\x10\n\x08logTopic\x18\x07
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x08 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\t
\x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\n \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\x0b
\x01(\x08\x12\x13\n\x0bparallelism\x18\x0c \x01(\x05\x12!\n\x06source\x18\r \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0e
\x01(\x0b\x32\x0f.proto.SinkSpec\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02
\x01(\t:\x02\x38\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"\xf1\x01\n\nSourceSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12M\n\x16topicsToSerDeClassName\x18\x04
\x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntry\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\".\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01
\x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02
\x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04
\x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02
\x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02
\x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
+  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xcb\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04
\x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\x07
\x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t
\x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c
\x01(\x0b\x32\x0f.proto.SinkSpec\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02
\x01(\t:\x02\x38\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"\xf1\x01\n\nSourceSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12M\n\x16topicsToSerDeClassName\x18\x04
\x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntry\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"U\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\r\n\x05topic\x18\x04 \x01(\t\x12\x16\n\x0eserDeClassName\x18\x05
\x01(\t\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01
\x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03
\x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01
\x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01
\x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=1187,
-  serialized_end=1266,
+  serialized_start=1180,
+  serialized_end=1259,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=1268,
-  serialized_end=1312,
+  serialized_start=1261,
+  serialized_end=1305,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -116,8 +116,8 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=500,
-  serialized_end=531,
+  serialized_start=454,
+  serialized_end=485,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
@@ -155,8 +155,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=449,
-  serialized_end=498,
+  serialized_start=403,
+  serialized_end=452,
 )
 
 _FUNCTIONDETAILS = _descriptor.Descriptor(
@@ -195,71 +195,57 @@
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='outputSerdeClassName', full_name='proto.FunctionDetails.outputSerdeClassName', index=4,
+      name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4,
       number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='output', full_name='proto.FunctionDetails.output', index=5,
-      number=6, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='logTopic', full_name='proto.FunctionDetails.logTopic', index=6,
-      number=7, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=7,
-      number=8, type=14, cpp_type=8, label=1,
+      name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=5,
+      number=6, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userConfig', full_name='proto.FunctionDetails.userConfig', index=8,
-      number=9, type=11, cpp_type=10, label=3,
+      name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
+      number=7, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='runtime', full_name='proto.FunctionDetails.runtime', index=9,
-      number=10, type=14, cpp_type=8, label=1,
+      name='runtime', full_name='proto.FunctionDetails.runtime', index=7,
+      number=8, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='autoAck', full_name='proto.FunctionDetails.autoAck', index=10,
-      number=11, type=8, cpp_type=7, label=1,
+      name='autoAck', full_name='proto.FunctionDetails.autoAck', index=8,
+      number=9, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='parallelism', full_name='proto.FunctionDetails.parallelism', index=11,
-      number=12, type=5, cpp_type=1, label=1,
+      name='parallelism', full_name='proto.FunctionDetails.parallelism', index=9,
+      number=10, type=5, cpp_type=1, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='source', full_name='proto.FunctionDetails.source', index=12,
-      number=13, type=11, cpp_type=10, label=1,
+      name='source', full_name='proto.FunctionDetails.source', index=10,
+      number=11, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='sink', full_name='proto.FunctionDetails.sink', index=13,
-      number=14, type=11, cpp_type=10, label=1,
+      name='sink', full_name='proto.FunctionDetails.sink', index=11,
+      number=12, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -278,7 +264,7 @@
   oneofs=[
   ],
   serialized_start=26,
-  serialized_end=531,
+  serialized_end=485,
 )
 
 
@@ -315,8 +301,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=714,
-  serialized_end=775,
+  serialized_start=668,
+  serialized_end=729,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -366,8 +352,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=534,
-  serialized_end=775,
+  serialized_start=488,
+  serialized_end=729,
 )
 
 
@@ -392,6 +378,20 @@
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='topic', full_name='proto.SinkSpec.topic', index=2,
+      number=4, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=3,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -404,8 +404,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=777,
-  serialized_end=823,
+  serialized_start=731,
+  serialized_end=816,
 )
 
 
@@ -435,8 +435,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=825,
-  serialized_end=871,
+  serialized_start=818,
+  serialized_end=864,
 )
 
 
@@ -487,8 +487,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=874,
-  serialized_end=1035,
+  serialized_start=867,
+  serialized_end=1028,
 )
 
 
@@ -525,8 +525,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1037,
-  serialized_end=1118,
+  serialized_start=1030,
+  serialized_end=1111,
 )
 
 
@@ -563,8 +563,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1120,
-  serialized_end=1185,
+  serialized_start=1113,
+  serialized_end=1178,
 )
 
 _FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 947ff239a3..57bb292b2b 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -226,9 +226,9 @@ def process_result(self, output, msg):
       msg.consumer.acknowledge(msg.message)
 
   def setup_output_serde(self):
-    if self.instance_config.function_details.outputSerdeClassName != None and \
-            len(self.instance_config.function_details.outputSerdeClassName) > 0:
-      serde_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.outputSerdeClassName)
+    if self.instance_config.function_details.sink.serDeClassName != None and \
+            len(self.instance_config.function_details.sink.serDeClassName) > 0:
+      serde_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.sink.serDeClassName)
       self.output_serde = serde_kclass()
     else:
       global DEFAULT_SERIALIZER
@@ -236,11 +236,11 @@ def setup_output_serde(self):
       self.output_serde = serde_kclass()
 
   def setup_producer(self):
-    if self.instance_config.function_details.output != None and \
-            len(self.instance_config.function_details.output) > 0:
-      Log.info("Setting up producer for topic %s" % self.instance_config.function_details.output)
+    if self.instance_config.function_details.sink.topic != None and \
+            len(self.instance_config.function_details.sink.topic) > 0:
+      Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
       self.producer = self.pulsar_client.create_producer(
-        str(self.instance_config.function_details.output),
+        str(self.instance_config.function_details.sink.topic),
         block_if_queue_full=True,
         batching_enabled=True,
         batching_max_publish_delay_ms=1,
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 59ebce6896..f2763a9a48 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -103,10 +103,13 @@ def main():
     sourceSpec.topicsToSerDeClassName[topics] = serde_classname
   function_details.source.MergeFrom(sourceSpec)
 
+  sinkSpec = Function_pb2.SinkSpec()
   if args.output_topic != None and len(args.output_topic) != 0:
-    function_details.output = args.output_topic
+    sinkSpec.topic = args.output_topic
   if args.output_serde_classname != None and len(args.output_serde_classname) != 0:
-    function_details.outputSerdeClassName = args.output_serde_classname
+    sinkSpec.serDeClassName = args.output_serde_classname
+  function_details.sink.MergeFrom(sinkSpec)
+
   function_details.processingGuarantees = Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees)
   if args.auto_ack == "true":
     function_details.autoAck = True
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 898d777a1a..289dbbaa72 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -26,6 +26,7 @@
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.InvocationTargetException;
@@ -50,7 +51,7 @@ public Integer deserialize(byte[] input) {
     private static InstanceConfig createInstanceConfig(boolean addCustom, String outputSerde) {
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
         if (outputSerde != null) {
-            functionDetailsBuilder.setOutputSerdeClassName(outputSerde);
+            functionDetailsBuilder.setSink(SinkSpec.newBuilder().setSerDeClassName(outputSerde).build());
         }
         InstanceConfig instanceConfig = new InstanceConfig();
         instanceConfig.setFunctionDetails(functionDetailsBuilder.build());
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index e83a706d1f..4c301602f6 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -26,6 +26,7 @@
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.testng.annotations.Test;
 
@@ -33,7 +34,7 @@
 
     private static InstanceConfig createInstanceConfig() {
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-        functionDetailsBuilder.setOutputSerdeClassName(DefaultSerDe.class.getName());
+        functionDetailsBuilder.setSink(SinkSpec.newBuilder().setSerDeClassName(DefaultSerDe.class.getName()).build());
         InstanceConfig instanceConfig = new InstanceConfig();
         instanceConfig.setFunctionDetails(functionDetailsBuilder.build());
         return instanceConfig;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 063b6aa6dd..f4b1fb85e5 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -43,15 +43,14 @@ message FunctionDetails {
     string namespace = 2;
     string name = 3;
     string className = 4;
-    string outputSerdeClassName = 5;
-    string output = 6;
-    string logTopic = 7;
-    ProcessingGuarantees processingGuarantees = 8;
-    map<string,string> userConfig = 9;
-    Runtime runtime = 10;
-    bool autoAck = 11;
-    int32 parallelism = 12;
-    SourceSpec source = 13;
+    string logTopic = 5;
+    ProcessingGuarantees processingGuarantees = 6;
+    map<string,string> userConfig = 7;
+    Runtime runtime = 8;
+    bool autoAck = 9;
+    int32 parallelism = 10;
+    SourceSpec source = 11;
+    SinkSpec sink = 12;
 }
 
 message SourceSpec {
@@ -64,6 +63,16 @@ message SourceSpec {
     map<string,string> topicsToSerDeClassName = 4;
 }
 
+message SinkSpec {
+    string className = 1;
+    // map in json format
+    string configs = 2;
+
+    // configs used only when functions output to sink
+    string topic = 3;
+    string serDeClassName = 4;
+}
+
 message PackageLocationMetaData {
     string packagePath = 1;
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index ff6098cee2..e0330bb6fc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -129,12 +130,15 @@ public void start() throws Exception {
         functionDetailsBuilder.setName(functionName);
         functionDetailsBuilder.setClassName(className);
 
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
         if (outputSerdeClassName != null) {
-            functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName);
+            sinkSpecBuilder.setSerDeClassName(outputSerdeClassName);
         }
         if (outputTopicName != null) {
-            functionDetailsBuilder.setOutput(outputTopicName);
+            sinkSpecBuilder.setTopic(outputTopicName);
         }
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
         if (logTopic != null) {
             functionDetailsBuilder.setLogTopic(logTopic);
         }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index bc49899cdf..3aae26f8c3 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -118,15 +118,15 @@
         } else {
             args.add("false");
         }
-        if (instanceConfig.getFunctionDetails().getOutput() != null
-                && !instanceConfig.getFunctionDetails().getOutput().isEmpty()) {
+        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
+                && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
             args.add("--output_topic");
-            args.add(instanceConfig.getFunctionDetails().getOutput());
+            args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
         }
-        if (instanceConfig.getFunctionDetails().getOutputSerdeClassName() != null
-                && !instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty()) {
+        if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null
+                && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
             args.add("--output_serde_classname");
-            args.add(instanceConfig.getFunctionDetails().getOutputSerdeClassName());
+            args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
         }
         args.add("--processing_guarantees");
         args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees()));
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 96fb2c222f..d0c6b3ce21 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -81,8 +81,10 @@ FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
         functionDetailsBuilder.setNamespace(TEST_NAMESPACE);
         functionDetailsBuilder.setName(TEST_NAME);
         functionDetailsBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
-        functionDetailsBuilder.setOutput(TEST_NAME + "-output");
-        functionDetailsBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
+        functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder()
+                .setTopic(TEST_NAME + "-output")
+                .setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer")
+                .build());
         functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
         functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
                 .setSubscriptionType(Function.SubscriptionType.FAILOVER)
@@ -121,8 +123,8 @@ public void testJavaConstructor() {
                 + " --function_classname " + config.getFunctionDetails().getClassName()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --auto_ack false"
-                + " --output_topic " + config.getFunctionDetails().getOutput()
-                + " --output_serde_classname " + config.getFunctionDetails().getOutputSerdeClassName()
+                + " --output_topic " + config.getFunctionDetails().getSink().getTopic()
+                + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(38)
@@ -149,8 +151,8 @@ public void testPythonConstructor() {
                 + " --function_classname " + config.getFunctionDetails().getClassName()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --auto_ack false"
-                + " --output_topic " + config.getFunctionDetails().getOutput()
-                + " --output_serde_classname " + config.getFunctionDetails().getOutputSerdeClassName()
+                + " --output_topic " + config.getFunctionDetails().getSink().getTopic()
+                + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(37)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 04b6c706f0..1d6c20b345 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -486,7 +486,7 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
         } else {
             return Response.status(Status.BAD_REQUEST).build();
         }
-        String outputTopic = functionMetaData.getFunctionDetails().getOutput();
+        String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
         Reader reader = null;
         Producer producer = null;
         try {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 68be7641b1..31c928d8ae 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -51,6 +51,7 @@
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
@@ -279,12 +280,14 @@ private void testRegisterFunctionMissingArguments(
         if (function != null) {
             functionDetailsBuilder.setName(function);
         }
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
         if (outputTopic != null) {
-            functionDetailsBuilder.setOutput(outputTopic);
+            sinkSpecBuilder.setTopic(outputTopic);
         }
         if (outputSerdeClassName != null) {
-            functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName);
+            sinkSpecBuilder.setSerDeClassName(outputSerdeClassName);
         }
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
         if (className != null) {
             functionDetailsBuilder.setClassName(className);
         }
@@ -318,10 +321,12 @@ private void testRegisterFunctionMissingArguments(
     }
 
     private Response registerDefaultFunction() throws IOException {
+        SinkSpec sinkSpec = SinkSpec.newBuilder()
+                .setTopic(outputTopic)
+                .setSerDeClassName(outputSerdeClassName).build();
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(outputTopic)
-                .setOutputSerdeClassName(outputSerdeClassName)
+                .setSink(sinkSpec)
                 .setClassName(className)
                 .setParallelism(parallelism)
                 .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
@@ -571,12 +576,14 @@ private void testUpdateFunctionMissingArguments(
         if (function != null) {
             functionDetailsBuilder.setName(function);
         }
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
         if (outputTopic != null) {
-            functionDetailsBuilder.setOutput(outputTopic);
+            sinkSpecBuilder.setTopic(outputTopic);
         }
         if (outputSerdeClassName != null) {
-            functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName);
+            sinkSpecBuilder.setSerDeClassName(outputSerdeClassName);
         }
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
         if (className != null) {
             functionDetailsBuilder.setClassName(className);
         }
@@ -610,10 +617,12 @@ private void testUpdateFunctionMissingArguments(
     }
 
     private Response updateDefaultFunction() throws IOException {
+        SinkSpec sinkSpec = SinkSpec.newBuilder()
+                .setTopic(outputTopic)
+                .setSerDeClassName(outputSerdeClassName).build();
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(outputTopic)
-                .setOutputSerdeClassName(outputSerdeClassName)
+                .setSink(sinkSpec)
                 .setClassName(className)
                 .setParallelism(parallelism)
                 .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
@@ -886,13 +895,15 @@ public void testGetNotExistedFunction() throws IOException {
     public void testGetFunctionSuccess() throws Exception {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
+        SinkSpec sinkSpec = SinkSpec.newBuilder()
+                .setTopic(outputTopic)
+                .setSerDeClassName(outputSerdeClassName).build();
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
                 .setClassName(className)
-                .setOutputSerdeClassName(outputSerdeClassName)
+                .setSink(sinkSpec)
                 .setName(function)
                 .setNamespace(namespace)
                 .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
-                .setOutput(outputTopic)
                 .setTenant(tenant)
                 .setParallelism(parallelism)
                 .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message