This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8688b67 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)
8688b67 is described below
commit 8688b6733177ee4a48d805df18a1d82865cf7320
Author: Jia Zhai <jiazhai@users.noreply.github.com>
AuthorDate: Tue Aug 28 02:38:41 2018 +0800
Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)
* change getTopicName in MultiTopicsConsumer
* change following sijie's comments
* keep both topicName and topicPartitionName in consumer to avoid creating a new string
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 8 +++++++
.../client/impl/MultiTopicsConsumerImpl.java | 9 ++++----
.../pulsar/client/impl/TopicMessageIdImpl.java | 27 ++++++++++++++++++----
.../pulsar/client/impl/TopicMessageImpl.java | 20 ++++++++++++----
.../client/impl/UnAckedTopicMessageTracker.java | 4 ++--
.../pulsar/client/impl/MessageIdCompareToTest.java | 6 ++++-
.../tests/integration/semantics/SemanticsTest.java | 2 +-
7 files changed, 60 insertions(+), 16 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index da04534..fe37b69 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -131,6 +131,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
private final SubscriptionInitialPosition subscriptionInitialPosition;
private final ConnectionHandler connectionHandler;
+ private final String topicNameWithoutPartition;
+
enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
@@ -203,6 +205,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
NonPersistentAcknowledgmentGroupingTracker.of();
}
+ topicNameWithoutPartition = topicName.getPartitionedTopicName();
+
grabCnx();
}
@@ -1458,6 +1462,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
this.connectionHandler.grabCnx();
}
+ public String getTopicNameWithoutPartition() {
+ return topicNameWithoutPartition;
+ }
+
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 799bec8..f1cb9cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -231,7 +231,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
checkArgument(message instanceof MessageImpl);
lock.writeLock().lock();
try {
- TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
message);
+ TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+ consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
unAckedMessageTracker.add(topicMessage.getMessageId());
if (log.isDebugEnabled()) {
@@ -370,7 +371,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
}
if (ackType == AckType.Cumulative) {
- Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
+ Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.acknowledgeCumulativeAsync(innerId);
@@ -378,7 +379,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
- ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName());
+ ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties)
@@ -511,7 +512,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
}
removeExpiredMessagesFromQueue(messageIds);
messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
- .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet()))
+ .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet()))
.forEach((topicName, messageIds1) ->
consumers.get(topicName)
.redeliverUnacknowledgedMessages(messageIds1.stream()
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 071b804..dd1b37d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -18,20 +18,39 @@
*/
package org.apache.pulsar.client.impl;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+
import java.util.Objects;
import org.apache.pulsar.client.api.MessageId;
public class TopicMessageIdImpl implements MessageId {
+
+ /** This topicPartitionName is obtained from ConsumerImpl; it includes the partition part. */
+ private final String topicPartitionName;
private final String topicName;
private final MessageId messageId;
- TopicMessageIdImpl(String topicName, MessageId messageId) {
- this.topicName = topicName;
+ TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId)
{
this.messageId = messageId;
+ this.topicPartitionName = topicPartitionName;
+ this.topicName = topicName;
}
+ /**
+ * Get the topic name without partition part of this message.
+ * @return the name of the topic on which this message was published
+ */
public String getTopicName() {
- return topicName;
+ return this.topicName;
+ }
+
+ /**
+ * Get the topic name which contains partition part for this message.
+ * @return the topic name which contains Partition part
+ */
+ public String getTopicPartitionName() {
+ return this.topicPartitionName;
}
public MessageId getInnerMessageId() {
@@ -49,7 +68,7 @@ public class TopicMessageIdImpl implements MessageId {
return false;
}
TopicMessageIdImpl other = (TopicMessageIdImpl) obj;
- return Objects.equals(topicName, other.topicName)
+ return Objects.equals(topicPartitionName, other.topicPartitionName)
&& Objects.equals(messageId, other.messageId);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 4f5ac13..eae02b0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -21,32 +21,44 @@ package org.apache.pulsar.client.impl;
import java.util.Map;
import java.util.Optional;
-
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
public class TopicMessageImpl<T> implements Message<T> {
+ /** This topicPartitionName is obtained from ConsumerImpl; it includes the partition part. */
+ private final String topicPartitionName;
private final String topicName;
private final Message<T> msg;
private final TopicMessageIdImpl messageId;
- TopicMessageImpl(String topicName,
+ TopicMessageImpl(String topicPartitionName,
+ String topicName,
Message<T> msg) {
+ this.topicPartitionName = topicPartitionName;
this.topicName = topicName;
+
this.msg = msg;
- this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+ this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId());
}
/**
- * Get the topic name of this message.
+ * Get the topic name without partition part of this message.
* @return the name of the topic on which this message was published
*/
public String getTopicName() {
return topicName;
}
+ /**
+ * Get the topic name which contains partition part for this message.
+ * @return the topic name which contains Partition part
+ */
+ public String getTopicPartitionName() {
+ return topicPartitionName;
+ }
+
@Override
public MessageId getMessageId() {
return messageId;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index dfc9257..f500fda 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -32,12 +32,12 @@ public class UnAckedTopicMessageTracker extends UnAckedMessageTracker
{
int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
checkState(m instanceof TopicMessageIdImpl,
"message should be of type TopicMessageIdImpl");
- return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+ return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
});
int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
checkState(m instanceof TopicMessageIdImpl,
"message should be of type TopicMessageIdImpl");
- return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+ return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
});
return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
index 78af44e..d032c23 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.testng.annotations.Test;
@@ -122,12 +121,15 @@ public class MessageIdCompareToTest {
public void testMessageIdImplCompareToTopicMessageId() {
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
+ "test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 566, 789));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
+ "test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 567, 789));
TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
+ "test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(messageIdImpl));
assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than");
@@ -144,9 +146,11 @@ public class MessageIdCompareToTest {
BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0);
BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
+ "test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 566));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
+ "test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 567));
assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index 2078726..84dfad1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -245,7 +245,7 @@ public class SemanticsTest extends PulsarClusterTestBase {
Message<String> m = consumer.receive();
int topicIdx;
if (numTopics > 1) {
- String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicName();
+ String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName();
String[] topicParts = StringUtils.split(topic, '-');
topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]);
|