pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [pulsar] sijie commented on a change in pull request #4435: Introduce batch message container framework and support key based batching container
Date Wed, 05 Jun 2019 06:42:14 GMT
sijie commented on a change in pull request #4435: Introduce batch message container framework
and support key based batching container
URL: https://github.com/apache/pulsar/pull/4435#discussion_r290594792
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 ##########
 @@ -146,9 +114,51 @@ void clear() {
         batchedMessageMetadataAndPayload = null;
     }
 
-    boolean isEmpty() {
+    @Override
+    public boolean isEmpty() {
         return messages.isEmpty();
     }
 
-    private static final Logger log = LoggerFactory.getLogger(BatchMessageContainer.class);
+    @Override
+    public void handleException(Exception ex) {
+        try {
+            // Need to protect ourselves from any exception being thrown in the future handler
from the application
+            firstCallback.sendComplete(ex);
+        } catch (Throwable t) {
+            log.warn("[{}] [{}] Got exception while completing the callback for msg {}:",
topicName, producerName,
+                sequenceId, t);
+        }
+        ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
+        clear();
+    }
+
+    private OpSendMsg createOpSendMsg() throws IOException {
+        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+        messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+        ByteBufPair cmd = producer.sendMessage(producer.producerId, sequenceId, numMessagesInBatch,
+            messageMetadata.build(), encryptedPayload);
+
+        OpSendMsg op = OpSendMsg.create(messages, cmd, sequenceId, firstCallback);
+
+        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+            cmd.release();
+            if (op != null) {
+                op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+                    "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+                op.recycle();
+            }
+            return null;
+        }
+
+        op.setNumMessagesInBatch(numMessagesInBatch);
+        op.setBatchSizeByte(currentBatchSizeBytes);
+        return op;
+    }
+
+    @Override
+    public List<OpSendMsg> createOpSendMsgs() throws IOException {
 
 Review comment:
   nit: it seems that we have to create one list for each batch when using normal batching
mechanism. is it possible that we can reduce this overhead when key based batching is not
used?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message