pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [functions][state] Python state support (#2714)
Date Mon, 11 Feb 2019 09:53:51 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 21b8c9d  [functions][state] Python state support (#2714)
21b8c9d is described below

commit 21b8c9d598f72247daae11eccead26146c39660b
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Mon Feb 11 17:53:46 2019 +0800

    [functions][state] Python state support (#2714)
    
    *Motivation*
    
    Add state support to Python functions.
    
    *Changes*
    
    - Bump the BookKeeper version so that the table service includes the changes required to support Python functions
    - Add managed-state APIs (incr_counter, get_counter, put_state, get_state) to the Python function context
---
 .../python/pulsar/functions/context.py             |  20 ++
 pulsar-client-cpp/python/setup.py                  |   6 +-
 .../instance/src/main/python/contextimpl.py        |  15 +-
 .../instance/src/main/python/python_instance.py    |  30 ++-
 .../src/main/python/python_instance_main.py        |  12 +-
 .../instance/src/main/python/state_context.py      | 156 +++++++++++++
 .../src/test/python/test_python_instance.py        |   2 +-
 .../python-examples/wordcount_function.py          |  34 +++
 .../pulsar/functions/runtime/RuntimeUtils.java     |   3 +-
 .../functions/runtime/KubernetesRuntimeTest.java   |   3 +-
 .../functions/runtime/ProcessRuntimeTest.java      |   6 +-
 .../integration/functions/PulsarStateTest.java     | 246 +++++++++++++++++++++
 .../topologies/PulsarStandaloneTestBase.java       |   3 +-
 13 files changed, 522 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 54ba7f5..26e8606 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -143,3 +143,23 @@ class Context(object):
   def ack(self, msgid, topic):
     """ack this message id"""
     pass
+
+  @abstractmethod
+  def incr_counter(self, key, amount):
+    """Increment the counter stored under `key` in the managed state by `amount`.
+    """
+
+  @abstractmethod
+  def get_counter(self, key):
+    """Return the counter value stored under `key` in the managed state.
+    """
+
+  @abstractmethod
+  def put_state(self, key, value):
+    """Store `value` under `key` in the managed state.
+    """
+
+  @abstractmethod
+  def get_state(self, key):
+    """Return the value stored under `key` in the managed state.
+    """
diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py
index db165ae..7524388 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -63,11 +63,13 @@ class my_build_ext(build_ext.build_ext):
 
 
 dependencies = [
-    'grpcio', 'protobuf',
-    'six',
     'fastavro',
+    'grpcio',
+    'protobuf',
+    'six',
 
     # functions dependencies
+    "apache-bookkeeper-client",
     "prometheus_client",
     "ratelimit"
 ]
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 6b56163..622e679 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -38,13 +38,14 @@ class ContextImpl(pulsar.Context):
   # add label to indicate user metric
   user_metrics_label_names = Stats.metrics_label_names + ["metric"]
 
-  def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, secrets_provider,
metrics_labels):
+  def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, secrets_provider,
metrics_labels, state_context):
     self.instance_config = instance_config
     self.log = logger
     self.pulsar_client = pulsar_client
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
     self.secrets_provider = secrets_provider
+    self.state_context = state_context
     self.publish_producers = {}
     self.publish_serializers = {}
     self.message = None
@@ -186,3 +187,15 @@ class ContextImpl(pulsar.Context):
       metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = user_metric._count.get()
 
     return metrics_map
+
+  def incr_counter(self, key, amount):
+    """Increment the counter for `key` by `amount` through the state context."""
+    state = self.state_context
+    return state.incr(key, amount)
+
+  def get_counter(self, key):
+    """Return the counter for `key` as reported by the state context."""
+    state = self.state_context
+    return state.get_amount(key)
+
+  def put_state(self, key, value):
+    """Store `value` under `key` through the state context."""
+    state = self.state_context
+    return state.put(key, value)
+
+  def get_state(self, key):
+    """Return the value stored under `key` as reported by the state context."""
+    state = self.state_context
+    return state.get_value(key)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 575d992..cb67b6a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -40,6 +40,9 @@ import log
 import util
 import InstanceCommunication_pb2
 
+# state dependencies
+import state_context
+
 from functools import partial
 from collections import namedtuple
 from function_stats import Stats
@@ -67,8 +70,18 @@ def base64ify(bytes_or_str):
         return output_bytes
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples,
-               expected_healthcheck_interval, user_code, pulsar_client, secrets_provider,
cluster_name):
+  def __init__(self,
+               instance_id,
+               function_id,
+               function_version,
+               function_details,
+               max_buffered_tuples,
+               expected_healthcheck_interval,
+               user_code,
+               pulsar_client,
+               secrets_provider,
+               cluster_name,
+               state_storage_serviceurl):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details,
max_buffered_tuples)
     self.user_code = user_code
     self.queue = queue.Queue(max_buffered_tuples)
