From commits-return-24153-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Fri Mar 8 01:09:49 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 39949180654 for ; Fri, 8 Mar 2019 02:09:49 +0100 (CET) Received: (qmail 37519 invoked by uid 500); 8 Mar 2019 01:09:48 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 37510 invoked by uid 99); 8 Mar 2019 01:09:48 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Mar 2019 01:09:48 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B380487970; Fri, 8 Mar 2019 01:09:47 +0000 (UTC) Date: Fri, 08 Mar 2019 01:09:47 +0000 To: "commits@pulsar.apache.org" Subject: [pulsar] branch master updated: Fixed Reader.HasNext() in Go client (#3764) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155200738749.6064.7655340119842292967@gitbox.apache.org> From: mmerli@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 1da21612d48f68db39905842feeaed202f45b684 X-Git-Newrev: 593db1c5e609697c7ea4063387d397dfe6375675 X-Git-Rev: 593db1c5e609697c7ea4063387d397dfe6375675 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 593db1c Fixed Reader.HasNext() in Go client (#3764) 593db1c is described below commit 593db1c5e609697c7ea4063387d397dfe6375675 Author: Matteo Merli AuthorDate: Thu Mar 7 17:09:37 2019 -0800 Fixed Reader.HasNext() in Go client (#3764) * Fixed Reader.HasNext() in Go client * Fixed formatting * Removed commented code --- pulsar-client-cpp/lib/ConsumerImpl.cc | 8 +++---- pulsar-client-cpp/lib/ConsumerImpl.h | 8 +++---- pulsar-client-cpp/lib/Reader.cc | 6 +---- pulsar-client-go/pulsar/reader_test.go | 44 ++++++++++++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 14 deletions(-) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 1c46323..7571d66 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -503,6 +503,7 @@ void ConsumerImpl::internalListener() { unAckedMessageTrackerPtr_->add(msg.getMessageId()); try { consumerStatsBasePtr_->receivedMessage(msg, ResultOk); + lastDequedMessage_ = Optional::of(msg.getMessageId()); messageListener_(Consumer(shared_from_this()), msg); } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from listener" << e.what()); @@ -1039,8 +1040,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback return; } - BrokerGetLastMessageIdCallback callback1 = [this, lastDequed, callback](Result result, - MessageId messageId) { + getLastMessageIdAsync([this, lastDequed, callback](Result result, MessageId messageId) { if (result == ResultOk) { if (messageId > lastDequed && messageId.entryId() != -1) { callback(ResultOk, true); @@ -1050,9 +1050,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback } else { callback(result, false); } - }; - - getLastMessageIdAsync(callback1); + }); } void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId messageId, diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 747c276..b5fe761 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -177,12 +177,12 @@ class ConsumerImpl : public ConsumerImplBase, void brokerGetLastMessageIdListener(Result res, MessageId messageId, BrokerGetLastMessageIdCallback callback); - MessageId lastMessageIdDequed() { - return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId(); + const MessageId& lastMessageIdDequed() { + return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId::earliest(); } - MessageId lastMessageIdInBroker() { - return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId(); + const MessageId& lastMessageIdInBroker() { + return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest(); } friend class PulsarFriend; diff --git a/pulsar-client-cpp/lib/Reader.cc b/pulsar-client-cpp/lib/Reader.cc index cd86d62..e14c547 100644 --- a/pulsar-client-cpp/lib/Reader.cc +++ b/pulsar-client-cpp/lib/Reader.cc @@ -77,13 +77,9 @@ void Reader::hasMessageAvailableAsync(HasMessageAvailableCallback callback) { } Result Reader::hasMessageAvailable(bool& hasMessageAvailable) { - if (!impl_) { - return ResultConsumerNotInitialized; - } - Promise promise; - impl_->hasMessageAvailableAsync(WaitForCallbackValue(promise)); + hasMessageAvailableAsync(WaitForCallbackValue(promise)); return promise.getFuture().get(hasMessageAvailable); } diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go index 4212617..90821a8 100644 --- a/pulsar-client-go/pulsar/reader_test.go +++ b/pulsar-client-go/pulsar/reader_test.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "github.com/stretchr/testify/assert" + "math/rand" "strings" "testing" "time" @@ -218,3 +219,46 @@ func TestReaderCompaction(t *testing.T) { assert.Nil(t, msg) assert.NotNil(t, err) } + +func TestReaderHasNext(t *testing.T) { + topic := fmt.Sprintf("TestReaderHasNext-%d", rand.Int()) + ctx := context.Background() + + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + }) + assert.Nil(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + // Send a message. + err = producer.Send(ctx, ProducerMessage{}) + assert.Nil(t, err) + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessage, + }) + assert.Nil(t, err) + defer reader.Close() + + var hasNext bool + + // Now we have 1 message to read + hasNext, err = reader.HasNext() + assert.Nil(t, err) + assert.True(t, hasNext) + + _, err = reader.Next(ctx) + assert.Nil(t, err) + + // Now there is no message left + hasNext, err = reader.HasNext() + assert.Nil(t, err) + assert.False(t, hasNext) +}