From commits-return-24627-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Fri Mar 15 22:06:00 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 30617180627 for ; Fri, 15 Mar 2019 23:06:00 +0100 (CET) Received: (qmail 22651 invoked by uid 500); 15 Mar 2019 22:05:59 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 22642 invoked by uid 99); 15 Mar 2019 22:05:59 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Mar 2019 22:05:59 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8A22A85388; Fri, 15 Mar 2019 22:05:58 +0000 (UTC) Date: Fri, 15 Mar 2019 22:05:58 +0000 To: "commits@pulsar.apache.org" Subject: [pulsar] branch master updated: Fix topic name logic for partitioned topics (#3693) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155268755830.26173.3043374830704859814@gitbox.apache.org> From: jerrypeng@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: fff02e2aa2064412dbae18b973eb2bb2abab25d8 X-Git-Newrev: fdaa9e3728e463bc67f5e946833d1dac392412e2 X-Git-Rev: fdaa9e3728e463bc67f5e946833d1dac392412e2 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fdaa9e3 Fix topic name logic for partitioned topics (#3693) fdaa9e3 is described below commit fdaa9e3728e463bc67f5e946833d1dac392412e2 Author: Sanjeev Kulkarni AuthorDate: Fri Mar 15 15:05:53 2019 -0700 Fix topic name logic for partitioned topics (#3693) * Since partitioned topics have a -partition- affixed to the topic name, when doing explicit acking, check for the case to determine the right topic name * added unittests --- .../instance/src/main/python/contextimpl.py | 21 +++++++++++++++------ .../src/scripts/run_python_instance_tests.sh | 3 ++- .../src/test/python/test_python_instance.py | 22 +++++++++++++++++++++- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 638e64f..f3a9710 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -23,6 +23,7 @@ """contextimpl.py: ContextImpl class that implements the Context interface """ +import re import time import os import json @@ -54,7 +55,6 @@ class ContextImpl(pulsar.Context): self.publish_producers = {} self.publish_serializers = {} self.message = None - self.current_input_topic_name = None self.current_start_time = None self.user_config = json.loads(instance_config.function_details.userConfig) \ if instance_config.function_details.userConfig \ @@ -73,7 +73,6 @@ class ContextImpl(pulsar.Context): # Called on a per message basis to set the context for the current message def set_current_message_context(self, message, topic): self.message = message - self.current_input_topic_name = topic self.current_start_time = time.time() def get_message_id(self): @@ -89,7 +88,7 @@ class ContextImpl(pulsar.Context): return self.message.properties() def get_current_message_topic_name(self): - return self.current_input_topic_name + return self.message.topic_name() def get_function_name(self): return self.instance_config.function_details.name @@ -176,9 +175,19 @@ class ContextImpl(pulsar.Context): self.publish_producers[topic_name].send_async(output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), properties=properties) def ack(self, msgid, topic): - if topic not in self.consumers: - raise ValueError('Invalid topicname %s' % topic) - self.consumers[topic].acknowledge(msgid) + topic_consumer = None + if topic in self.consumers: + topic_consumer = self.consumers[topic] + else: + # if this topic is a partitioned topic + m = re.search('(.+)-partition-(\d+)', topic) + if not m: + raise ValueError('Invalid topicname %s' % topic) + elif m.group(1) in self.consumers: + topic_consumer = self.consumers[m.group(1)] + else: + raise ValueError('Invalid topicname %s' % topic) + topic_consumer.acknowledge(msgid) def get_and_reset_metrics(self): metrics = self.get_metrics() diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh index 9b33c24..7005b9b 100644 --- a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh +++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh @@ -22,9 +22,10 @@ # Make sure dependencies are installed pip install mock --user pip install protobuf --user +pip install fastavro --user CUR_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" PULSAR_HOME=$CUR_DIR/../../../../ # run instance tests -PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests \ No newline at end of file +PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py index 0071e2f..8b92fa8 100644 --- a/pulsar-functions/instance/src/test/python/test_python_instance.py +++ b/pulsar-functions/instance/src/test/python/test_python_instance.py @@ -20,9 +20,12 @@ # DEPENDENCIES: unittest2,mock +from mock import Mock +import sys +sys.modules['prometheus_client'] = Mock() + from contextimpl import ContextImpl from python_instance import InstanceConfig -from mock import Mock from pulsar import Message import Function_pb2 @@ -68,4 +71,21 @@ class TestContextImpl(unittest.TestCase): self.assertEqual(args[1].args[1], "test_topic_name") self.assertEqual(args[1].args[2], "test_message_id") + def test_context_ack_partitionedtopic(self): + instance_id = 'test_instance_id' + function_id = 'test_function_id' + function_version = 'test_function_version' + function_details = Function_pb2.FunctionDetails() + max_buffered_tuples = 100; + instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) + logger = log.Log + pulsar_client = Mock() + user_code=__file__ + consumer = Mock() + consumer.acknowledge = Mock(return_value=None) + consumers = {"mytopic" : consumer} + context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None, None, None, None) + context_impl.ack("test_message_id", "mytopic-partition-3") + args, kwargs = consumer.acknowledge.call_args + self.assertEqual(args[0], "test_message_id") \ No newline at end of file