pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [pulsar] merlimat commented on a change in pull request #6362: [ClientAPI]Fix hasMessageAvailable()
Date Mon, 02 Mar 2020 20:02:44 GMT
merlimat commented on a change in pull request #6362: [ClientAPI]Fix hasMessageAvailable()
URL: https://github.com/apache/pulsar/pull/6362#discussion_r386619487
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
 ##########
 @@ -1396,22 +1401,83 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
             Topic topic = consumer.getSubscription().getTopic();
             Position position = topic.getLastMessageId();
             int partitionIndex = TopicName.getPartitionIndex(topic.getName());
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
-                    topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
-            }
-            MessageIdData messageId = MessageIdData.newBuilder()
-                .setLedgerId(((PositionImpl)position).getLedgerId())
-                .setEntryId(((PositionImpl)position).getEntryId())
-                .setPartition(partitionIndex)
-                .build();
 
-            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+            getLargestBatchIndexWhenPossible(
+                    topic,
+                    (PositionImpl) position,
+                    partitionIndex,
+                    requestId,
+                    consumer.getSubscription().getName());
+
         } else {
             ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError,
"Consumer not found"));
         }
     }
 
+    private void getLargestBatchIndexWhenPossible(
+            Topic topic,
+            PositionImpl position,
+            int partitionIndex,
+            long requestId,
+            String subscriptionName) {
+
+        PersistentTopic persistentTopic = (PersistentTopic) topic;
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+
+        // If it's not pointing to a valid entry, respond messageId of the current position.
+        if (position.getEntryId() == -1) {
+            MessageIdData messageId = MessageIdData.newBuilder()
+                    .setLedgerId(position.getLedgerId())
+                    .setEntryId(position.getEntryId())
+                    .setPartition(partitionIndex).build();
+
+            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+        }
+
+        // For a valid position, we read the entry out and parse the batch size from its
metadata.
+        CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
+        ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
 
 Review comment:
   Ideally we should be avoiding this read by just recording the number of messages in the
last batch, along with the current position.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message