pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [incubator-pulsar] 04/04: [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)
Date Thu, 06 Sep 2018 18:11:22 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 050d47dcb1e3138d093941f9a00641c7caf7c3fd
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu Sep 6 07:40:24 2018 -0700

    [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)
---
 pulsar-client-go/pulsar/c_consumer.go |  3 ++-
 pulsar-client-go/pulsar/c_producer.go | 12 +++++++-----
 pulsar-client-go/pulsar/c_reader.go   |  3 ++-
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71..c78a58e 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+	client         *client
 	ptr            *C.pulsar_consumer_t
 	defaultChannel chan ConsumerMessage
 }
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 
 	conf := C.pulsar_consumer_configuration_create()
 
-	consumer := &consumer{}
+	consumer := &consumer{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 284315d..620b64d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
 */
 import "C"
 import (
+	"context"
 	"runtime"
-	"unsafe"
 	"time"
-	"context"
+	"unsafe"
 )
 
 type createProducerCtx struct {
+	client   *client
 	callback func(producer Producer, err error)
 	conf     *C.pulsar_producer_configuration_t
 }
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
 	if res != C.pulsar_result_Ok {
 		producerCtx.callback(nil, newError(res, "Failed to create Producer"))
 	} else {
-		p := &producer{ptr: ptr}
+		p := &producer{client: producerCtx.client, ptr: ptr}
 		runtime.SetFinalizer(p, producerFinalizer)
 		producerCtx.callback(p, nil)
 	}
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 	defer C.free(unsafe.Pointer(topicName))
 
 	C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-		savePointer(createProducerCtx{callback, conf}))
+		savePointer(createProducerCtx{client,callback, conf}))
 }
 
 type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
 /// Producer
 
 type producer struct {
-	ptr *C.pulsar_producer_t
+	client *client
+	ptr    *C.pulsar_producer_t
 }
 
 func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf..7336c1a 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+	client         *client
 	ptr            *C.pulsar_reader_t
 	defaultChannel chan ReaderMessage
 }
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
 		return
 	}
 
-	reader := &reader{}
+	reader := &reader{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to


Mime
View raw message