From commits-return-20468-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Mon Jan 14 18:54:33 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E3F34180675 for ; Mon, 14 Jan 2019 18:54:31 +0100 (CET) Received: (qmail 9412 invoked by uid 500); 14 Jan 2019 17:54:30 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 9244 invoked by uid 99); 14 Jan 2019 17:54:30 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Jan 2019 17:54:30 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8697A870BA; Mon, 14 Jan 2019 17:54:29 +0000 (UTC) Date: Mon, 14 Jan 2019 17:54:29 +0000 To: "commits@pulsar.apache.org" Subject: [pulsar] branch master updated: Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154748846933.8043.5497103991631487403@gitbox.apache.org> From: sanjeevrk@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: c793ad68a3b3bf6d094924e7c0ee7f3f4eba1f7d X-Git-Newrev: 034f6ba04e9c48abec1517668cb4fa46efdf02bc X-Git-Rev: 034f6ba04e9c48abec1517668cb4fa46efdf02bc X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 034f6ba Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299) 034f6ba is described below commit 034f6ba04e9c48abec1517668cb4fa46efdf02bc Author: Boyang Jerry Peng AuthorDate: Mon Jan 14 09:54:23 2019 -0800 Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299) * Cleanup consumer subscriptions and fix graceful shutdown for functions * cleaning up * removing testing files * add unit tests * adding integration testing * refactoring * refactoring and adding tests * cleaning up --- .../worker/PulsarWorkerAssignmentTest.java | 2 + .../apache/pulsar/io/PulsarFunctionE2ETest.java | 168 ++++++++++++++++++++- .../pulsar/common/functions/FunctionConfig.java | 2 + .../org/apache/pulsar/common/io/SinkConfig.java | 3 +- .../pulsar/functions/instance/InstanceUtils.java | 12 ++ .../functions/instance/JavaInstanceRunnable.java | 4 +- .../functions/source/PulsarSourceConfig.java | 1 + .../instance/src/main/python/Function_pb2.py | 47 +++--- .../instance/src/main/python/function_stats.py | 2 +- .../instance/src/main/python/python_instance.py | 22 ++- .../src/main/python/python_instance_main.py | 4 +- pulsar-functions/instance/src/main/python/util.py | 4 +- .../proto/src/main/proto/Function.proto | 1 + .../pulsar/functions/runtime/ProcessRuntime.java | 29 +++- .../pulsar/functions/runtime/ThreadRuntime.java | 2 + .../functions/utils/FunctionConfigUtils.java | 5 + .../pulsar/functions/utils/SinkConfigUtils.java | 6 + .../pulsar/functions/worker/FunctionAction.java | 3 +- .../pulsar/functions/worker/FunctionActioner.java | 85 +++++++++-- .../functions/worker/FunctionRuntimeManager.java | 34 ++++- .../pulsar/functions/worker/WorkerService.java | 8 +- .../functions/worker/FunctionActionerTest.java | 5 +- .../worker/FunctionRuntimeManagerTest.java | 25 +-- .../functions/worker/MembershipManagerTest.java | 32 ++-- .../integration/functions/PulsarFunctionsTest.java | 20 +++ 25 files changed, 439 insertions(+), 87 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 5cdf5a4..4b4597e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import com.google.gson.Gson; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -213,6 +214,7 @@ public class PulsarWorkerAssignmentTest { } }, 5, 150); // validate pulsar sink consumer has started on the topic + log.info("admin.topics().getStats(sinkTopic): {}", new Gson().toJson(admin.topics().getStats(sinkTopic))); assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 1); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 94e7bfa..f04fede 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -20,11 +20,13 @@ package org.apache.pulsar.io; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.gson.Gson; import lombok.ToString; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.client.admin.BrokerStats; @@ -42,11 +44,13 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -119,6 +123,7 @@ public class PulsarFunctionE2ETest { private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem"; private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; + private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class); @@ -148,14 +153,24 @@ public class PulsarFunctionE2ETest { config.setBrokerServicePort(brokerServicePort); config.setBrokerServicePortTls(brokerServiceTlsPort); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + config.setTlsAllowInsecureConnection(true); Set providers = new HashSet<>(); providers.add(AuthenticationProviderTls.class.getName()); config.setAuthenticationEnabled(true); config.setAuthenticationProviders(providers); + config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); - config.setTlsAllowInsecureConnection(true); + config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + + config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); + config.setBrokerClientAuthenticationParameters( + "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); + config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + config.setBrokerClientTlsEnabled(true); + + functionsWorkerService = createPulsarFunctionWorker(config); urlTls = new URL(brokerServiceUrl); @@ -170,7 +185,7 @@ public class PulsarFunctionE2ETest { authTls.configure(authParams); admin = spy( - PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH) + PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH) .allowTlsInsecureConnection(true).authentication(authTls).build()); brokerStatsClient = admin.brokerStats(); @@ -211,6 +226,7 @@ public class PulsarFunctionE2ETest { } private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { + workerConfig = new WorkerConfig(); workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace); workerConfig.setSchedulerClassName( @@ -237,7 +253,7 @@ public class PulsarFunctionE2ETest { String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH)); workerConfig.setUseTls(true); workerConfig.setTlsAllowInsecureConnection(true); - workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH); + workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); workerConfig.setAuthenticationEnabled(true); workerConfig.setAuthorizationEnabled(true); @@ -260,6 +276,7 @@ public class PulsarFunctionE2ETest { functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction"); functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); functionConfig.setOutput(sinkTopic); + functionConfig.setCleanupSubscription(true); return functionConfig; } @@ -283,6 +300,7 @@ public class PulsarFunctionE2ETest { sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); sinkConfig.setInputs(Collections.singleton(sourceTopic)); sinkConfig.setSourceSubscriptionName(subName); + sinkConfig.setCleanupSubscription(true); return sinkConfig; } /** @@ -350,6 +368,20 @@ public class PulsarFunctionE2ETest { assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, totalMsgs); + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + // make sure subscriptions are cleanup + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0); + } @Test(timeOut = 20000) @@ -535,6 +567,21 @@ public class PulsarFunctionE2ETest { assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); + + + // delete functions + admin.sink().deleteSink(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + // make sure subscriptions are cleanup + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0); } @Test(timeOut = 20000) @@ -945,6 +992,20 @@ public class PulsarFunctionE2ETest { assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); + + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + // make sure subscriptions are cleanup + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0); } @Test(timeOut = 20000) @@ -1012,6 +1073,20 @@ public class PulsarFunctionE2ETest { assertEquals((int)count, totalMsgs); assertEquals((int) success, totalMsgs); assertEquals(ownerWorkerId, workerId); + + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + // make sure subscriptions are cleanup + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0); } @Test(dataProvider = "validRoleName") @@ -1110,6 +1185,93 @@ public class PulsarFunctionE2ETest { producer.close(); } + @Test(timeOut = 20000) + public void testFunctionAutomaticSubCleanup() throws Exception { + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String propertyKey = "key"; + final String propertyValue = "value"; + final String functionName = "PulsarFunction-test"; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespacePortion); + functionConfig.setName(functionName); + functionConfig.setParallelism(1); + functionConfig.setInputs(Collections.singleton(sourceTopic)); + functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction"); + functionConfig.setOutput(sinkTopic); + functionConfig.setCleanupSubscription(true); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + + admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar source consumer has started on the topic + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + int totalMsgs = 10; + for (int i = 0; i < totalMsgs; i++) { + String data = "my-message-" + i; + producer.newMessage().property(propertyKey, propertyValue).value(data).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get( + InstanceUtils.getDefaultSubscriptionName(tenant, namespacePortion, functionName)); + return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 200); + + FunctionStatus functionStatus = admin.functions().getFunctionStatus(tenant, namespacePortion, + functionName); + + int numInstances = functionStatus.getNumInstances(); + assertEquals(numInstances, 1); + + FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status + = functionStatus.getInstances().get(0).getStatus(); + + double count = status.getNumReceived(); + double success = status.getNumSuccessfullyProcessed(); + String ownerWorkerId = status.getWorkerId(); + assertEquals((int)count, totalMsgs); + assertEquals((int) success, totalMsgs); + assertEquals(ownerWorkerId, workerId); + + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + // make sure subscriptions are cleanup + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0); + } + + public static String getPrometheusMetrics(int metricsPort) throws IOException { StringBuilder result = new StringBuilder(); URL url = new URL(String.format("http://%s:%s/metrics", InetAddress.getLocalHost().getHostAddress(), metricsPort)); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index bf27155..68a9a7d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -93,4 +93,6 @@ public class FunctionConfig { private Long timeoutMs; private String jar; private String py; + // Whether the subscriptions the functions created/used should be deleted when the functions is deleted + private Boolean cleanupSubscription; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index b85de06..abb2067 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -66,6 +66,7 @@ public class SinkConfig { private Resources resources; private Boolean autoAck; private Long timeoutMs; - private String archive; + // Whether the subscriptions the functions created/used should be deleted when the functions is deleted + private Boolean cleanupSubscription; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index 88a9df3..9db47cf 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.sink.PulsarSink; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; import net.jodah.typetools.TypeResolver; @@ -107,6 +108,17 @@ public class InstanceUtils { return SINK; } + public static String getDefaultSubscriptionName(String tenant, String namespace, String name) { + return FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name); + } + + public static String getDefaultSubscriptionName(Function.FunctionDetails functionDetails) { + return getDefaultSubscriptionName( + functionDetails.getTenant(), + functionDetails.getNamespace(), + functionDetails.getName()); + } + public static Map getProperties(Utils.ComponentType componentType, String fullyQualifiedName, int instanceId) { Map properties = new HashMap<>(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index fde9628..668b8bd 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -299,7 +299,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private void loadJars() throws Exception { try { - log.info("jarFile: {}", jarFile); + log.info("Load JAR: {}", jarFile); // Let's first try to treat it as a nar archive fnCache.registerFunctionInstanceWithArchive( instanceConfig.getFunctionId(), @@ -634,7 +634,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { pulsarSourceConfig.setSubscriptionName( StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? sourceSpec.getSubscriptionName() - : FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + : InstanceUtils.getDefaultSubscriptionName(instanceConfig.getFunctionDetails())); pulsarSourceConfig.setProcessingGuarantees( FunctionConfig.ProcessingGuarantees.valueOf( this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java index cf843be..ba7de68 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java @@ -37,6 +37,7 @@ public class PulsarSourceConfig { private FunctionConfig.ProcessingGuarantees processingGuarantees; SubscriptionType subscriptionType; private String subscriptionName; + // Whether the subscriptions the functions created/used should be deleted when the functions is deleted private Integer maxMessageRetries = -1; private String deadLetterTopic; diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index 3979d49..f5141d4 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...] + serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...] ) _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( @@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1744, - serialized_end=1823, + serialized_start=1773, + serialized_end=1852, ) _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES) @@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1825, - serialized_end=1869, + serialized_start=1854, + serialized_end=1898, ) _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE) @@ -420,8 +420,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1073, - serialized_end=1134, + serialized_start=1102, + serialized_end=1163, ) _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( @@ -457,8 +457,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1136, - serialized_end=1206, + serialized_start=1165, + serialized_end=1235, ) _SOURCESPEC = _descriptor.Descriptor( @@ -538,6 +538,13 @@ _SOURCESPEC = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cleanupSubscription', full_name='proto.SourceSpec.cleanupSubscription', index=10, + number=11, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -551,7 +558,7 @@ _SOURCESPEC = _descriptor.Descriptor( oneofs=[ ], serialized_start=722, - serialized_end=1206, + serialized_end=1235, ) @@ -623,8 +630,8 @@ _SINKSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1209, - serialized_end=1354, + serialized_start=1238, + serialized_end=1383, ) @@ -661,8 +668,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1356, - serialized_end=1428, + serialized_start=1385, + serialized_end=1457, ) @@ -713,8 +720,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1431, - serialized_end=1592, + serialized_start=1460, + serialized_end=1621, ) @@ -751,8 +758,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1594, - serialized_end=1675, + serialized_start=1623, + serialized_end=1704, ) @@ -789,8 +796,8 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1677, - serialized_end=1742, + serialized_start=1706, + serialized_end=1771, ) _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py index d3dc322..34793a5 100644 --- a/pulsar-functions/instance/src/main/python/function_stats.py +++ b/pulsar-functions/instance/src/main/python/function_stats.py @@ -103,7 +103,7 @@ class Stats(object): self._stat_total_received_1min = self.stat_total_received_1min.labels(*self.metrics_labels) # start time for windowed metrics - util.FixedTimer(60, self.reset).start() + util.FixedTimer(60, self.reset, name="windowed-metrics-timer").start() def get_total_received(self): return self._stat_total_received._value.get(); diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index cfd9eae..d86173b 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -42,7 +42,6 @@ import InstanceCommunication_pb2 from functools import partial from collections import namedtuple -from threading import Timer from function_stats import Stats Log = log.Log @@ -111,8 +110,6 @@ class PythonInstance(object): os.kill(os.getpid(), signal.SIGKILL) sys.exit(1) - Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() - def run(self): # Setup consumers and input deserializers mode = pulsar._pulsar.ConsumerType.Shared @@ -187,7 +184,8 @@ class PythonInstance(object): # start proccess spawner health check timer self.last_health_check_ts = time.time() if self.expected_healthcheck_interval > 0: - Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() + timer = util.FixedTimer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer, name="health-check-timer") + timer.start() def actual_execution(self): Log.debug("Started Thread for executing the function") @@ -384,3 +382,19 @@ class PythonInstance(object): def join(self): self.queue.put(InternalQuitMessage(True), True) self.execution_thread.join() + self.close() + + def close(self): + Log.info("Closing python instance...") + if self.producer: + self.producer.close() + + if self.consumers: + for consumer in self.consumers.values(): + try: + consumer.close() + except: + pass + + if self.pulsar_client: + self.pulsar_client.close() diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index b7b1bfc..c93d6e1 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -31,6 +31,7 @@ import time import zipfile import json import inspect +import threading import pulsar @@ -193,7 +194,8 @@ def main(): time.sleep(1) pyinstance.join() - sys.exit(1) + # make sure to close all non-daemon threads before this! + sys.exit(0) if __name__ == '__main__': main() diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 0978f39..390aed1 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -76,10 +76,12 @@ def get_properties(fullyQualifiedName, instanceId): class FixedTimer(): - def __init__(self, t, hFunction): + def __init__(self, t, hFunction, name="timer-thread"): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) + self.thread.setName(name) + self.thread.setDaemon(True) def handle_function(self): self.hFunction() diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 8d93764..cb5021b 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -98,6 +98,7 @@ message SourceSpec { * already present in the server */ string builtin = 8; string subscriptionName = 9; + bool cleanupSubscription = 11; } message SinkSpec { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index d3046ba..14e68cc 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -179,18 +179,39 @@ class ProcessRuntime implements Runtime { } @Override - public void stop() { + public void stop() throws InterruptedException { if (timer != null) { timer.cancel(false); } - if (process != null) { - process.destroyForcibly(); - } if (channel != null) { channel.shutdown(); } channel = null; stub = null; + + // kill process + if (process != null) { + process.destroy(); + int i = 0; + // gracefully terminate at first + while(process.isAlive()) { + Thread.sleep(100); + if (i > 100) { + break; + } + i++; + } + + // forcibly kill after timeout + if (process.isAlive()) { + log.warn("Process for instance {} did not exit within timeout. Forcibly killing process...", + Utils.getFullyQualifiedInstanceId( + instanceConfig.getFunctionDetails().getTenant(), + instanceConfig.getFunctionDetails().getNamespace(), + instanceConfig.getFunctionDetails().getName(), instanceConfig.getInstanceId())); + process.destroyForcibly(); + } + } } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index be049c3..2bd4644 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -124,6 +124,8 @@ class ThreadRuntime implements Runtime { } catch (InterruptedException e) { // ignore this } + // make sure JavaInstanceRunnable is closed + this.javaInstanceRunnable.close(); } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 2459cfd..04300db 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -118,6 +118,11 @@ public class FunctionConfigUtils { if (functionConfig.getTimeoutMs() != null) { sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs()); } + if (functionConfig.getCleanupSubscription() != null) { + sourceSpecBuilder.setCleanupSubscription(functionConfig.getCleanupSubscription()); + } else { + sourceSpecBuilder.setCleanupSubscription(true); + } functionDetailsBuilder.setSource(sourceSpecBuilder); // Setup sink diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 413070a..a972947 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -151,6 +151,12 @@ public class SinkConfigUtils { sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs()); } + if (sinkConfig.getCleanupSubscription() != null) { + sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription()); + } else { + sourceSpecBuilder.setCleanupSubscription(true); + } + functionDetailsBuilder.setSource(sourceSpecBuilder); // set up sink spec diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java index 23a8154..ded8268 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java @@ -31,7 +31,8 @@ public class FunctionAction { public enum Action { START, - STOP + STOP, + TERMINATE } private Action action; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 46343a5..2ab4828 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.common.functions.Utils.FILE; import static org.apache.pulsar.common.functions.Utils.HTTP; import static org.apache.pulsar.functions.utils.Utils.getSourceType; @@ -37,10 +38,13 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; +import java.util.Map; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import com.google.gson.Gson; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -50,8 +54,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder; @@ -78,35 +86,43 @@ public class FunctionActioner implements AutoCloseable { private volatile boolean running; private Thread actioner; private final ConnectorsManager connectorsManager; + private final PulsarAdmin pulsarAdmin; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, LinkedBlockingQueue actionQueue, - ConnectorsManager connectorsManager) { + ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; this.actionQueue = actionQueue; this.connectorsManager = connectorsManager; + this.pulsarAdmin = pulsarAdmin; actioner = new Thread(() -> { log.info("Starting Actioner Thread..."); while(running) { try { FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS); if (action == null) continue; - if (action.getAction() == FunctionAction.Action.START) { - try { - startFunction(action.getFunctionRuntimeInfo()); - } catch (Exception ex) { - FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance() - .getFunctionMetaData().getFunctionDetails(); - log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(), - details.getName(), ex); - action.getFunctionRuntimeInfo().setStartupException(ex); - } - } else { - stopFunction(action.getFunctionRuntimeInfo()); + switch (action.getAction()) { + case START: + try { + startFunction(action.getFunctionRuntimeInfo()); + } catch (Exception ex) { + FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance() + .getFunctionMetaData().getFunctionDetails(); + log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(), + details.getName(), ex); + action.getFunctionRuntimeInfo().setStartupException(ex); + } + break; + case STOP: + stopFunction(action.getFunctionRuntimeInfo()); + break; + case TERMINATE: + terminateFunction(action.getFunctionRuntimeInfo()); + break; } } catch (InterruptedException ex) { } @@ -266,6 +282,49 @@ public class FunctionActioner implements AutoCloseable { } } + private void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) { + FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails(); + log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), details.getNamespace(), details.getName(), + functionRuntimeInfo.getFunctionInstance().getInstanceId()); + + stopFunction(functionRuntimeInfo); + //cleanup subscriptions + if (details.getSource().getCleanupSubscription()) { + Map consumerSpecMap = details.getSource().getInputSpecsMap(); + consumerSpecMap.entrySet().forEach(new Consumer>() { + @Override + public void accept(Map.Entry stringConsumerSpecEntry) { + + Function.ConsumerSpec consumerSpec = stringConsumerSpecEntry.getValue(); + String topic = stringConsumerSpecEntry.getKey(); + String subscriptionName = functionRuntimeInfo + .getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getSource().getSubscriptionName(); + // if user specified subscription name is empty use default subscription name + if (isBlank(subscriptionName)) { + subscriptionName = InstanceUtils.getDefaultSubscriptionName( + functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionDetails()); + } + + try { + if (consumerSpec.getIsRegexPattern()) { + pulsarAdmin.namespaces().unsubscribeNamespace(TopicName.get(topic).getNamespace(), subscriptionName); + } else { + pulsarAdmin.topics().deleteSubscription(topic, subscriptionName); + } + } catch (PulsarAdminException e) { + log.warn("Failed to cleanup {} subscription for {}", subscriptionName, + FunctionDetailsUtils.getFullyQualifiedName( + functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionDetails()), e); + } + } + }); + } + } + private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) { return StringUtils.join( new String[]{ diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 152c1db..fdd62d7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.instance.AuthenticationConfig; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; @@ -102,8 +103,11 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Getter boolean isInitializePhase = false; + private final FunctionMetaDataManager functionMetaDataManager; + public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, - MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception { + MembershipManager membershipManager, ConnectorsManager connectorsManager, + FunctionMetaDataManager functionMetaDataManager) throws Exception { this.workerConfig = workerConfig; this.workerService = workerService; this.functionAdmin = workerService.getFunctionAdmin(); @@ -168,9 +172,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.actionQueue = new LinkedBlockingQueue<>(); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - dlogNamespace, actionQueue, connectorsManager); + dlogNamespace, actionQueue, connectorsManager, workerService.getBrokerAdmin()); this.membershipManager = membershipManager; + this.functionMetaDataManager = functionMetaDataManager; } /** @@ -636,7 +641,17 @@ public class FunctionRuntimeManager implements AutoCloseable{ public synchronized void deleteAssignment(String fullyQualifiedInstanceId) { FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); if (functionRuntimeInfo != null) { - this.insertStopAction(functionRuntimeInfo); + Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails(); + + // check if this is part of a function delete operation or update operation + // TODO could be a race condition here if functionMetaDataTailer somehow does not receive the functionMeta prior to the functionAssignmentsTailer gets the assignment for the function. + if (this.functionMetaDataManager.containsFunction(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName())) { + // function still exists thus probably an update or stop operation + this.insertStopAction(functionRuntimeInfo); + } else { + // function doesn't exist anymore thus we should terminate + this.insertTerminateAction(functionRuntimeInfo); + } this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); } @@ -729,6 +744,19 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } + void insertTerminateAction(FunctionRuntimeInfo functionRuntimeInfo) { + if (!this.isInitializePhase) { + FunctionAction functionAction = new FunctionAction(); + functionAction.setAction(FunctionAction.Action.TERMINATE); + functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); + try { + actionQueue.put(functionAction); + } catch (InterruptedException ex) { + throw new RuntimeException("Interrupted while putting action"); + } + } + } + private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) { String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 43404ce..30d5cbd 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -163,10 +163,7 @@ public class WorkerService { // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager); - - // initialize function runtime manager - this.functionRuntimeManager.initialize(); + this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionMetaDataManager); // Setting references to managers in scheduler this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); @@ -176,6 +173,9 @@ public class WorkerService { // initialize function metadata manager this.functionMetaDataManager.initialize(); + // initialize function runtime manager + this.functionRuntimeManager.initialize(); + authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)); // Starting cluster services diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index a4926e3..65ead73 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -32,6 +32,7 @@ import java.net.UnknownHostException; import java.util.concurrent.LinkedBlockingQueue; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; import org.apache.pulsar.functions.runtime.Runtime; @@ -69,7 +70,7 @@ public class FunctionActionerTest { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, - new ConnectorsManager(workerConfig)); + new ConnectorsManager(workerConfig), mock(PulsarAdmin.class)); Runtime runtime = mock(Runtime.class); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") @@ -112,7 +113,7 @@ public class FunctionActionerTest { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, - new ConnectorsManager(workerConfig)); + new ConnectorsManager(workerConfig), mock(PulsarAdmin.class)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 651e5d0..17d6642 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -85,8 +85,8 @@ public class FunctionRuntimeManagerTest { workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); + mock(ConnectorsManager.class), + mock(FunctionMetaDataManager.class))); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -179,8 +179,8 @@ public class FunctionRuntimeManagerTest { workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); + mock(ConnectorsManager.class), + mock(FunctionMetaDataManager.class))); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -225,8 +225,8 @@ public class FunctionRuntimeManagerTest { .get("worker-2").get("test-tenant/test-namespace/func-2:0"), assignment2); verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher() { + verify(functionRuntimeManager, times(1)).insertTerminateAction(any(FunctionRuntimeInfo.class)); + verify(functionRuntimeManager).insertTerminateAction(argThat(new ArgumentMatcher() { @Override public boolean matches(Object o) { if (o instanceof FunctionRuntimeInfo) { @@ -244,7 +244,7 @@ public class FunctionRuntimeManagerTest { Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1); Assert.assertTrue(functionRuntimeManager.actionQueue.contains( new FunctionAction() - .setAction(FunctionAction.Action.STOP) + .setAction(FunctionAction.Action.TERMINATE) .setFunctionRuntimeInfo(new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())))); @@ -277,8 +277,8 @@ public class FunctionRuntimeManagerTest { workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); + mock(ConnectorsManager.class), + mock(FunctionMetaDataManager.class))); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -323,6 +323,9 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment3); verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); + // make sure terminate is not called since this is a update operation + verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); + verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher() { @Override public boolean matches(Object o) { @@ -470,8 +473,8 @@ public class FunctionRuntimeManagerTest { workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); + mock(ConnectorsManager.class), + mock(FunctionMetaDataManager.class))); functionRuntimeManager.initialize(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 678d338..527557b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -138,15 +138,15 @@ public class MembershipManagerTest { doReturn(pulsarClient).when(workerService).getClient(); doReturn(workerConfig).when(workerService).getWorkerConfig(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); - + + FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); - FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + mock(ConnectorsManager.class), + functionMetaDataManager)); MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient)); List workerInfoList = new LinkedList<>(); @@ -209,16 +209,16 @@ public class MembershipManagerTest { doReturn(pulsarClient).when(workerService).getClient(); doReturn(workerConfig).when(workerService).getWorkerConfig(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); - + + FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); + mock(ConnectorsManager.class), + functionMetaDataManager)); - FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient())); List workerInfoList = new LinkedList<>(); @@ -305,15 +305,15 @@ public class MembershipManagerTest { doReturn(pulsarClient).when(workerService).getClient(); doReturn(workerConfig).when(workerService).getWorkerConfig(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); - + + FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); - FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + mock(ConnectorsManager.class), + functionMetaDataManager)); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient())); List workerInfoList = new LinkedList<>(); @@ -380,15 +380,15 @@ public class MembershipManagerTest { doReturn(pulsarClient).when(workerService).getClient(); doReturn(workerConfig).when(workerService).getWorkerConfig(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); - + + FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), - mock(ConnectorsManager.class) - )); - FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + mock(ConnectorsManager.class), + functionMetaDataManager)); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient())); List workerInfoList = new LinkedList<>(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 0469d80..23b7259 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.common.policies.data.SourceStatus; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.functions.api.examples.AutoSchemaFunction; import org.apache.pulsar.functions.api.examples.serde.CustomObject; import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer; @@ -751,6 +752,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // get function info getFunctionInfoNotFound(functionName); + + // make sure subscriptions are cleanup + checkSubscriptionsCleanup(inputTopicName); + } private static void submitExclamationFunction(Runtime runtime, @@ -943,6 +948,21 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } } + private static void checkSubscriptionsCleanup(String topic) throws Exception { + try { + ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "stats", + topic); + TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class); + assertEquals(topicStats.subscriptions.size(), 0); + + } catch (ContainerExecException e) { + fail("Command should have exited with non-zero"); + } + } + private static void getFunctionStatus(String functionName, int numMessages) throws Exception { ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( PulsarCluster.ADMIN_SCRIPT,