pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jerrypeng closed pull request #3299: Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks
Date Fri, 04 Jan 2019 08:00:04 GMT
jerrypeng closed pull request #3299: Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks
URL: https://github.com/apache/pulsar/pull/3299
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 94e7bfabbe..fd2df08592 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -260,6 +260,7 @@ protected static FunctionConfig createFunctionConfig(String tenant, String names
         functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setOutput(sinkTopic);
+        functionConfig.setCleanupSubscription(true);
         return functionConfig;
     }
 
@@ -283,6 +284,7 @@ private static SinkConfig createSinkConfig(String tenant, String namespace, Stri
         sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         sinkConfig.setInputs(Collections.singleton(sourceTopic));
         sinkConfig.setSourceSubscriptionName(subName);
+        sinkConfig.setCleanupSubscription(true);
         return sinkConfig;
     }
     /**
@@ -350,6 +352,20 @@ public void testE2EPulsarFunction() throws Exception {
         assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
                 totalMsgs);
 
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
     }
 
     @Test(timeOut = 20000)
@@ -535,6 +551,21 @@ public void testPulsarSinkStats() throws Exception {
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
         assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
+
+
+        // delete functions
+        admin.sink().deleteSink(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
     }
 
     @Test(timeOut = 20000)
@@ -945,6 +976,20 @@ public void testPulsarFunctionStats() throws Exception {
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
         assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
     }
 
     @Test(timeOut = 20000)
@@ -1012,6 +1057,20 @@ public void testPulsarFunctionStatus() throws Exception {
         assertEquals((int)count, totalMsgs);
         assertEquals((int) success, totalMsgs);
         assertEquals(ownerWorkerId, workerId);
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
     }
 
     @Test(dataProvider = "validRoleName")
@@ -1063,6 +1122,7 @@ public void testFunctionStopAndRestartApi() throws Exception {
         String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                 sourceTopicName, sinkTopic, subscriptionName);
+        functionConfig.setCleanupSubscription(false);
         admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index bf271556ee..68a9a7d0e4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -93,4 +93,6 @@
     private Long timeoutMs;
     private String jar;
     private String py;
+    // Whether the subscriptions the functions created/used should be deleted when the functions is deleted
+    private Boolean cleanupSubscription;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index b85de0633c..abb2067485 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -66,6 +66,7 @@
     private Resources resources;
     private Boolean autoAck;
     private Long timeoutMs;
-
     private String archive;
+    // Whether the subscriptions the functions created/used should be deleted when the functions is deleted
+    private Boolean cleanupSubscription;
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 7c36b584d0..29b6946d0e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -289,7 +289,7 @@ public void run() {
 
     private void loadJars() throws Exception {
         try {
-            log.info("jarFile: {}", jarFile);
+            log.info("Load JAR: {}", jarFile);
             // Let's first try to treat it as a nar archive
             fnCache.registerFunctionInstanceWithArchive(
                 instanceConfig.getFunctionId(),
@@ -629,6 +629,7 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
                     FunctionConfig.ProcessingGuarantees.valueOf(
                             this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
 
+            pulsarSourceConfig.setCleanupSubscription(this.instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
             switch (sourceSpec.getSubscriptionType()) {
                 case FAILOVER:
                     pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ff41dc846d..704846405d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -145,7 +145,15 @@ public void close() throws Exception {
         if (inputConsumers != null ) {
             inputConsumers.forEach(consumer -> {
                 try {
-                    consumer.close();
+                    if (pulsarSourceConfig.getCleanupSubscription()) {
+                        try {
+                            consumer.unsubscribe();
+                        } catch (PulsarClientException.AlreadyClosedException ex) {
+                            //no-op another instance may have already unsubscribed
+                        }
+                    } else {
+                        consumer.close();
+                    }
                 } catch (PulsarClientException e) {
                 }
             });
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index cf843be8de..c281740931 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,6 +37,8 @@
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     SubscriptionType subscriptionType;
     private String subscriptionName;
+    // Whether the subscriptions the functions created/used should be deleted when the functions is deleted
+    private Boolean cleanupSubscription;
     private Integer maxMessageRetries = -1;
     private String deadLetterTopic;
 
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 3979d490b7..f5141d4425 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04
\x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07
\x01(\t\x12\x12\n\nsecretsMap\x18\x10 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t
\x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c
\x01(\x0b\x32\x0f.proto.SinkSpec\x12#\n\tresources\x18\r \x01(\x0b\x32\x10.proto.Resources\x12\x12\n\npackageUrl\x18\x0e
\x01(\t\x12)\n\x0cretryDetails\x18\x0f \x01(\x0b\x32\x13.proto.RetryDetails\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"R\n\x0c\x43onsumerSpec\x12\x12\n\nschemaType\x18\x01
\x01(\t\x12\x16\n\x0eserdeClassName\x18\x02 \x01(\t\x12\x16\n\x0eisRegexPattern\x18\x03 \x01(\x08\"\xe4\x03\n\nSourceSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03
\x01(\x0e\x32\x17.proto.SubscriptionType\x12Q\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntryB\x02\x18\x01\x12\x35\n\ninputSpecs\x18\n
\x03(\x0b\x32!.proto.SourceSpec.InputSpecsEntry\x12\x11\n\ttimeoutMs\x18\x06 \x01(\x04\x12\x19\n\rtopicsPattern\x18\x07
\x01(\tB\x02\x18\x01\x12\x0f\n\x07\x62uiltin\x18\x08 \x01(\t\x12\x18\n\x10subscriptionName\x18\t
\x01(\t\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02
\x01(\t:\x02\x38\x01\x1a\x46\n\x0fInputSpecsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\"\n\x05value\x18\x02
\x01(\x0b\x32\x13.proto.ConsumerSpec:\x02\x38\x01\"\x91\x01\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\r\n\x05topic\x18\x03
\x01(\t\x12\x16\n\x0eserDeClassName\x18\x04 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x06 \x01(\t\x12\x12\n\nschemaType\x18\x07
\x01(\t\"H\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\x12\x18\n\x10originalFileName\x18\x02
\x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02
\x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04
\x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02
\x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02
\x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
+  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04
\x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07
\x01(\t\x12\x12\n\nsecretsMap\x18\x10 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t
\x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c
\x01(\x0b\x32\x0f.proto.SinkSpec\x12#\n\tresources\x18\r \x01(\x0b\x32\x10.proto.Resources\x12\x12\n\npackageUrl\x18\x0e
\x01(\t\x12)\n\x0cretryDetails\x18\x0f \x01(\x0b\x32\x13.proto.RetryDetails\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"R\n\x0c\x43onsumerSpec\x12\x12\n\nschemaType\x18\x01
\x01(\t\x12\x16\n\x0eserdeClassName\x18\x02 \x01(\t\x12\x16\n\x0eisRegexPattern\x18\x03 \x01(\x08\"\x81\x04\n\nSourceSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03
\x01(\x0e\x32\x17.proto.SubscriptionType\x12Q\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntryB\x02\x18\x01\x12\x35\n\ninputSpecs\x18\n
\x03(\x0b\x32!.proto.SourceSpec.InputSpecsEntry\x12\x11\n\ttimeoutMs\x18\x06 \x01(\x04\x12\x19\n\rtopicsPattern\x18\x07
\x01(\tB\x02\x18\x01\x12\x0f\n\x07\x62uiltin\x18\x08 \x01(\t\x12\x18\n\x10subscriptionName\x18\t
\x01(\t\x12\x1b\n\x13\x63leanupSubscription\x18\x0b \x01(\x08\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x46\n\x0fInputSpecsEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\"\n\x05value\x18\x02 \x01(\x0b\x32\x13.proto.ConsumerSpec:\x02\x38\x01\"\x91\x01\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01
\x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\r\n\x05topic\x18\x03
\x01(\t\x12\x16\n\x0eserDeClassName\x18\x04 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x06 \x01(\t\x12\x12\n\nschemaType\x18\x07
\x01(\t\"H\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\x12\x18\n\x10originalFileName\x18\x02
\x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02
\x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04
\x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02
\x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02
\x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=1744,
-  serialized_end=1823,
+  serialized_start=1773,
+  serialized_end=1852,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=1825,
-  serialized_end=1869,
+  serialized_start=1854,
+  serialized_end=1898,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -420,8 +420,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1073,
-  serialized_end=1134,
+  serialized_start=1102,
+  serialized_end=1163,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -457,8 +457,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1136,
-  serialized_end=1206,
+  serialized_start=1165,
+  serialized_end=1235,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -538,6 +538,13 @@
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='cleanupSubscription', full_name='proto.SourceSpec.cleanupSubscription', index=10,
+      number=11, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -551,7 +558,7 @@
   oneofs=[
   ],
   serialized_start=722,
-  serialized_end=1206,
+  serialized_end=1235,
 )
 
 
@@ -623,8 +630,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1209,
-  serialized_end=1354,
+  serialized_start=1238,
+  serialized_end=1383,
 )
 
 
@@ -661,8 +668,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1356,
-  serialized_end=1428,
+  serialized_start=1385,
+  serialized_end=1457,
 )
 
 
@@ -713,8 +720,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1431,
-  serialized_end=1592,
+  serialized_start=1460,
+  serialized_end=1621,
 )
 
 
@@ -751,8 +758,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1594,
-  serialized_end=1675,
+  serialized_start=1623,
+  serialized_end=1704,
 )
 
 
@@ -789,8 +796,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1677,
-  serialized_end=1742,
+  serialized_start=1706,
+  serialized_end=1771,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index d3dc32294f..34793a52eb 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -103,7 +103,7 @@ def __init__(self, metrics_labels):
     self._stat_total_received_1min = self.stat_total_received_1min.labels(*self.metrics_labels)
 
     # start time for windowed metrics
-    util.FixedTimer(60, self.reset).start()
+    util.FixedTimer(60, self.reset, name="windowed-metrics-timer").start()
 
   def get_total_received(self):
     return self._stat_total_received._value.get();
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index d4c3da5a95..254855d27d 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -42,7 +42,6 @@
 
 from functools import partial
 from collections import namedtuple
-from threading import Timer
 from function_stats import Stats
 
 Log = log.Log
@@ -90,6 +89,7 @@ def __init__(self, instance_id, function_id, function_version, function_details,
     self.contextimpl = None
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
+    self.cleanup_subscription = function_details.source.cleanupSubscription
     self.expected_healthcheck_interval = expected_healthcheck_interval
     self.secrets_provider = secrets_provider
     self.metrics_labels = [function_details.tenant,
@@ -111,8 +111,6 @@ def process_spawner_health_check_timer(self):
       os.kill(os.getpid(), signal.SIGKILL)
       sys.exit(1)
 
-    Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
-
   def run(self):
     # Setup consumers and input deserializers
     mode = pulsar._pulsar.ConsumerType.Shared
@@ -177,7 +175,8 @@ def run(self):
     # start proccess spawner health check timer
     self.last_health_check_ts = time.time()
     if self.expected_healthcheck_interval > 0:
-      Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
+      timer = util.FixedTimer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer, name="health-check-timer")
+      timer.start()
 
   def actual_execution(self):
     Log.debug("Started Thread for executing the function")
@@ -368,3 +367,22 @@ def get_function_status(self):
   def join(self):
     self.queue.put(InternalQuitMessage(True), True)
     self.execution_thread.join()
+    self.close()
+
+  def close(self):
+    Log.info("Closing python instance...")
+    if self.producer:
+      self.producer.close()
+
+    if self.consumers:
+      for consumer in self.consumers.values():
+        try:
+          if self.cleanup_subscription:
+            consumer.unsubscribe()
+          else:
+            consumer.close()
+        except:
+          pass
+
+    if self.pulsar_client:
+      self.pulsar_client.close()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index b7b1bfce46..c93d6e1743 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -31,6 +31,7 @@
 import zipfile
 import json
 import inspect
+import threading
 
 import pulsar
 
@@ -193,7 +194,8 @@ def main():
     time.sleep(1)
 
   pyinstance.join()
-  sys.exit(1)
+  # make sure to close all non-daemon threads before this!
+  sys.exit(0)
 
 if __name__ == '__main__':
   main()
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 76f75bd228..6ab213f712 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -70,10 +70,12 @@ def getFullyQualifiedFunctionName(tenant, namespace, name):
 
 class FixedTimer():
 
-    def __init__(self, t, hFunction):
+    def __init__(self, t, hFunction, name="timer-thread"):
         self.t = t
         self.hFunction = hFunction
         self.thread = Timer(self.t, self.handle_function)
+        self.thread.setName(name)
+        self.thread.setDaemon(True)
 
     def handle_function(self):
         self.hFunction()
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 8d93764854..cb5021b8a6 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -98,6 +98,7 @@ message SourceSpec {
      * already present in the server */
     string builtin = 8;
     string subscriptionName = 9;
