ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject [29/30] ambari git commit: AMBARI-18654. Implement instrumented Lock for profiling/logging. (Attila Doroszlai via stoader)
Date Fri, 21 Oct 2016 17:20:17 GMT
AMBARI-18654. Implement instrumented Lock for profiling/logging. (Attila Doroszlai via stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/462c8d9f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/462c8d9f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/462c8d9f

Branch: refs/heads/branch-feature-AMBARI-18456
Commit: 462c8d9f61554f42704013d53b6376379dc408e0
Parents: 3b159d2
Author: Attila Doroszlai <adoroszlai@hortonworks.com>
Authored: Fri Oct 21 18:51:31 2016 +0200
Committer: Toader, Sebastian <stoader@hortonworks.com>
Committed: Fri Oct 21 18:53:11 2016 +0200

----------------------------------------------------------------------
 .../server/configuration/Configuration.java     |  13 ++
 .../ambari/server/logging/LockFactory.java      | 128 ++++++++++++
 .../server/logging/LockProfileDelegate.java     | 186 +++++++++++++++++
 .../ambari/server/logging/ProfiledLock.java     |  50 +++++
 .../server/logging/ProfiledReentrantLock.java   | 115 ++++++++++
 .../logging/ProfiledReentrantReadWriteLock.java | 208 +++++++++++++++++++
 .../server/state/cluster/ClusterImpl.java       |  14 +-
 .../server/configuration/ConfigurationTest.java |  12 ++
 .../ambari/server/logging/LockFactoryTest.java  |  95 +++++++++
 .../ProfiledReentrantReadWriteLockTest.java     | 157 ++++++++++++++
 10 files changed, 974 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 1759dc4..1753216 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -1114,6 +1114,12 @@ public class Configuration {
 
 
   /**
+   * Enable the profiling of internal locks.
+   */
+  @Markdown(description = "Enable the profiling of internal locks.")
+  public static final ConfigurationProperty<Boolean> SERVER_LOCKS_PROFILING = new ConfigurationProperty<>("server.locks.profiling",
Boolean.FALSE);
+
+  /**
    * The size of the cache used to hold {@link HostRoleCommand} instances in-memory.
    */
   @Markdown(description = "The size of the cache which is used to hold current operations
in memory until they complete.")
@@ -4941,6 +4947,13 @@ public class Configuration {
   }
 
   /**
+   * @return true if lock profiling is enabled for Ambari Server, in which case LockFactory should create instrumented locks
+   */
+  public boolean isServerLocksProfilingEnabled() {
+    return Boolean.parseBoolean(getProperty(SERVER_LOCKS_PROFILING));
+  }
+
+  /**
    * @return the capacity of async audit logger
    */
   public int getAuditLoggerCapacity() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/main/java/org/apache/ambari/server/logging/LockFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/logging/LockFactory.java
b/ambari-server/src/main/java/org/apache/ambari/server/logging/LockFactory.java
new file mode 100644
index 0000000..f40aa84
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/logging/LockFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.logging;
+
+import com.google.common.base.Ticker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.ambari.server.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Factory to create locks depending on configuration.  If lock profiling is enabled,
+ * it creates instrumented locks that collect statistics and log requests.  If profiling is
+ * disabled, it creates regular reentrant locks.
+ *
+ * @see Configuration#isServerLocksProfilingEnabled()
+ */
+@Singleton
+public class LockFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LockFactory.class);
+
+  private final boolean profiling;
+  private final Set<ProfiledLock> profiledLocks;
+
+  @Inject
+  public LockFactory(Configuration config) {
+    profiling = config.isServerLocksProfilingEnabled();
+    profiledLocks = profiling ? new CopyOnWriteArraySet<ProfiledLock>() : null;
+    LOG.info("Lock profiling is {}", profiling ? "enabled" : "disabled");
+  }
+
+  /**
+   * @return a new Lock instance (implementation depends on configuration setting) without any special label to identify it in log messages
+   */
+  public Lock newLock() {
+    return newLock(getDefaultPrefix());
+  }
+
+  /**
+   * @return a new Lock instance (implementation depends on configuration setting) with <code>label</code> to identify it in log messages
+   */
+  public Lock newLock(String label) {
+    ReentrantLock baseLock = new ReentrantLock();
+    if (profiling) {
+      ProfiledReentrantLock profiledLock = new ProfiledReentrantLock(baseLock, Ticker.systemTicker(),
label);
+      profiledLocks.add(profiledLock);
+      return profiledLock;
+    }
+    return baseLock;
+  }
+
+  /**
+   * @return a new ReadWriteLock instance (implementation depends on configuration setting) without any special label to identify it in log messages
+   */
+  public ReadWriteLock newReadWriteLock() {
+    return newReadWriteLock(getDefaultPrefix());
+  }
+
+  /**
+   * @return a new ReadWriteLock instance (implementation depends on configuration setting) with <code>label</code> to identify it in log messages
+   */
+  public ReadWriteLock newReadWriteLock(String label) {
+    ReentrantReadWriteLock baseLock = new ReentrantReadWriteLock();
+    if (profiling) {
+      ProfiledReentrantReadWriteLock profiledLock = new ProfiledReentrantReadWriteLock(baseLock,
Ticker.systemTicker(), label);
+      profiledLocks.add(profiledLock.readLock());
+      profiledLocks.add(profiledLock.writeLock());
+      return profiledLock;
+    }
+    return baseLock;
+  }
+
+  /**
+   * If lock profiling is enabled, append summary statistics about lock usage to <code>sb</code>
+   * @param sb the buffer to append the statistics to
+   */
+  public void debugDump(StringBuilder sb) {
+    if (profiling) {
+      sb.append("\n\t\tLocks: [");
+      for (ProfiledLock lock : profiledLocks) {
+        sb.append("\n\t\t\t").append(lock.getLabel())
+          .append(lock)
+          .append(" waited: ").append(lock.getTimeSpentWaitingForLock())
+          .append(" held: ").append(lock.getTimeSpentLocked())
+          .append(" times locked: ").append(lock.getLockCount());
+      }
+      if (!profiledLocks.isEmpty()) {
+        sb.append("\n");
+      }
+      sb.append("]");
+    }
+  }
+
+  private static String getDefaultPrefix() {
+    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+    // 0: getStackTrace()
+    // 1: getDefaultPrefix()
+    // 2: newLock()
+    // 3: caller of newLock()
+    return stackTrace.length > 3
+      ? stackTrace[3].getFileName() + ":" + stackTrace[3].getLineNumber()
+      : "";
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/main/java/org/apache/ambari/server/logging/LockProfileDelegate.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/logging/LockProfileDelegate.java
b/ambari-server/src/main/java/org/apache/ambari/server/logging/LockProfileDelegate.java
new file mode 100644
index 0000000..e37016e0
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/logging/LockProfileDelegate.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.logging;
+
+import com.google.common.base.Ticker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A helper object for Lock implementations that need logging/profiling.
+ * <p>
+ * It collects the time spent waiting for the lock, the time spent holding the lock,
+ * and the number of times the lock was taken.  Lock requests, acquisitions and releases
+ * are also logged at debug level.  The log message for requests contains a filtered stack
+ * trace, only including methods from org.apache.ambari and subpackages to eliminate the noise.
+ */
+final class LockProfileDelegate {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LockProfileDelegate.class);
+
+  private final ThreadLocal<Long> lockRequestTime = new ThreadLocal<>();
+  private final ThreadLocal<Long> lockAcquireTime = new ThreadLocal<>();
+  private final ConcurrentMap<String, Long> timeSpentWaitingForLock = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, Long> timeSpentLocked = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, Integer> lockCount = new ConcurrentHashMap<>();
+  private final String label;
+  private final ProfiledLock lock;
+  private final Ticker ticker;
+
+  /**
+   * @param ticker is the source of time information, replaceable for testing purpose
+   * @param label is included in log messages (optional, may be empty or null)
+   * @param lock the lock to profile
+   */
+  LockProfileDelegate(Ticker ticker, String label, ProfiledLock lock) {
+    this.label = addSpacePostfixIfNeeded(label);
+    this.lock = lock;
+    this.ticker = ticker;
+  }
+
+  /**
+   * @return the label for this lock (including an extra space at the end, if the label is otherwise not empty)
+   */
+  String getLabel() {
+    return label;
+  }
+
+  /**
+   * @return time spent waiting for the lock (in milliseconds) by thread name
+   */
+  Map<String, Long> getTimeSpentWaitingForLock() {
+    return new TreeMap<>(timeSpentWaitingForLock);
+  }
+
+  /**
+   * @return time spent holding the lock (in milliseconds) by thread name
+   */
+  Map<String, Long> getTimeSpentLocked() {
+    return new TreeMap<>(timeSpentLocked);
+  }
+
+  /**
+   * @return the number of times the lock was taken by thread name
+   */
+  Map<String, Integer> getLockCount() {
+    return new TreeMap<>(lockCount);
+  }
+
+  /**
+   * Should be called by the lock to indicate that the lock was requested.
+   *
+   * @return whether the lock was already owned by the current thread
+   */
+  boolean logRequest() {
+    boolean alreadyOwned = lock.isHeldByCurrentThread();
+    if (!alreadyOwned) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}request {} from {}", label, lock, getFilteredStackTrace());
+      }
+      lockRequestTime.set(ticker.read());
+    }
+    return alreadyOwned;
+  }
+
+  /**
+   * Should be called by the lock to indicate that the request for the lock was completed,
+   * either successfully (acquired) or unsuccessfully (not acquired due to timeout, etc.).
+   *
+   * @param alreadyOwned whether the current thread already owned the lock before the request (if true, the call is ignored)
+   * @param acquired whether the lock was acquired by the requestor
+   */
+  void logRequestCompleted(boolean alreadyOwned, boolean acquired) {
+    if (!alreadyOwned) {
+      if (acquired) {
+        long elapsed = storeElapsedTime(lockRequestTime, timeSpentWaitingForLock);
+        LOG.debug("{}acquired {} after {} ms", label, lock, elapsed);
+        increment(lockCount);
+        lockAcquireTime.set(ticker.read());
+      } else {
+        LOG.debug("{}failed to acquire {}", label, lock);
+      }
+    }
+  }
+
+  /**
+   * Should be called by the lock to indicate that the lock was unlocked.  For reentrant locks this may or may not
+   * mean the lock was actually released.
+   */
+  void logUnlock() {
+    boolean released = !lock.isHeldByCurrentThread();
+    if (released) {
+      long elapsed = storeElapsedTime(lockAcquireTime, timeSpentLocked);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}released {} after {} ms", label, lock, elapsed);
+      }
+    }
+  }
+
+  private long storeElapsedTime(ThreadLocal<Long> startHolder, ConcurrentMap<String,
Long> map) {
+    long end = ticker.read();
+    long elapsed = Long.MIN_VALUE;
+    Long start = startHolder.get();
+    if (start != null && start <= end) {
+      elapsed = TimeUnit.NANOSECONDS.toMillis(end - start);
+      String name = Thread.currentThread().getName();
+      map.putIfAbsent(name, 0L);
+      if (elapsed > 0) {
+        map.put(name, map.get(name) + elapsed);
+      }
+    }
+    startHolder.remove();
+    return elapsed;
+  }
+
+  private static void increment(ConcurrentMap<String, Integer> map) {
+    String name = Thread.currentThread().getName();
+    map.putIfAbsent(name, 0);
+    map.put(name, map.get(name) + 1);
+  }
+
+  private static String addSpacePostfixIfNeeded(String label) {
+    if (label == null) {
+      return "";
+    }
+    label = label.trim();
+    return label.length() > 0 ? label + " " : label;
+  }
+
+  private static String getFilteredStackTrace() {
+    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+    StringBuilder sb = new StringBuilder();
+    for (StackTraceElement element : stackTrace) {
+      String className = element.getClassName();
+      if (className.startsWith("org.apache.ambari") && !className.startsWith("org.apache.ambari.server.logging"))
{
+        sb.append(className)
+          .append("#").append(element.getMethodName())
+          .append("(").append(element.getFileName())
+          .append(":").append(element.getLineNumber())
+          .append(");\t");
+      }
+    }
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledLock.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledLock.java
b/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledLock.java
new file mode 100644
index 0000000..9beb664
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledLock.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.logging;
+
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Interface for lock implementations that should be profiled and need
+ * the assistance of LockProfileDelegate to achieve it.
+ */
+interface ProfiledLock extends Lock {
+
+  /**
+   * @return true if the lock is already held by the current thread
+   */
+  boolean isHeldByCurrentThread();
+
+  /**
+   * @return time spent waiting for the lock (in milliseconds) by thread name
+   */
+  Map<String, Long> getTimeSpentWaitingForLock();
+
+  /**
+   * @return time spent holding the lock (in milliseconds) by thread name
+   */
+  Map<String, Long> getTimeSpentLocked();
+
+  /**
+   * @return the number of times the lock was taken by thread name
+   */
+  Map<String, Integer> getLockCount();
+
+  String getLabel();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantLock.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantLock.java
b/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantLock.java
new file mode 100644
index 0000000..a45b1e4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantLock.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.logging;
+
+import com.google.common.base.Ticker;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implements profiling for a ReentrantLock.
+ */
+final class ProfiledReentrantLock implements ProfiledLock {
+
+  private final ReentrantLock delegate;
+  private final LockProfileDelegate helper;
+
+  /**
+   * @param delegate the lock to profile
+   * @param ticker is the source of time information, replaceable for testing purpose
+   * @param label is included in log messages for easier identification of locks (optional, may be empty or null)
+   */
+  ProfiledReentrantLock(ReentrantLock delegate, Ticker ticker, String label) {
+    this.delegate = delegate;
+    helper = new LockProfileDelegate(ticker, label, this);
+  }
+
+  @Override
+  public void lock() {
+    boolean alreadyOwned = helper.logRequest();
+    delegate.lock();
+    helper.logRequestCompleted(alreadyOwned, true);
+  }
+
+  @Override
+  public void lockInterruptibly() throws InterruptedException {
+    boolean alreadyOwned = helper.logRequest();
+    delegate.lockInterruptibly();
+    helper.logRequestCompleted(alreadyOwned, true);
+  }
+
+  @Override
+  public boolean tryLock() {
+    boolean alreadyOwned = helper.logRequest();
+    boolean result = delegate.tryLock();
+    helper.logRequestCompleted(alreadyOwned, result);
+    return result;
+  }
+
+  @Override
+  public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
+    boolean alreadyOwned = helper.logRequest();
+    boolean result = delegate.tryLock(timeout, unit);
+    helper.logRequestCompleted(alreadyOwned, result);
+    return result;
+  }
+
+  @Override
+  public void unlock() {
+    delegate.unlock();
+    helper.logUnlock();
+  }
+
+  @Override
+  public Condition newCondition() {
+    return delegate.newCondition();
+  }
+
+  @Override
+  public boolean isHeldByCurrentThread() {
+    return delegate.isHeldByCurrentThread();
+  }
+
+  @Override
+  public Map<String, Long> getTimeSpentWaitingForLock() {
+    return helper.getTimeSpentWaitingForLock();
+  }
+
+  @Override
+  public Map<String, Long> getTimeSpentLocked() {
+    return helper.getTimeSpentLocked();
+  }
+
+  @Override
+  public Map<String, Integer> getLockCount() {
+    return helper.getLockCount();
+  }
+
+  @Override
+  public String getLabel() {
+    return helper.getLabel();
+  }
+
+  @Override
+  public String toString() {
+    return delegate.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLock.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLock.java
b/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLock.java
new file mode 100644
index 0000000..4452b67
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLock.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.logging;
+
+import com.google.common.base.Ticker;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Implements profiling for a ReentrantReadWriteLock.
+ */
+final class ProfiledReentrantReadWriteLock implements ReadWriteLock {
+
+  private final ProfiledReadLock readLock;
+  private final ProfiledWriteLock writeLock;
+
+  /**
+   * @param delegate the lock to profile
+   * @param ticker is the source of time information, replaceable for testing purpose
+   * @param label is included in log messages for easier identification of locks (optional, may be empty or null)
+   */
+  ProfiledReentrantReadWriteLock(ReentrantReadWriteLock delegate, Ticker ticker, String label)
{
+    readLock = new ProfiledReadLock(delegate, ticker, label);
+    writeLock = new ProfiledWriteLock(delegate, ticker, label);
+  }
+
+  @Override
+  public ProfiledLock readLock() {
+    return readLock;
+  }
+
+  @Override
+  public ProfiledLock writeLock() {
+    return writeLock;
+  }
+
+  private static class ProfiledReadLock extends ReentrantReadWriteLock.ReadLock implements
ProfiledLock {
+
+    private final LockProfileDelegate helper;
+    private final ReentrantReadWriteLock delegate;
+
+    ProfiledReadLock(ReentrantReadWriteLock delegate, Ticker ticker, String label) {
+      super(delegate);
+      this.delegate = delegate;
+      helper = new LockProfileDelegate(ticker, label, this);
+    }
+
+    @Override
+    public void lock() {
+      boolean alreadyOwned = helper.logRequest();
+      super.lock();
+      helper.logRequestCompleted(alreadyOwned, true);
+    }
+
+    @Override
+    public void lockInterruptibly() throws InterruptedException {
+      boolean alreadyOwned = helper.logRequest();
+      super.lockInterruptibly();
+      helper.logRequestCompleted(alreadyOwned, true);
+    }
+
+    @Override
+    public boolean tryLock() {
+      boolean alreadyOwned = helper.logRequest();
+      boolean result = super.tryLock();
+      helper.logRequestCompleted(alreadyOwned, result);
+      return result;
+    }
+
+    @Override
+    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
+      boolean alreadyOwned = helper.logRequest();
+      boolean result = super.tryLock(timeout, unit);
+      helper.logRequestCompleted(alreadyOwned, result);
+      return result;
+    }
+
+    @Override
+    public void unlock() {
+      super.unlock();
+      helper.logUnlock();
+    }
+
+    /**
+     * @return true if the current thread holds this read lock, i.e. its read hold count is greater than zero (note that the read lock may be shared between threads, so other threads may hold it at the same time)
+     */
+    @Override
+    public boolean isHeldByCurrentThread() {
+      return delegate.getReadHoldCount() > 0;
+    }
+
+    @Override
+    public Map<String, Long> getTimeSpentWaitingForLock() {
+      return helper.getTimeSpentWaitingForLock();
+    }
+
+    @Override
+    public Map<String, Long> getTimeSpentLocked() {
+      return helper.getTimeSpentLocked();
+    }
+
+    @Override
+    public Map<String, Integer> getLockCount() {
+      return helper.getLockCount();
+    }
+
+    @Override
+    public String toString() {
+      return delegate.readLock().toString();
+    }
+
+    @Override
+    public String getLabel() {
+      return helper.getLabel();
+    }
+  }
+
+  private static class ProfiledWriteLock extends ReentrantReadWriteLock.WriteLock implements
ProfiledLock {
+
+    private final LockProfileDelegate helper;
+    private final ReentrantReadWriteLock delegate;
+
+    ProfiledWriteLock(ReentrantReadWriteLock delegate, Ticker ticker, String label) {
+      super(delegate);
+      this.delegate = delegate;
+      helper = new LockProfileDelegate(ticker, label, this);
+    }
+
+    @Override
+    public void lock() {
+      boolean alreadyOwned = helper.logRequest();
+      super.lock();
+      helper.logRequestCompleted(alreadyOwned, true);
+    }
+
+    @Override
+    public void lockInterruptibly() throws InterruptedException {
+      boolean alreadyOwned = helper.logRequest();
+      super.lockInterruptibly();
+      helper.logRequestCompleted(alreadyOwned, true);
+    }
+
+    @Override
+    public boolean tryLock() {
+      boolean alreadyOwned = helper.logRequest();
+      boolean result = super.tryLock();
+      helper.logRequestCompleted(alreadyOwned, result);
+      return result;
+    }
+
+    @Override
+    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
+      boolean alreadyOwned = helper.logRequest();
+      boolean result = super.tryLock(timeout, unit);
+      helper.logRequestCompleted(alreadyOwned, result);
+      return result;
+    }
+
+    @Override
+    public void unlock() {
+      super.unlock();
+      helper.logUnlock();
+    }
+
+    @Override
+    public Map<String, Long> getTimeSpentWaitingForLock() {
+      return helper.getTimeSpentWaitingForLock();
+    }
+
+    @Override
+    public Map<String, Long> getTimeSpentLocked() {
+      return helper.getTimeSpentLocked();
+    }
+
+    @Override
+    public Map<String, Integer> getLockCount() {
+      return helper.getLockCount();
+    }
+
+    @Override
+    public String toString() {
+      return delegate.writeLock().toString();
+    }
+
+    @Override
+    public String getLabel() {
+      return helper.getLabel();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 84697b8..bb46c5c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.Nullable;
 import javax.persistence.EntityManager;
@@ -67,6 +66,7 @@ import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
 import org.apache.ambari.server.events.jpa.JPAEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.JPAEventPublisher;
+import org.apache.ambari.server.logging.LockFactory;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.cache.HostConfigMapping;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
@@ -201,11 +201,10 @@ public class ClusterImpl implements Cluster {
    */
   private final Map<Long, RequestExecution> requestExecutions = new ConcurrentHashMap<>();
 
-  private final ReadWriteLock clusterGlobalLock = new ReentrantReadWriteLock();
+  private final ReadWriteLock clusterGlobalLock;
 
   // This is a lock for operations that do not need to be cluster global
-  private final ReentrantReadWriteLock hostTransitionStateLock = new ReentrantReadWriteLock();
-  private final Lock hostTransitionStateWriteLock = hostTransitionStateLock.writeLock();
+  private final Lock hostTransitionStateWriteLock;
 
   /**
    * The unique ID of the {@link @ClusterEntity}.
@@ -239,6 +238,9 @@ public class ClusterImpl implements Cluster {
   private ConfigFactory configFactory;
 
   @Inject
+  private LockFactory lockFactory;
+
+  @Inject
   private HostConfigMappingDAO hostConfigMappingDAO;
 
   @Inject
@@ -317,6 +319,9 @@ public class ClusterImpl implements Cluster {
 
     injector.injectMembers(this);
 
+    clusterGlobalLock = lockFactory.newReadWriteLock("clusterGlobalLock");
+    hostTransitionStateWriteLock = lockFactory.newLock("hostTransitionStateLock");
+
     loadStackVersion();
     loadServices();
     loadServiceHostComponents();
@@ -1945,6 +1950,7 @@ public class ClusterImpl implements Cluster {
       sb.append(' ');
     }
     sb.append(" ] }");
+    lockFactory.debugDump(sb);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
index a56ace5..f90cf76 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
@@ -577,6 +577,18 @@ public class ConfigurationTest {
   }
 
   @Test
+  public void testServerLocksProfilingEnabled() throws Exception {
+    final Properties ambariProperties = new Properties();
+    final Configuration configuration = new Configuration(ambariProperties);
+
+    Assert.assertFalse(configuration.isServerLocksProfilingEnabled());
+
+    ambariProperties.setProperty(Configuration.SERVER_LOCKS_PROFILING.getKey(), Boolean.TRUE.toString());
+
+    Assert.assertTrue(configuration.isServerLocksProfilingEnabled());
+  }
+
+  @Test
   public void testAlertCaching() throws Exception {
     final Properties ambariProperties = new Properties();
     final Configuration configuration = new Configuration(ambariProperties);

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/test/java/org/apache/ambari/server/logging/LockFactoryTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/logging/LockFactoryTest.java b/ambari-server/src/test/java/org/apache/ambari/server/logging/LockFactoryTest.java
new file mode 100644
index 0000000..23cd95c
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/logging/LockFactoryTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.logging;
+
+import org.apache.ambari.server.configuration.Configuration;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.easymock.EasyMock.expect;
+
+public class LockFactoryTest extends EasyMockSupport {
+
+  @Test
+  public void createsRegularLockIfDebugIsDisabled() {
+    Configuration config = createNiceMock(Configuration.class);
+    expect(config.isServerLocksProfilingEnabled()).andReturn(false);
+    replayAll();
+
+    LockFactory factory = new LockFactory(config);
+    Lock lock = factory.newLock();
+    Assert.assertTrue(lock instanceof ReentrantLock);
+    verifyAll();
+  }
+
+  @Test
+  public void createsRegularReadWriteLockIfDebugIsDisabled() {
+    Configuration config = createNiceMock(Configuration.class);
+    expect(config.isServerLocksProfilingEnabled()).andReturn(false);
+    replayAll();
+
+    LockFactory factory = new LockFactory(config);
+    ReadWriteLock lock = factory.newReadWriteLock();
+    Assert.assertTrue(lock instanceof ReentrantReadWriteLock);
+    verifyAll();
+  }
+
+  @Test
+  public void createsProfiledLockIfProfilingIsEnabled() {
+    Configuration config = createNiceMock(Configuration.class);
+    expect(config.isServerLocksProfilingEnabled()).andReturn(true);
+    replayAll();
+
+    LockFactory factory = new LockFactory(config);
+    Lock lock = factory.newLock();
+
+    Assert.assertTrue(lock instanceof ProfiledReentrantLock);
+
+    String label = ((ProfiledLock) lock).getLabel();
+    Assert.assertTrue(label, label.contains("LockFactoryTest.java"));
+
+    verifyAll();
+  }
+
+  @Test
+  public void createsProfiledReadWriteLockIfProfilingIsEnabled() {
+    Configuration config = createNiceMock(Configuration.class);
+    expect(config.isServerLocksProfilingEnabled()).andReturn(true);
+    replayAll();
+
+    LockFactory factory = new LockFactory(config);
+    ReadWriteLock lock = factory.newReadWriteLock();
+
+    Assert.assertTrue(lock instanceof ProfiledReentrantReadWriteLock);
+
+    String readLockLabel = ((ProfiledReentrantReadWriteLock) lock).readLock().getLabel();
+    Assert.assertTrue(readLockLabel, readLockLabel.contains("LockFactoryTest.java"));
+
+    String writeLockLabel = ((ProfiledReentrantReadWriteLock) lock).writeLock().getLabel();
+    Assert.assertTrue(writeLockLabel, writeLockLabel.contains("LockFactoryTest.java"));
+
+    verifyAll();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/462c8d9f/ambari-server/src/test/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLockTest.java
new file mode 100644
index 0000000..3f633e6
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/logging/ProfiledReentrantReadWriteLockTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.logging;
+
+import com.google.common.base.Ticker;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.easymock.EasyMock.expect;
+
+public class ProfiledReentrantReadWriteLockTest extends EasyMockSupport {
+
+  private static final String LABEL = "label";
+
+  @Test
+  public void lockingReadLockOnlyLocksReadLock() {
+    ReentrantReadWriteLock delegate = new ReentrantReadWriteLock();
+    ProfiledReentrantReadWriteLock testSubject = new ProfiledReentrantReadWriteLock(delegate, Ticker.systemTicker(), LABEL);
+
+    testSubject.readLock().lock();
+
+    Assert.assertEquals(1, delegate.getReadHoldCount());
+    Assert.assertEquals(0, delegate.getWriteHoldCount());
+  }
+
+  @Test
+  public void lockingWriteLockOnlyLocksWriteLock() {
+    ReentrantReadWriteLock delegate = new ReentrantReadWriteLock();
+    ProfiledReentrantReadWriteLock testSubject = new ProfiledReentrantReadWriteLock(delegate, Ticker.systemTicker(), LABEL);
+
+    testSubject.writeLock().lock();
+
+    Assert.assertEquals(0, delegate.getReadHoldCount());
+    Assert.assertEquals(1, delegate.getWriteHoldCount());
+  }
+
+  @Test
+  public void timeWaitingForReadLockIsRecorded() {
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantReadWriteLock(new ReentrantReadWriteLock(), ticker, LABEL).readLock();
+    timeWaitingForLockIsRecorded(testSubject, ticker);
+  }
+
+  @Test
+  public void timeWaitingForWriteLockIsRecorded() {
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantReadWriteLock(new ReentrantReadWriteLock(), ticker, LABEL).writeLock();
+    timeWaitingForLockIsRecorded(testSubject, ticker);
+  }
+
+  private void timeWaitingForLockIsRecorded(ProfiledLock testSubject, Ticker ticker) {
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(1L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(4L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(5L));
+    replayAll();
+
+    testSubject.lock();
+
+    Assert.assertEquals(Collections.singletonMap(Thread.currentThread().getName(), 4L - 1L), testSubject.getTimeSpentWaitingForLock());
+    verifyAll();
+  }
+
+  @Test
+  public void timeReadLockSpentLockedIsRecorded() {
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantReadWriteLock(new ReentrantReadWriteLock(), ticker, LABEL).readLock();
+    timeSpentLockedIsRecorded(ticker, testSubject);
+  }
+
+  @Test
+  public void timeWriteLockSpentLockedIsRecorded() {
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantReadWriteLock(new ReentrantReadWriteLock(), ticker, LABEL).writeLock();
+    timeSpentLockedIsRecorded(ticker, testSubject);
+  }
+
+  @Test
+  public void timeLockSpentLockedIsRecorded() {
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantLock(new ReentrantLock(), ticker, LABEL);
+    timeSpentLockedIsRecorded(ticker, testSubject);
+  }
+
+  private void timeSpentLockedIsRecorded(Ticker ticker, ProfiledLock testSubject) {
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(0L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(0L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(6L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(13L));
+    replayAll();
+
+    testSubject.lock();
+    testSubject.unlock();
+
+    Assert.assertEquals(Collections.singletonMap(Thread.currentThread().getName(), 13L - 6L), testSubject.getTimeSpentLocked());
+    verifyAll();
+  }
+
+  @Test
+  public void onlyOutermostLockUnlockIsProfiledForReadLock() {
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantReadWriteLock(new ReentrantReadWriteLock(), ticker, LABEL).readLock();
+    onlyOutermostLockUnlockIsProfiled(testSubject, ticker);
+  }
+
+  @Test
+  public void onlyOutermostLockUnlockIsProfiledForWriteLock() { // nested lock()/unlock() on the WRITE lock must be profiled only once
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantReadWriteLock(new ReentrantReadWriteLock(), ticker, LABEL).writeLock();
+    onlyOutermostLockUnlockIsProfiled(testSubject, ticker);
+  }
+
+
+  @Test
+  public void onlyOutermostLockUnlockIsProfiled() {
+    Ticker ticker = createMock(Ticker.class);
+    ProfiledLock testSubject = new ProfiledReentrantLock(new ReentrantLock(), ticker, LABEL);
+    onlyOutermostLockUnlockIsProfiled(testSubject, ticker);
+  }
+
+  private void onlyOutermostLockUnlockIsProfiled(ProfiledLock testSubject, Ticker ticker) {
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(0L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(0L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(5L));
+    expect(ticker.read()).andReturn(TimeUnit.MILLISECONDS.toNanos(19L));
+    replayAll();
+
+    testSubject.lock();
+    testSubject.lock();
+    testSubject.unlock();
+    testSubject.unlock();
+
+    Assert.assertEquals(Collections.singletonMap(Thread.currentThread().getName(), 19L - 5L), testSubject.getTimeSpentLocked());
+    Assert.assertEquals(Collections.singletonMap(Thread.currentThread().getName(), 1), testSubject.getLockCount());
+    verifyAll();
+  }
+
+}


Mime
View raw message