@@ -76,6 +89,7 @@ class PythonInstance(object):
     if function_details.logTopic is not None and function_details.logTopic != "":
       self.log_topic_handler = log.LogTopicHandler(str(function_details.logTopic), pulsar_client)
     self.pulsar_client = pulsar_client
+    self.state_storage_serviceurl = state_storage_serviceurl
     self.input_serdes = {}
     self.consumers = {}
     self.output_serde = None
@@ -91,6 +105,7 @@ class PythonInstance(object):
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs
> 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
     self.secrets_provider = secrets_provider
+    self.state_context = state_context.NullStateContext()
     self.metrics_labels = [function_details.tenant,
                            "%s/%s" % (function_details.tenant, function_details.namespace),
                            function_details.name,
@@ -111,6 +126,9 @@ class PythonInstance(object):
       sys.exit(1)
 
   def run(self):
+    # Setup state
+    self.state_context = self.setup_state()
+
     # Setup consumers and input deserializers
     mode = pulsar._pulsar.ConsumerType.Shared
     if self.instance_config.function_details.source.subscriptionType == Function_pb2.SubscriptionType.Value("FAILOVER"):
@@ -176,7 +194,7 @@ class PythonInstance(object):
 
     self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client,
                                                self.user_code, self.consumers,
-                                               self.secrets_provider, self.metrics_labels)
+                                               self.secrets_provider, self.metrics_labels,
self.state_context)
     # Now launch a thread that does execution
     self.execution_thread = threading.Thread(target=self.actual_execution)
     self.execution_thread.start()
@@ -287,6 +305,12 @@ class PythonInstance(object):
                         self.instance_config.instance_id)
       )
 
+  def setup_state(self):
+    """Create the state context that backs this instance's managed state.
+
+    The table namespace is "<tenant>_<namespace>" and the table name is the
+    function name, both taken from the function details.
+    """
+    details = self.instance_config.function_details
+    tenant = str(details.tenant)
+    namespace = str(details.namespace)
+    table_name = str(details.name)
+    table_ns = "%s_%s" % (tenant, namespace)
+    return state_context.create_state_context(self.state_storage_serviceurl, table_ns, table_name)
+
   def message_listener(self, serde, consumer, message):
     # increment number of received records from source
     self.stats.incr_total_received()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index c93d6e1..8c5bf8a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -44,6 +44,7 @@ import util
 import prometheus_client_fix
 
 from google.protobuf import json_format
+from bookkeeper.kv.client import Client
 
 to_run = True
 Log = log.Log
@@ -84,6 +85,7 @@ def main():
   parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged
python like wheel files, do we need to install all dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For packaged python
like wheel files, which repository to pull the dependencies from')
   parser.add_argument('--extra_dependency_repository', required=False, help='For packaged
python like wheel files, any extra repository to pull the dependencies from')
+  parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage
Service Url')
   parser.add_argument('--cluster_name', required=True, help='The name of the cluster this
instance is running on')
 
   args = parser.parse_args()
@@ -158,6 +160,10 @@ def main():
      tls_trust_cert_path =  args.tls_trust_cert_path
   pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000,
None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
 
+  state_storage_serviceurl = None
+  if args.state_storage_serviceurl is not None:
+    state_storage_serviceurl = str(args.state_storage_serviceurl)
+
   secrets_provider = None
   if args.secrets_provider is not None:
     secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())),
