pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Use synchronized when accessing consumers identity map (#3540)
Date Thu, 07 Feb 2019 22:25:08 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e531e0  Use synchronized when accessing consumers identity map (#3540)
1e531e0 is described below

commit 1e531e0c5dd24db9f85f7e97bb49ed7ad93bf617
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu Feb 7 14:22:28 2019 -0800

    Use synchronized when accessing consumers identity map (#3540)
---
 .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 26b7937..63b4071 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -685,11 +685,13 @@ public class PulsarClientImpl implements PulsarClient {
 
     @SuppressWarnings("unchecked")
     private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
-        Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
-                .filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
-                .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
-                .findFirst();
-        return subscriber.map(ConsumerBase.class::cast);
+        synchronized (consumers) {
+            Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
+                    .filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+                    .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
+                    .findFirst();
+            return subscriber.map(ConsumerBase.class::cast);
+        }
     }
 
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {


Mime
View raw message