pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: [issues 4476] Fix MessageID output order error (#4477)
Date Wed, 05 Jun 2019 16:24:45 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d16b90  [issues 4476] Fix MessageID output order error (#4477)
0d16b90 is described below

commit 0d16b905d42d66c0a19361b7c46ab62140673d5a
Author: 冉小龙 <ranxiaolong716@gmail.com>
AuthorDate: Thu Jun 6 00:24:38 2019 +0800

    [issues 4476] Fix MessageID output order error (#4477)
    
    * [issues 4476] Fix MessageID output order error
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * add comments of msgID output
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
---
 pulsar-client-cpp/lib/MessageId.cc       |  2 +-
 pulsar-client-go/pulsar/producer_test.go | 54 ++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/MessageId.cc b/pulsar-client-cpp/lib/MessageId.cc
index 48a3a99..7936877 100644
--- a/pulsar-client-cpp/lib/MessageId.cc
+++ b/pulsar-client-cpp/lib/MessageId.cc
@@ -92,7 +92,7 @@ int32_t MessageId::partition() const { return impl_->partition_; }
 
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
     s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
-      << messageId.impl_->batchIndex_ << ',' << messageId.impl_->partition_ << ')';
+      << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
     return s;
 }
 
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index 8c8e4d5..571fd9d 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -22,6 +22,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"strings"
 	"testing"
 	"time"
 
@@ -282,3 +283,56 @@ func TestProducer_Flush(t *testing.T) {
 		producer.Flush()
 	}
 }
+
+func TestProducer_MessageID(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-message-id"
+	subName := "sub-1"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topicName,
+		Batching:            true,
+		BatchingMaxMessages: 5,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		// Create a different message to send asynchronously
+		asyncMsg := ProducerMessage{
+			Payload: []byte(fmt.Sprintf("async-message-%d", i)),
+		}
+		// Attempt to send the message asynchronously and handle the response
+		producer.SendAsync(ctx, asyncMsg, func(msg ProducerMessage, err error) {
+			if err != nil {
+				log.Fatal(err)
+			}
+		})
+	}
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(ctx)
+		if err != nil {
+			log.Fatal(err)
+		}
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
+		// msgID output: (11,16,-1,0)
+		msgID := fmt.Sprintf("%v", msg.ID())
+		index := strings.Index(msgID, "-1")
+		assert.Equal(t, 6, index)
+	}
+}


Mime
View raw message