This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b6dc7c1 Added Reader.HasNext in Go client (#2450)
b6dc7c1 is described below
commit b6dc7c1eae314e65cf782a7b0201408279dc9e8d
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Mon Aug 27 11:41:24 2018 -0700
Added Reader.HasNext in Go client (#2450)
### Motivation
Added `Reader.HasNext()` in Go client library
---
pulsar-client-go/pulsar/c_reader.go | 13 +++++++++++++
pulsar-client-go/pulsar/reader.go | 3 +++
pulsar-client-go/pulsar/reader_test.go | 12 ++++++++++--
3 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 730f9b8..12c1103 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
}
}
+func (r *reader) HasNext() (bool, error) {
+ value := C.int(0)
+ res := C.pulsar_reader_has_message_available(r.ptr, &value)
+
+ if res != C.pulsar_result_Ok {
+ return false, newError(res, "Failed to check if next message is available")
+ } else if value == C.int(1) {
+ return true, nil
+ } else {
+ return false, nil
+ }
+}
+
func (r *reader) Close() error {
channel := make(chan error)
r.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index f61ebd7..7015c9c 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -67,6 +67,9 @@ type Reader interface {
// Read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
+ // Check if there is any message available to read from the current position
+ HasNext() (bool, error)
+
// Close the reader and stop the broker to push more messages
Close() error
}
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 11d1b36..3b075e1 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -20,9 +20,9 @@
package pulsar
import (
- "testing"
- "fmt"
"context"
+ "fmt"
+ "testing"
)
func TestReaderConnectError(t *testing.T) {
@@ -80,12 +80,20 @@ func TestReader(t *testing.T) {
t.Fatal(err)
}
+ hasNext, err := reader.HasNext()
+ assertNil(t, err)
+ assertEqual(t, hasNext, true)
+
msg, err := reader.Next(ctx)
assertNil(t, err)
assertNotNil(t, msg)
assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
}
+
+ hasNext, err := reader.HasNext()
+ assertNil(t, err)
+ assertEqual(t, hasNext, false)
}
func TestReaderWithInvalidConf(t *testing.T) {
|