pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: [go client] add SubscriptionInitPos option in ConsumerOptions (#3588)
Date Wed, 13 Feb 2019 13:26:48 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed8fea  [go client] add SubscriptionInitPos option in ConsumerOptions (#3588)
2ed8fea is described below

commit 2ed8fea94e3565ebe2ffc999412377cf920354c5
Author: 冉小龙 <rxl5555555@qq.com>
AuthorDate: Wed Feb 13 21:26:43 2019 +0800

    [go client] add SubscriptionInitPos option in ConsumerOptions (#3588)
    
    * [go client] add SubscriptionInitPos option in ConsumerOptions
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * add comment for SubscriptionInitPos option
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
---
 pulsar-client-go/pulsar/c_consumer.go    |  4 +++
 pulsar-client-go/pulsar/consumer.go      | 14 ++++++++++
 pulsar-client-go/pulsar/consumer_test.go | 48 +++++++++++++++++++++++++++++++-
 3 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 6f333f1..9ca73e8 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -100,6 +100,10 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_configuration_set_consumer_type(conf, C.pulsar_consumer_type(options.Type))
 	}
 
+	if options.SubscriptionInitPos != Latest {
+		C.pulsar_consumer_set_subscription_initial_position(conf, C.initial_position(options.SubscriptionInitPos))
+	}
+
 	// ReceiverQueueSize==0 means to use the default queue size
 	// -1 means to disable the consumer prefetching
 	if options.ReceiverQueueSize > 0 {
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index d431daa..d7549c4 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -46,6 +46,16 @@ const (
 	Failover
 )
 
+type InitialPosition int
+
+const (
+	// Latest position which means the start consuming position will be the last message
+	Latest InitialPosition = iota
+
+	// Earliest position which means the start consuming position will be the first message
+	Earliest
+)
+
 // ConsumerBuilder is used to configure and create instances of Consumer
 type ConsumerOptions struct {
 	// Specify the topic this consumer will subscribe on.
@@ -77,6 +87,10 @@ type ConsumerOptions struct {
 	// Default is `Exclusive`
 	Type SubscriptionType
 
+	// InitialPosition at which the cursor will be set when subscribe
+	// Default is `Latest`
+	SubscriptionInitPos InitialPosition
+
 	// Sets a `MessageChannel` for the consumer
 	// When a message is received, it will be pushed to the channel for consumption
 	MessageChannel chan ConsumerMessage
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 963921e..e82ffe6 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -454,7 +454,53 @@ func TestConsumer_Seek(t *testing.T) {
 	msg, err := consumer.Receive(ctx)
 	assert.Nil(t, err)
 	t.Logf("again received message:%+v", msg.ID())
-	assert.Equal(t,string(msg.Payload()),"msg-content-0")
+	assert.Equal(t, "msg-content-0", string(msg.Payload()))
 
 	consumer.Unsubscribe()
 }
+
+func TestConsumer_SubscriptionInitPos(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "persistent://public/default/testSeek"
+	subName := "test-subscription-initial-earliest-position"
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	//sent message
+	ctx := context.Background()
+
+	err = producer.Send(ctx, ProducerMessage{
+		Payload: []byte("msg-1-content-1"),
+	})
+	assert.Nil(t, err)
+
+	err = producer.Send(ctx, ProducerMessage{
+		Payload: []byte("msg-1-content-2"),
+	})
+	assert.Nil(t, err)
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topicName,
+		SubscriptionName:    subName,
+		SubscriptionInitPos: Earliest,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+
+	assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
+}


Mime
View raw message