str(args.secrets_provider))
@@ -178,7 +184,11 @@ def main():
                                               str(args.function_version), function_details,
                                               int(args.max_buffered_tuples),
                                               int(args.expected_healthcheck_interval),
-                                              str(args.py), pulsar_client, secrets_provider,
args.cluster_name)
+                                              str(args.py),
+                                              pulsar_client,
+                                              secrets_provider,
+                                              args.cluster_name,
+                                              state_storage_serviceurl)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
diff --git a/pulsar-functions/instance/src/main/python/state_context.py b/pulsar-functions/instance/src/main/python/state_context.py
new file mode 100644
index 0000000..ef8bbf4
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/state_context.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# -*- encoding: utf-8 -*-
+
+"""state_context.py: state context for accessing managed state
+"""
+from abc import abstractmethod
+from bookkeeper import admin, kv
+from bookkeeper.common.exceptions import NamespaceNotFoundError, StreamNotFoundError, KeyNotFoundError
+from bookkeeper.proto import stream_pb2
+from bookkeeper.proto.stream_pb2 import HASH
+from bookkeeper.proto.stream_pb2 import TABLE
+from bookkeeper.types import StorageClientSettings
+
+
+def new_bk_table_conf(num_ranges):
+    """Build a bookkeeper `StreamConfiguration` for a hash-keyed table.
+
+    :param num_ranges: used as both the minimum and the initial number of key
+        ranges of the table.
+    :return: a `stream_pb2.StreamConfiguration` with TABLE storage, a 128 MiB
+        size-based segment rolling policy and `retention_minutes=-1`.
+    """
+    # NOTE(review): the fixed split policy always uses 2 ranges, independent
+    # of `num_ranges`; confirm this is intended.
+    split_policy = stream_pb2.SplitPolicy(
+        type=stream_pb2.SplitPolicyType.values()[0],
+        fixed_range_policy=stream_pb2.FixedRangeSplitPolicy(num_ranges=2))
+    rolling_policy = stream_pb2.SegmentRollingPolicy(
+        size_policy=stream_pb2.SizeBasedSegmentRollingPolicy(
+            max_segment_size=128 * 1024 * 1024))
+    # NOTE(review): retention_minutes=-1 presumably means "no time-based expiry"; confirm.
+    retention_policy = stream_pb2.RetentionPolicy(
+        time_policy=stream_pb2.TimeBasedRetentionPolicy(retention_minutes=-1))
+    return stream_pb2.StreamConfiguration(
+        key_type=HASH,
+        min_num_ranges=num_ranges,
+        initial_num_ranges=num_ranges,
+        split_policy=split_policy,
+        rolling_policy=rolling_policy,
+        retention_policy=retention_policy,
+        storage_type=TABLE)
+
+
+def create_state_context(state_storage_serviceurl, table_ns, table_name):
+    """Create a state context for the given table.
+
+    Returns a `BKManagedStateContext` bound to `table_ns`/`table_name` when a
+    state storage service url is configured, otherwise a `NullStateContext`.
+    """
+    if state_storage_serviceurl is not None:
+        return BKManagedStateContext(state_storage_serviceurl, table_ns, table_name)
+    return NullStateContext()
+
+
+class StateContext(object):
+    """Interface for reading and updating managed state.
+
+    Implementations provide counter operations (`incr`, `get_amount`) and
+    key/value operations (`put`, `get_value`).
+
+    Note: the class is not declared with `ABCMeta`, so `@abstractmethod` is
+    not enforced; instantiating it, or a subclass that omits a method, does
+    not raise.
+    """
+
+    @abstractmethod
+    def incr(self, key, amount):
+        """Add `amount` to the counter stored under `key`."""
+
+    @abstractmethod
+    def put(self, key, value):
+        """Store `value` under `key`."""
+
+    @abstractmethod
+    def get_value(self, key):
+        """Return the value stored under `key`."""
+
+    @abstractmethod
+    def get_amount(self, key):
+        """Return the counter amount stored under `key`."""
+
+
+class NullStateContext(StateContext):
+    """A no-op state context, used when no state storage service is configured.
+
+    Writes are discarded and every read returns None.
+    """
+
+    def incr(self, key, amount):
+        return None
+
+    def put(self, key, value):
+        return None
+
+    def get_value(self, key):
+        return None
+
+    def get_amount(self, key):
+        return None
+
+
+class BKManagedStateContext(StateContext):
+    """A state context backed by a BookKeeper table-service table.
+
+    On construction the namespace `table_ns` and the table `table_name` are
+    created if they do not exist yet. All reads and writes then go to that
+    table on the storage service at `state_storage_serviceurl`.
+    """
+
+    def __init__(self, state_storage_serviceurl, table_ns, table_name):
+        client_settings = StorageClientSettings(
+          service_uri=state_storage_serviceurl)
+        admin_client = admin.client.Client(
+          storage_client_settings=client_settings)
+        # create namespace and table if needed
+        # NOTE(review): several instances starting at once may race on these
+        # create calls; consider tolerating "already exists" errors.
+        ns = admin_client.namespace(table_ns)
+        try:
+            ns.get(stream_name=table_name)
+        except NamespaceNotFoundError:
+            admin_client.namespaces().create(namespace=table_ns)
+            self._create_table(ns, table_name)
+        except StreamNotFoundError:
+            self._create_table(ns, table_name)
+        # The data client must use the same settings as the admin client.
+        # Without them it falls back to default StorageClientSettings and
+        # ignores `state_storage_serviceurl`.
+        self.__client__ = kv.Client(storage_client_settings=client_settings,
+                                    namespace=table_ns)
+        self.__table__ = self.__client__.table(table_name=table_name)
+
+    @staticmethod
+    def _create_table(ns, table_name):
+        """Create `table_name` in namespace `ns` using the default table config."""
+        # TODO: make number of table ranges configurable
+        ns.create(
+            stream_name=table_name,
+            stream_config=new_bk_table_conf(1))
+
+    def _get_entry(self, key):
+        """Return the stored record for `key`, or None if the key is absent."""
+        try:
+            return self.__table__.get_str(key)
+        except KeyNotFoundError:
+            return None
+
+    def incr(self, key, amount):
+        return self.__table__.incr_str(key, amount)
+
+    def get_amount(self, key):
+        entry = self._get_entry(key)
+        return entry.number_value if entry is not None else None
+
+    def get_value(self, key):
+        entry = self._get_entry(key)
+        return entry.value if entry is not None else None
+
+    def put(self, key, value):
+        return self.__table__.put_str(key, value)
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py
index b865a9d..9c90597 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ class TestContextImpl(unittest.TestCase):
     pulsar_client.create_producer = Mock(return_value=producer)
     user_code=__file__
     consumers = None
