From commits-return-13424-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Mon Aug 27 20:39:23 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 58952180674 for ; Mon, 27 Aug 2018 20:39:22 +0200 (CEST) Received: (qmail 10403 invoked by uid 500); 27 Aug 2018 18:39:21 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 10394 invoked by uid 99); 27 Aug 2018 18:39:21 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Aug 2018 18:39:21 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D512781E13; Mon, 27 Aug 2018 18:39:20 +0000 (UTC) Date: Mon, 27 Aug 2018 18:39:20 +0000 To: "commits@pulsar.apache.org" Subject: [incubator-pulsar] branch branch-2.1 updated: Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153539516069.23967.14855265291442396114@gitbox.apache.org> From: sijie@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pulsar X-Git-Refname: refs/heads/branch-2.1 X-Git-Reftype: branch X-Git-Oldrev: 96bf00fa12bf74db57d36f40989e233752288351 X-Git-Newrev: 259594165f5ee3c69a29629534af81011cab4ef1 X-Git-Rev: 259594165f5ee3c69a29629534af81011cab4ef1 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 2595941 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) 2595941 is described below commit 259594165f5ee3c69a29629534af81011cab4ef1 Author: Jia Zhai AuthorDate: Tue Aug 28 02:38:41 2018 +0800 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) * change getTopicName in MultiTopicsConsumer * change following sijie's comments * keep both topicName and topicPartitonName in consumer to avoid new string --- .../apache/pulsar/client/impl/ConsumerImpl.java | 8 +++++++ .../client/impl/MultiTopicsConsumerImpl.java | 9 ++++---- .../pulsar/client/impl/TopicMessageIdImpl.java | 27 ++++++++++++++++++---- .../pulsar/client/impl/TopicMessageImpl.java | 20 ++++++++++++---- .../client/impl/UnAckedTopicMessageTracker.java | 4 ++-- .../pulsar/client/impl/MessageIdCompareToTest.java | 6 ++++- .../tests/integration/semantics/SemanticsTest.java | 2 +- 7 files changed, 60 insertions(+), 16 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1a7b67b..fe0e2c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -131,6 +131,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final SubscriptionInitialPosition subscriptionInitialPosition; private final ConnectionHandler connectionHandler; + private final String topicNameWithoutPartition; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -203,6 +205,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle NonPersistentAcknowledgmentGroupingTracker.of(); } + topicNameWithoutPartition = topicName.getPartitionedTopicName(); + grabCnx(); } @@ -1458,6 +1462,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle this.connectionHandler.grabCnx(); } + public String getTopicNameWithoutPartition() { + return topicNameWithoutPartition; + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index bbfb3f3..6d5d56d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -230,7 +230,8 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { checkArgument(message instanceof MessageImpl); lock.writeLock().lock(); try { - TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message); + TopicMessageImpl topicMessage = new TopicMessageImpl<>( + consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message); unAckedMessageTracker.add(topicMessage.getMessageId()); if (log.isDebugEnabled()) { @@ -369,7 +370,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } if (ackType == AckType.Cumulative) { - Consumer individualConsumer = consumers.get(topicMessageId.getTopicName()); + Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { MessageId innerId = topicMessageId.getInnerMessageId(); return individualConsumer.acknowledgeCumulativeAsync(innerId); @@ -377,7 +378,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { - ConsumerImpl consumer = consumers.get(topicMessageId.getTopicName()); + ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); MessageId innerId = topicMessageId.getInnerMessageId(); return consumer.doAcknowledge(innerId, ackType, properties) @@ -510,7 +511,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } removeExpiredMessagesFromQueue(messageIds); messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId) - .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet())) + .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet())) .forEach((topicName, messageIds1) -> consumers.get(topicName) .redeliverUnacknowledgedMessages(messageIds1.stream() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 071b804..dd1b37d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,20 +18,39 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; + import java.util.Objects; import org.apache.pulsar.client.api.MessageId; public class TopicMessageIdImpl implements MessageId { + + /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ + private final String topicPartitionName; private final String topicName; private final MessageId messageId; - TopicMessageIdImpl(String topicName, MessageId messageId) { - this.topicName = topicName; + TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { this.messageId = messageId; + this.topicPartitionName = topicPartitionName; + this.topicName = topicName; } + /** + * Get the topic name without partition part of this message. + * @return the name of the topic on which this message was published + */ public String getTopicName() { - return topicName; + return this.topicName; + } + + /** + * Get the topic name which contains partition part for this message. + * @return the topic name which contains Partition part + */ + public String getTopicPartitionName() { + return this.topicPartitionName; } public MessageId getInnerMessageId() { @@ -49,7 +68,7 @@ public class TopicMessageIdImpl implements MessageId { return false; } TopicMessageIdImpl other = (TopicMessageIdImpl) obj; - return Objects.equals(topicName, other.topicName) + return Objects.equals(topicPartitionName, other.topicPartitionName) && Objects.equals(messageId, other.messageId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 4f5ac13..eae02b0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -21,32 +21,44 @@ package org.apache.pulsar.client.impl; import java.util.Map; import java.util.Optional; - import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.EncryptionContext; public class TopicMessageImpl implements Message { + /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ + private final String topicPartitionName; private final String topicName; private final Message msg; private final TopicMessageIdImpl messageId; - TopicMessageImpl(String topicName, + TopicMessageImpl(String topicPartitionName, + String topicName, Message msg) { + this.topicPartitionName = topicPartitionName; this.topicName = topicName; + this.msg = msg; - this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId()); + this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId()); } /** - * Get the topic name of this message. + * Get the topic name without partition part of this message. * @return the name of the topic on which this message was published */ public String getTopicName() { return topicName; } + /** + * Get the topic name which contains partition part for this message. + * @return the topic name which contains Partition part + */ + public String getTopicPartitionName() { + return topicPartitionName; + } + @Override public MessageId getMessageId() { return messageId; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java index dfc9257..f500fda 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java @@ -32,12 +32,12 @@ public class UnAckedTopicMessageTracker extends UnAckedMessageTracker { int currentSetRemovedMsgCount = currentSet.removeIf(m -> { checkState(m instanceof TopicMessageIdImpl, "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicName().contains(topicName); + return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); }); int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> { checkState(m instanceof TopicMessageIdImpl, "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicName().contains(topicName); + return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); }); return currentSetRemovedMsgCount + oldSetRemovedMsgCount; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 78af44e..d032c23 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import org.testng.annotations.Test; @@ -122,12 +121,15 @@ public class MessageIdCompareToTest { public void testMessageIdImplCompareToTopicMessageId() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(123L, 345L, 566, 789)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(123L, 345L, 567, 789)); TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(messageIdImpl)); assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); @@ -144,9 +146,11 @@ public class MessageIdCompareToTest { BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0); BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new MessageIdImpl(123L, 345L, 566)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new MessageIdImpl(123L, 345L, 567)); assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java index c325e14..17ab493 100644 --- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java +++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java @@ -245,7 +245,7 @@ public class SemanticsTest extends PulsarClusterTestBase { Message m = consumer.receive(); int topicIdx; if (numTopics > 1) { - String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicName(); + String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName(); String[] topicParts = StringUtils.split(topic, '-'); topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]);