pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [pulsar] ivankelly commented on a change in pull request #4062: Delayed message delivery implementation
Date Tue, 21 May 2019 09:07:10 GMT
ivankelly commented on a change in pull request #4062: Delayed message delivery implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r285924331
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -284,42 +288,69 @@ private void incrementUnackedMessages(int ackedMessages) {
         }
     }
 
-    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, Subscription subscription,
long consumerId) {
+    public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, Subscription
subscription,
+            long consumerId) {
+        MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription,
consumerId);
+        if (msgMetadata == null) {
+            return -1;
+        } else {
+            int numMessagesInBatch = msgMetadata.getNumMessagesInBatch();
+            msgMetadata.recycle();
+            return numMessagesInBatch;
+        }
+    }
+
+    public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, Subscription
subscription,
+            long consumerId) {
         try {
             // save the reader index and restore after parsing
-            metadataAndPayload.markReaderIndex();
+            int readerIdx = metadataAndPayload.readerIndex();
             PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
-            metadataAndPayload.resetReaderIndex();
-            int batchSize = metadata.getNumMessagesInBatch();
-            metadata.recycle();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] num messages in batch are {} ", subscription, consumerId,
batchSize);
-            }
-            return batchSize;
+            metadataAndPayload.readerIndex(readerIdx);
+
+            return metadata;
         } catch (Throwable t) {
             log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId,
t);
+            return null;
         }
-        return -1;
     }
 
-    void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sentMessages)
throws PulsarServerException {
+    private void updatePermitsAndFilterMessages(final List<Entry> entries, SendMessageInfo
sentMessages) throws PulsarServerException {
         int permitsToReduce = 0;
-        Iterator<Entry> iter = entries.iterator();
         boolean unsupportedVersion = false;
         long totalReadableBytes = 0;
         boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion();
-        while (iter.hasNext()) {
-            Entry entry = iter.next();
+
+        for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
+            Entry entry = entries.get(i);
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
-            if (batchSize == -1) {
-                // this would suggest that the message might have been corrupted
-                iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
-                entry.release();
-                subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
-                continue;
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+
+            int batchSize;
+            MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription,
consumerId);
 
 Review comment:
   As I said via the other channel there's two options.
   1. Pass in the batchSizeSum as an int, batchSize here becomes batchSizeSum/entries.size()
   2. Create a "stacklocal" ArrayList variable for the batchSizes and pass it in. Reuse the
ArrayList for each invokation (maybe make it thread local)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message