pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch branch-2.1 updated: Added Reader.HasNext in Go client (#2450)
Date Mon, 27 Aug 2018 18:41:53 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b6dc7c1  Added Reader.HasNext in Go client (#2450)
b6dc7c1 is described below

commit b6dc7c1eae314e65cf782a7b0201408279dc9e8d
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Mon Aug 27 11:41:24 2018 -0700

    Added Reader.HasNext in Go client (#2450)
    
    ### Motivation
    
    Added `Reader.HasNext()` in Go client library
---
 pulsar-client-go/pulsar/c_reader.go    | 13 +++++++++++++
 pulsar-client-go/pulsar/reader.go      |  3 +++
 pulsar-client-go/pulsar/reader_test.go | 12 ++++++++++--
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 730f9b8..12c1103 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
 	}
 }
 
+func (r *reader) HasNext() (bool, error) {
+	value := C.int(0)
+	res := C.pulsar_reader_has_message_available(r.ptr, &value)
+
+	if res != C.pulsar_result_Ok {
+		return false, newError(res, "Failed to check if next message is available")
+	} else if value == C.int(1) {
+		return true, nil
+	} else {
+		return false, nil
+	}
+}
+
 func (r *reader) Close() error {
 	channel := make(chan error)
 	r.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index f61ebd7..7015c9c 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -67,6 +67,9 @@ type Reader interface {
 	// Read the next message in the topic, blocking until a message is available
 	Next(context.Context) (Message, error)
 
+	// Check if there is any message available to read from the current position
+	HasNext() (bool, error)
+
 	// Close the reader and stop the broker to push more messages
 	Close() error
 }
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 11d1b36..3b075e1 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-	"testing"
-	"fmt"
 	"context"
+	"fmt"
+	"testing"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -80,12 +80,20 @@ func TestReader(t *testing.T) {
 			t.Fatal(err)
 		}
 
+		hasNext, err := reader.HasNext()
+		assertNil(t, err)
+		assertEqual(t, hasNext, true)
+
 		msg, err := reader.Next(ctx)
 		assertNil(t, err)
 		assertNotNil(t, msg)
 
 		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
 	}
+
+	hasNext, err := reader.HasNext()
+	assertNil(t, err)
+	assertEqual(t, hasNext, false)
 }
 
 func TestReaderWithInvalidConf(t *testing.T) {


Mime
View raw message