-    context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers,
None, None)
+    context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers,
None, None, None)
 
     context_impl.publish("test_topic_name", "test_message")
 
diff --git a/pulsar-functions/python-examples/wordcount_function.py b/pulsar-functions/python-examples/wordcount_function.py
new file mode 100644
index 0000000..d84eb95
--- /dev/null
+++ b/pulsar-functions/python-examples/wordcount_function.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from pulsar import Function
+
+# A word-count function: each whitespace-separated word of the input
+# increments that word's counter in the managed state by 1, and the input
+# is returned with an exclamation mark appended.
+class WordCountFunction(Function):
+    def __init__(self):
+        pass
+
+    def process(self, input, context):
+        words = input.split()
+        for word in words:
+            context.incr_counter(word, 1)
+        return input + "!"
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 3db9f5d..e74ec20 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -169,8 +169,7 @@ class RuntimeUtils {
         args.add(String.valueOf(metricsPort));
 
         // state storage configs
-        if (null != stateStorageServiceUrl
-                && instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA)
{
+        if (null != stateStorageServiceUrl) {
             args.add("--state_storage_serviceurl");
             args.add(stateStorageServiceUrl);
         }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index f3a72e0..95afdf3 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -316,7 +316,7 @@ public class KubernetesRuntimeTest {
             pythonPath = "";
             metricsPortArg = 31;
         } else {
-            totalArgs = 37;
+            totalArgs = 39;
             portArg = 30;
             configArg = 10;
             metricsPortArg = 32;
@@ -342,6 +342,7 @@ public class KubernetesRuntimeTest {
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(portArg) + " --metrics_port
" + args.get(metricsPortArg)
+                + " --state_storage_serviceurl bk://localhost:4181"
                 + " --expected_healthcheck_interval -1";
         if (secretsAttached) {
             expectedArgs += " --secrets_provider secretsprovider.ClearTextSecretsProvider"
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 323943c..5460fe3 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -310,7 +310,7 @@ public class ProcessRuntimeTest {
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
 
-        int totalArgs = 34;
+        int totalArgs = 36;
         int portArg = 23;
         int metricsPortArg = 25;
         String pythonPath = "";
@@ -325,7 +325,9 @@ public class ProcessRuntimeTest {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + " --metrics_port
" + args.get(metricsPortArg)
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg)
+                + " --metrics_port " + args.get(metricsPortArg)
+                + " --state_storage_serviceurl bk://localhost:4181"
                 + " --expected_healthcheck_interval 30"
                 + " --secrets_provider secretsprovider.ClearTextSecretsProvider"
                 + " --secrets_provider_config '{\"Config\":\"Value\"}'"
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
new file mode 100644
index 0000000..eeefd6e
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.annotations.Test;
+
+/**
+ * State related test cases.
+ *
+ * <p>Runs the Python word-count function against a standalone cluster and verifies, through
+ * {@code pulsar-admin functions querystate}, the per-word counters it keeps in managed state.
+ */
+public class PulsarStateTest extends PulsarStandaloneTestSuite {
+
+    public static final String WORDCOUNT_PYTHON_CLASS =
+        "wordcount_function.WordCountFunction";
+
+    public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py";
+
+    /** Upper bound on how long to wait for each output message before failing. */
+    private static final int RECEIVE_TIMEOUT_SECONDS = 30;
+
+    @Test
+    public void testPythonWordCountFunction() throws Exception {
+        String inputTopicName = "test-wordcount-py-input-" + randomName(8);
+        String outputTopicName = "test-wordcount-py-output-" + randomName(8);
+        String functionName = "test-wordcount-py-fn-" + randomName(8);
+
+        final int numMessages = 10;
+
+        // submit the word-count function
+        submitExclamationFunction(
+            Runtime.PYTHON, inputTopicName, outputTopicName, functionName);
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // publish and consume result
+        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+
+        // get function status
+        getFunctionStatus(functionName, numMessages);
+
+        // get state: every message is "hello test message-<i>", so "hello" and "test"
+        // are counted once per message and each "message-<i>" exactly once
+        queryState(functionName, "hello", numMessages);
+        queryState(functionName, "test", numMessages);
+        for (int i = 0; i < numMessages; i++) {
+            queryState(functionName, "message-" + i, 1);
+        }
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+    }
+
+    /** Submits the word-count function for {@code runtime} (despite the legacy method name). */
+    private static void submitExclamationFunction(Runtime runtime,
+                                                  String inputTopicName,
+                                                  String outputTopicName,
+                                                  String functionName) throws Exception {
+        submitFunction(
+            runtime,
+            inputTopicName,
+            outputTopicName,
+            functionName,
+            getExclamationClass(runtime),
+            Schema.BYTES);
+    }
+
+    /**
+     * Returns the word-count function class for {@code runtime}; only Python is supported.
+     *
+     * @throws IllegalArgumentException for any other runtime
+     */
+    protected static String getExclamationClass(Runtime runtime) {
+        if (Runtime.PYTHON == runtime) {
+            return WORDCOUNT_PYTHON_CLASS;
+        } else {
+            throw new IllegalArgumentException("Unsupported runtime : " + runtime);
+        }
+    }
+
+    private static <T> void submitFunction(Runtime runtime,
+                                           String inputTopicName,
+                                           String outputTopicName,
+                                           String functionName,
+                                           String functionClass,
+                                           Schema<T> inputTopicSchema) throws Exception {
+        CommandGenerator generator;
+        generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+        generator.setSinkTopic(outputTopicName);
+        generator.setFunctionName(functionName);
+        String command;
+        if (Runtime.JAVA == runtime) {
+            command = generator.generateCreateFunctionCommand();
+        } else if (Runtime.PYTHON == runtime) {
+            generator.setRuntime(runtime);
+            command = generator.generateCreateFunctionCommand(WORDCOUNT_PYTHON_FILE);
+        } else {
+            throw new IllegalArgumentException("Unsupported runtime : " + runtime);
+        }
+        String[] commands = {
+            "sh", "-c", command
+        };
+        ContainerExecResult result = container.execCmd(
+            commands);
+        assertTrue(result.getStdout().contains("\"Created successfully\""));
+
+        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+    }
+
+    private static <T> void ensureSubscriptionCreated(String inputTopicName,
+                                                      String subscriptionName,
+                                                      Schema<T> inputTopicSchema)
+            throws Exception {
+        // ensure the function subscription exists before we start producing messages
+        try (PulsarClient client = PulsarClient.builder()
+            .serviceUrl(container.getPlainTextServiceUrl())
+            .build()) {
+            try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
+                .topic(inputTopicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subscriptionName)
+                .subscribe()) {
+                // subscribing is enough to create the subscription; nothing to consume
+            }
+        }
+    }
+
+    private static void getFunctionInfoSuccess(String functionName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
+    }
+
+    private static void getFunctionInfoNotFound(String functionName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "functions",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", functionName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Function " + functionName + " doesn't exist"));
+        }
+    }
+
+    private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
+        ContainerExecResult result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"running\" : true"));
+        assertTrue(containsJsonNumber(result.getStdout(), "numSuccessfullyProcessed", numMessages),
+            "Expected numSuccessfullyProcessed " + numMessages + " in: " + result.getStdout());
+    }
+
+    private static void queryState(String functionName, String key, int amount)
+        throws Exception {
+        ContainerExecResult result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "querystate",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName,
+            "--key", key
+        );
+        assertTrue(containsJsonNumber(result.getStdout(), "numberValue", amount),
+            "Expected numberValue " + amount + " for key " + key + " in: " + result.getStdout());
+    }
+
+    /**
+     * Returns whether {@code json} contains {@code "field": value} with exactly that number.
+     * Unlike a plain substring check, a value of {@code 1} does not match {@code 10}.
+     */
+    private static boolean containsJsonNumber(String json, String field, long value) {
+        String regex = "(?s).*\"" + java.util.regex.Pattern.quote(field) + "\"\\s*:\\s*"
+            + value + "(?![0-9.]).*";
+        return json.matches(regex);
+    }
+
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages) throws Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(container.getPlainTextServiceUrl())
+            .build();
+        @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+            .topic(inputTopic)
+            .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(("hello test message-" + i).getBytes(UTF_8));
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            // bounded wait so a stuck function fails the test instead of hanging the suite
+            Message<byte[]> msg = consumer.receive(
+                RECEIVE_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
+            assertTrue(msg != null, "Timed out waiting for output message " + i);
+            assertEquals("hello test message-" + i + "!", new String(msg.getValue(), UTF_8));
+        }
+    }
+
+    private static void deleteFunction(String functionName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "delete",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("Deleted successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 46a6c0d..6c3d982 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -61,7 +61,8 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase {
         String clusterName = PulsarClusterTestBase.randomName(8);
         container = new StandaloneContainer(clusterName, pulsarImageName)
             .withNetwork(network)
-            .withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName);
+            .withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName)
+            .withEnv("PF_stateStorageServiceUrl", "bk://localhost:4181");
         container.start();
         log.info("Pulsar cluster {} is up running:", clusterName);
         log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl());


Mime
View raw message