pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [pulsar-client-go] cgfork opened a new issue #282: The client still receives the message after close consumer or close client in Shared subscription
Date Tue, 16 Jun 2020 03:59:07 GMT

cgfork opened a new issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282


   #### Expected behavior
   
   The consumer stops receiving messages after `consumer.Close()` is invoked.
   
   #### Actual behavior
   
   The client still receives messages after the consumer and the client have been closed.
   
   #### Steps to reproduce
   
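   ```go
   package main

   import (
   	"fmt"
   	"log"
   	"os"
   	"time"

   	"github.com/apache/pulsar-client-go/pulsar"
   )

   // checkError aborts the program on any setup failure.
   func checkError(err error) {
   	if err != nil {
   		log.Fatal(err)
   	}
   }

   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://<Your Pulsar Domain>:6650",
   	})
   	checkError(err)
   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:            "test",
   		Name:             "consumer-test-client",
   		SubscriptionName: "consumer-test",
   		Type:             pulsar.Shared,
   	})
   	checkError(err)
   	// Pass "block" as the first argument to keep this consumer attached.
   	if len(os.Args) > 1 && os.Args[1] == "block" {
   		time.Sleep(1 * time.Hour)
   	}
   	// Unsubscribe returns an error in this setup.
   	err = consumer.Unsubscribe()
   	fmt.Println("Unsubscribe", err)
   	consumer.Close()
   	fmt.Println("Consumer Closed")
   	client.Close()
   	fmt.Println("Client Closed")
   	// Keep the process alive to observe messages arriving after Close.
   	time.Sleep(1 * time.Hour)
   }
   ```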
   Then build with `go build -o pulsar` and run the binary in two terminals:
   terminal 1:
   ```shell
   $ ./pulsar block
   INFO[0000] Connecting to broker                          remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] TCP connection established                    local_addr="10.23.67.12:53964" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connection is ready                           local_addr="10.23.67.12:53964" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connected consumer                            name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   INFO[0000] Created consumer                              name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   ```
   terminal 2:
   ```shell
   $ ./pulsar unblock
   INFO[0000] Connecting to broker                          remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] TCP connection established                    local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connection is ready                           local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connected consumer                            name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   INFO[0000] Created consumer                              name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   ERRO[0000] Failed to unsubscribe consumer                error="server error: MetadataError: Unconnected or shared consumer attempting to unsubscribe" name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   INFO[0000] The consumer[1] successfully unsubscribed     name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   Unsubscribe topic persistent://public/default/test, subscription consumer-test: server error: MetadataError: Unconnected or shared consumer attempting to unsubscribe
   Consumer Closed
   Client Closed
   WARN[0046] Got unexpected message: ledgerId:796 entryId:20 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:22 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:24 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:26 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:28 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:19 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:30 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:32 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   ```
   ```
   
   You can also inspect the topic with the following pulsar-admin command:
   ```shell
   # bin/pulsar-admin topics stats persistent://public/default/test
   {
     "msgRateIn" : 0.0,
     "msgThroughputIn" : 0.0,
     "msgRateOut" : 10.721512347506462,
     "msgThroughputOut" : 739.7843519779458,
     "averageMsgSize" : 0.0,
     "storageSize" : 3036,
     "backlogSize" : 1725,
     "publishers" : [ ],
     "subscriptions" : {
       "consumer-test" : {
         "msgRateOut" : 10.721512347506462,
         "msgThroughputOut" : 739.7843519779458,
         "msgRateRedeliver" : 0.0,
         "msgBacklog" : 25,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 25,
         "type" : "Shared",
         "msgRateExpired" : 0.0,
         "lastExpireTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 10.721512347506462,
           "msgThroughputOut" : 739.7843519779458,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "consumer-test-client",
           "availablePermits" : 975,
           "unackedMessages" : 25,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2020-06-16T11:53:30.052+08:00",
           "address" : "/10.23.67.12:53964"
         }, {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "consumer-test-client",
           "availablePermits" : 1000,
           "unackedMessages" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2020-06-16T11:53:33.851+08:00",
           "address" : "/10.23.67.12:53969"
         } ],
         "isReplicated" : false
       }
     },
     "replication" : { },
     "deduplicationStatus" : "Disabled",
     "bytesInCounter" : 42367,
     "msgInCounter" : 612
   }
   ```
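
To check from a script which consumers the broker still lists for a subscription, the stats JSON above can be decoded with the standard library. The following is only a minimal sketch: the struct types and the `attachedConsumers` helper are hypothetical names introduced for illustration, they mirror just a few of the fields shown in the output above, and the sample input is a trimmed copy of that output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// consumerStats mirrors a few per-consumer fields of the stats output.
// The type names are illustrative and not part of any Pulsar library.
type consumerStats struct {
	ConsumerName    string  `json:"consumerName"`
	Address         string  `json:"address"`
	MsgRateOut      float64 `json:"msgRateOut"`
	UnackedMessages int     `json:"unackedMessages"`
}

type subscriptionStats struct {
	Type      string          `json:"type"`
	Consumers []consumerStats `json:"consumers"`
}

type topicStats struct {
	Subscriptions map[string]subscriptionStats `json:"subscriptions"`
}

// attachedConsumers returns the consumers listed under the given
// subscription in a `pulsar-admin topics stats` JSON document.
func attachedConsumers(raw []byte, subscription string) ([]consumerStats, error) {
	var stats topicStats
	if err := json.Unmarshal(raw, &stats); err != nil {
		return nil, fmt.Errorf("decode topic stats: %w", err)
	}
	sub, ok := stats.Subscriptions[subscription]
	if !ok {
		return nil, fmt.Errorf("subscription %q not found", subscription)
	}
	return sub.Consumers, nil
}

func main() {
	// Trimmed copy of the stats output shown above.
	raw := []byte(`{
	  "subscriptions": {
	    "consumer-test": {
	      "type": "Shared",
	      "consumers": [
	        {"consumerName": "consumer-test-client", "msgRateOut": 10.721512347506462,
	         "unackedMessages": 25, "address": "/10.23.67.12:53964"},
	        {"consumerName": "consumer-test-client", "msgRateOut": 0.0,
	         "unackedMessages": 0, "address": "/10.23.67.12:53969"}
	      ]
	    }
	  }
	}`)

	consumers, err := attachedConsumers(raw, "consumer-test")
	if err != nil {
		log.Fatal(err)
	}
	for _, c := range consumers {
		fmt.Printf("%s %s rate=%.2f unacked=%d\n",
			c.ConsumerName, c.Address, c.MsgRateOut, c.UnackedMessages)
	}
}
```

In practice the `raw` bytes would come from the pulsar-admin command output rather than from an embedded literal.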
   
   #### System configuration
   **Pulsar version**: 2.5.0
   