+    bool cleanupSubscription = 11;
 }
 
 message SinkSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index d3046ba99b..14e68cc01b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -179,18 +179,39 @@ public void join() throws Exception {
     }
 
     @Override
-    public void stop() {
+    public void stop() throws InterruptedException {
         if (timer != null) {
             timer.cancel(false);
         }
-        if (process != null) {
-            process.destroyForcibly();
-        }
         if (channel != null) {
             channel.shutdown();
         }
         channel = null;
         stub = null;
+
+        // kill process
+        if (process != null) {
+            process.destroy();
+            int i = 0;
+            // gracefully terminate at first
+            while(process.isAlive()) {
+                Thread.sleep(100);
+                if (i > 100) {
+                    break;
+                }
+                i++;
+            }
+
+            // forcibly kill after timeout
+            if (process.isAlive()) {
+                log.warn("Process for instance {} did not exit within timeout. Forcibly killing process...",
+                        Utils.getFullyQualifiedInstanceId(
+                                instanceConfig.getFunctionDetails().getTenant(),
+                                instanceConfig.getFunctionDetails().getNamespace(),
+                                instanceConfig.getFunctionDetails().getName(), instanceConfig.getInstanceId()));
+                process.destroyForcibly();
+            }
+        }
     }
 
     @Override
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 2459cfdf5b..04300db377 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -118,6 +118,11 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
         if (functionConfig.getTimeoutMs() != null) {
             sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
         }
+        if (functionConfig.getCleanupSubscription() != null) {
+            sourceSpecBuilder.setCleanupSubscription(functionConfig.getCleanupSubscription());
+        } else {
+            sourceSpecBuilder.setCleanupSubscription(true);
+        }
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
         // Setup sink
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 413070a954..a972947198 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -151,6 +151,12 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
             sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
         }
 
+        if (sinkConfig.getCleanupSubscription() != null) {
+            sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
+        } else {
+            sourceSpecBuilder.setCleanupSubscription(true);
+        }
+
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
         // set up sink spec
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0469d80269..23b7259cde 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -46,6 +46,7 @@
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -751,6 +752,10 @@ private void testExclamationFunction(Runtime runtime,
 
         // get function info
         getFunctionInfoNotFound(functionName);
+
+        // make sure subscriptions are cleanup
+        checkSubscriptionsCleanup(inputTopicName);
+
     }
 
     private static void submitExclamationFunction(Runtime runtime,
@@ -943,6 +948,21 @@ private static void getFunctionInfoNotFound(String functionName) throws Exceptio
         }
     }
 
+    private static void checkSubscriptionsCleanup(String topic) throws Exception {
+        try {
+            ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "stats",
+                     topic);
+            TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class);
+            assertEquals(topicStats.subscriptions.size(), 0);
+
+        } catch (ContainerExecException e) {
+            fail("Command should have exited with non-zero");
+        }
+    }
+
     private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message