brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [1/2] brooklyn-server git commit: Log a warning in case of max-concurrency latch double-acquire
Date Sun, 29 Jan 2017 20:12:04 GMT
Repository: brooklyn-server
Updated Branches:
  refs/heads/master 6761825a5 -> b2ae1ccce


Log a warning in case of max-concurrency latch double-acquire


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/fa56d101
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/fa56d101
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/fa56d101

Branch: refs/heads/master
Commit: fa56d10133496b191027dd027ccf64a78b6c93df
Parents: d81af3e
Author: Svetoslav Neykov <svetoslav.neykov@cloudsoftcorp.com>
Authored: Fri Jan 27 13:05:44 2017 +0200
Committer: Svetoslav Neykov <svetoslav.neykov@cloudsoftcorp.com>
Committed: Fri Jan 27 15:05:47 2017 +0200

----------------------------------------------------------------------
 .../brooklyn/core/sensor/ReleaseableLatch.java  | 32 +++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/fa56d101/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java
index 97d4212..4c9ae76 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java
@@ -18,11 +18,16 @@
  */
 package org.apache.brooklyn.core.sensor;
 
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 
@@ -56,16 +61,33 @@ public interface ReleaseableLatch {
         }
 
         private static class MaxConcurrencyLatch implements ReleaseableLatch {
+            private static final Logger LOG = LoggerFactory.getLogger(MaxConcurrencyLatch.class);
+
             private int permits;
             private transient final Semaphore sem;
 
+            // Not initialized on rebind, but #readResolve() will make sure a
+            // properly initialized object is used instead.
+            private transient final Set<Entity> ownerEntities = Collections.newSetFromMap(new ConcurrentHashMap<Entity, Boolean>());
+
             public MaxConcurrencyLatch(int permits) {
                 this.permits = permits;
                 this.sem = new Semaphore(permits);
             }
 
+            /**
+             * Decreases the available permits by one regardless of whether a previous unreleased call
+             * to the method was done by {@code caller} in this or another thread.
+             */
             @Override
             public void acquire(Entity caller) {
+                if (!ownerEntities.add(caller)) {
+                    LOG.warn("Entity {} acquiring permit multiple times with ~{} permits available and ~{} threads waiting in queue",
+                            new Object[] {caller, sem.availablePermits(), sem.getQueueLength()});
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Entity " + caller + " double-acquire call stack:", new RuntimeException("Call stack for permit double-acquire"));
+                    }
+                }
                 try {
                     sem.acquire();
                 } catch (InterruptedException e) {
@@ -73,9 +95,17 @@ public interface ReleaseableLatch {
                 }
             }
 
+            /**
+             * Increments the available permits by one. No check is done whether a previous call
+             * to {@code acquire} has been made.
+             */
             @Override
             public void release(Entity caller) {
-                sem.release();
+                try {
+                    sem.release();
+                } finally {
+                    ownerEntities.remove(caller);
+                }
             }
 
             // On rebind reset thread count


Mime
View raw message