pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] codelipenghui commented on issue #3131: Consumer stop receive messages from broker
Date Wed, 09 Jan 2019 13:41:31 GMT
codelipenghui commented on issue #3131: Consumer stop receive messages from broker
URL: https://github.com/apache/pulsar/issues/3131#issuecomment-452700099
 
 
   This is a test case that shows how the consumers stop consuming messages:
   
   ```java
   package com.zhaopin.pulsar.issues;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.TimeUnit;
   
   public class ConsumerStopReceiveMessages {
   
       public static void main(String[] args) throws PulsarClientException {
   
           final String topic = "your-topic";
   
           /**
            * Broker configs:
            *
            * maxUnackedMessagesPerConsumer=500
            * maxUnackedMessagesPerSubscription=2000
            */
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();
   
           // Create a producer with message batching enabled
           Producer<byte[]> producer = client.newProducer()
                   .topic(topic)
                   .batchingMaxMessages(1000)
                   .batchingMaxPublishDelay(30, TimeUnit.SECONDS)
                   .blockIfQueueFull(true)
                   .maxPendingMessages(1000)
                   .enableBatching(true)
                   .create();
   
           // Create 6 consumers
           List<Consumer<byte[]>> consumers = new ArrayList<>();
           for (int i = 0; i < 6; i++) {
               consumers.add(client.newConsumer()
                       .topic(topic)
                       .subscriptionType(SubscriptionType.Shared)
                       .subscriptionName("test")
                       .ackTimeout(10, TimeUnit.SECONDS)
                       .receiverQueueSize(100)
                       .subscribe());
           }
   
           // The producer starts publishing messages
           new Thread(() -> {
               for (; ; ) {
                   producer.sendAsync(("a message").getBytes());
               }
           }).start();
   
           // The consumers start consuming messages
           consumers.forEach(consumer -> new Thread(() -> {
               do {
                       // After a while, receive() stops returning messages here
                   try {
                       Message<byte[]> msg = consumer.receive();
                       System.out.println(consumer.getConsumerName() + " ---- " + new String(msg.getValue()));
                       Thread.sleep(50);
                       consumer.acknowledge(msg);
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               } while (true);
           }).start());
       }
   }
   
   ```
   
   After a while, you will get the following logs:
   
   ```
   1cce6 ---- a message
   1cce6 ---- a message
   1cce6 ---- a message
   1cce6 ---- a message
   1cce6 ---- a message
   1cce6 ---- a message
   1cce6 ---- a message
   [flink-source2] [test] [94eca] Prefetched messages: 0 --- Consume throughput: 0.53 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   1cce6 ---- a message
   1cce6 ---- a message
   [flink-source2] [test] [1cce6] Prefetched messages: 6812 --- Consume throughput: 6.37 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   1cce6 ---- a message
   [ConsumerBase{subscription='test', consumerName='1cce6', topic='flink-source2'}] 7 messages have timed-out
   [flink-source2] [test] [00c2e] Prefetched messages: 0 --- Consume throughput: 3.20 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   [flink-source2] [test] [1b009] Prefetched messages: 0 --- Consume throughput: 3.20 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   [flink-source2] [pulsar-api-test-14-3231] Pending messages: 1 --- Publish throughput: 7398.33 msg/s --- 0.51 Mbit/s --- Latency: med: 244.845 ms - 95pct: 588.543 ms - 99pct: 712.777 ms - 99.9pct: 851.404 ms - max: 851.478 ms --- Ack received rate: 7398.33 ack/s --- Failed messages: 0
   [flink-source2] [test] [1cce6] Prefetched messages: 0 --- Consume throughput: 0.02 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   [flink-source2] [pulsar-api-test-14-3231] Pending messages: 1 --- Publish throughput: 6949.81 msg/s --- 0.48 Mbit/s --- Latency: med: 277.904 ms - 95pct: 609.112 ms - 99pct: 659.421 ms - 99.9pct: 746.124 ms - max: 746.239 ms --- Ack received rate: 6949.81 ack/s --- Failed messages: 0
   [flink-source2] [pulsar-api-test-14-3231] Pending messages: 1 --- Publish throughput: 9166.40 msg/s --- 0.63 Mbit/s --- Latency: med: 163.221 ms - 95pct: 530.519 ms - 99pct: 651.178 ms - 99.9pct: 813.009 ms - max: 864.875 ms --- Ack received rate: 9166.40 ack/s --- Failed messages: 0
   
   ```
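
The consumer stats lines in the log above are easier to compare once the fields are pulled out. The following is a minimal sketch, not part of the original test case: it assumes the line format is exactly what is shown in the log, and the class name `ConsumerStatsLine` and the `hasBacklogWithoutAcks` heuristic are hypothetical helpers introduced only for illustration. It uses nothing beyond `java.util.regex`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConsumerStatsLine {

    // Assumed format, copied from the log above:
    // [topic] [subscription] [consumer] Prefetched messages: N --- Consume throughput: X msgs/s
    //   --- ... --- Ack sent rate: Y ack/s --- ...
    private static final Pattern STATS = Pattern.compile(
            "\\[([^\\]]+)\\] \\[([^\\]]+)\\] \\[([^\\]]+)\\] Prefetched messages: (\\d+)"
            + " --- Consume throughput: ([0-9.]+) msgs/s"
            + ".*Ack sent rate: ([0-9.]+) ack/s");

    final String consumerName;
    final long prefetched;
    final double consumeRate;
    final double ackRate;

    ConsumerStatsLine(String consumerName, long prefetched, double consumeRate, double ackRate) {
        this.consumerName = consumerName;
        this.prefetched = prefetched;
        this.consumeRate = consumeRate;
        this.ackRate = ackRate;
    }

    // Returns null for lines that are not consumer stats lines (for example producer stats).
    static ConsumerStatsLine parse(String line) {
        Matcher m = STATS.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new ConsumerStatsLine(
                m.group(3),
                Long.parseLong(m.group(4)),
                Double.parseDouble(m.group(5)),
                Double.parseDouble(m.group(6)));
    }

    // Hypothetical heuristic: messages are buffered in the client but no acks are reported.
    boolean hasBacklogWithoutAcks() {
        return prefetched > 0 && ackRate == 0.0;
    }

    public static void main(String[] args) {
        String line = "[flink-source2] [test] [1cce6] Prefetched messages: 6812"
                + " --- Consume throughput: 6.37 msgs/s --- Throughput received: 0.00 msg/s"
                + " --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}";
        ConsumerStatsLine s = parse(line);
        System.out.println(s.consumerName + " prefetched=" + s.prefetched
                + " consumeRate=" + s.consumeRate + " ackRate=" + s.ackRate
                + " backlogWithoutAcks=" + s.hasBacklogWithoutAcks());
    }
}
```

Run over the whole log, this would single out consumer `1cce6` as the only one reporting prefetched messages in that interval, while the other consumers report none. It only summarizes what the log already shows and does not by itself explain why the consumers stall.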

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
