hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [2/3] hadoop git commit: HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.
Date Sun, 11 Sep 2016 01:11:29 GMT
HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04f620c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04f620c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04f620c4

Branch: refs/heads/branch-2.8
Commit: 04f620c4d01c3954c780465087a327bba76fe6df
Parents: d5ea508
Author: Arpit Agarwal <arp@apache.org>
Authored: Sat Sep 10 18:01:37 2016 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Sat Sep 10 18:01:37 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/util/AutoCloseableLock.java   |  28 ++-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../apache/hadoop/hdfs/InstrumentedLock.java    | 185 +++++++++++++++++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  11 +-
 .../src/main/resources/hdfs-default.xml         |   7 +
 .../hadoop/hdfs/TestInstrumentedLock.java       | 166 +++++++++++++++++
 6 files changed, 394 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
index 2aa8578..d920bc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
@@ -17,22 +17,33 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrap class of a ReentrantLock. Extending AutoCloseable
  * interface such that the users can use a try-with-resource syntax.
  */
 public class AutoCloseableLock implements AutoCloseable {
 
-  private final ReentrantLock lock;
+  private final Lock lock;
 
   /**
    * Creates an instance of {@code AutoCloseableLock}, initializes
-   * the underlying {@code ReentrantLock} object.
+   * the underlying lock instance with a new {@code ReentrantLock}.
    */
   public AutoCloseableLock() {
-    this.lock = new ReentrantLock();
+    this(new ReentrantLock());
+  }
+
+  /**
+   * Wrap provided Lock instance.
+   * @param lock Lock instance to wrap in AutoCloseable API.
+   */
+  public AutoCloseableLock(Lock lock) {
+    this.lock = lock;
   }
 
   /**
@@ -86,7 +97,7 @@ public class AutoCloseableLock implements AutoCloseable {
 
   /**
    * A wrapper method that makes a call to {@code tryLock()} of
-   * the underlying {@code ReentrantLock} object.
+   * the underlying {@code Lock} object.
    *
    * If the lock is not held by another thread, acquires the lock, set the
    * hold count to one and returns {@code true}.
@@ -116,7 +127,12 @@ public class AutoCloseableLock implements AutoCloseable {
    * @return {@code true} if any thread holds this lock and
    *         {@code false} otherwise
    */
-  public boolean isLocked() {
-    return lock.isLocked();
+  @VisibleForTesting
+  boolean isLocked() {
+    if (lock instanceof ReentrantLock) {
+      return ((ReentrantLock)lock).isLocked();
+    }
+    throw new UnsupportedOperationException();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9eee1ac..382404c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -383,6 +383,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.namenode.read-lock-reporting-threshold-ms";
   public static final long    DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+  // Threshold for how long the lock warnings must be suppressed
+  public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
+      "dfs.lock.suppress.warning.interval";
+  public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
+      10000; //ms
 
   public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
new file mode 100644
index 0000000..6279e95
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specific lock is being held for too long and periodically
+ * log a warning and stack trace, if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+  private final Lock lock;
+  private final Log logger;
+  private final String name;
+  private final Timer clock;
+
+  /** Minimum gap between two lock warnings. */
+  private final long minLoggingGap;
+  /** Threshold for detecting long lock held time. */
+  private final long lockWarningThreshold;
+
+  // Tracking counters for lock statistics.
+  private volatile long lockAcquireTimestamp;
+  private final AtomicLong lastLogTimestamp;
+  private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+  /**
+   * Create an instrumented lock instance which logs a warning message
+   * when lock held time is above given threshold.
+   *
+   * @param name the identifier of the lock object
+   * @param logger this class does not have its own logger, will log to the
+   *               given logger instead
+   * @param minLoggingGapMs  the minimum time gap between two log messages,
+   *                         this is to avoid spamming too many logs
+   * @param lockWarningThresholdMs the time threshold to view lock held
+   *                               time as being "too long"
+   */
+  public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+      long lockWarningThresholdMs) {
+    this(name, logger, new ReentrantLock(),
+        minLoggingGapMs, lockWarningThresholdMs);
+  }
+
+  public InstrumentedLock(String name, Log logger, Lock lock,
+      long minLoggingGapMs, long lockWarningThresholdMs) {
+    this(name, logger, lock,
+        minLoggingGapMs, lockWarningThresholdMs, new Timer());
+  }
+
+  @VisibleForTesting
+  InstrumentedLock(String name, Log logger, Lock lock,
+      long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+    this.name = name;
+    this.lock = lock;
+    this.clock = clock;
+    this.logger = logger;
+    minLoggingGap = minLoggingGapMs;
+    lockWarningThreshold = lockWarningThresholdMs;
+    lastLogTimestamp = new AtomicLong(
+      clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+  }
+
+  @Override
+  public void lock() {
+    lock.lock();
+    lockAcquireTimestamp = clock.monotonicNow();
+  }
+
+  @Override
+  public void lockInterruptibly() throws InterruptedException {
+    lock.lockInterruptibly();
+    lockAcquireTimestamp = clock.monotonicNow();
+  }
+
+  @Override
+  public boolean tryLock() {
+    if (lock.tryLock()) {
+      lockAcquireTimestamp = clock.monotonicNow();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+    if (lock.tryLock(time, unit)) {
+      lockAcquireTimestamp = clock.monotonicNow();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void unlock() {
+    long localLockReleaseTime = clock.monotonicNow();
+    long localLockAcquireTime = lockAcquireTimestamp;
+    lock.unlock();
+    check(localLockAcquireTime, localLockReleaseTime);
+  }
+
+  @Override
+  public Condition newCondition() {
+    return lock.newCondition();
+  }
+
+  @VisibleForTesting
+  void logWarning(long lockHeldTime, long suppressed) {
+    logger.warn(String.format("Lock held time above threshold: " +
+        "lock identifier: %s " +
+        "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+        "The stack trace is: %s" ,
+        name, lockHeldTime, suppressed,
+        StringUtils.getStackTrace(Thread.currentThread())));
+  }
+
+  /**
+   * Log a warning if the lock was held for too long.
+   *
+   * Should be invoked by the caller immediately AFTER releasing the lock.
+   *
+   * @param acquireTime  - timestamp just after acquiring the lock.
+   * @param releaseTime - timestamp just before releasing the lock.
+   */
+  private void check(long acquireTime, long releaseTime) {
+    if (!logger.isWarnEnabled()) {
+      return;
+    }
+
+    final long lockHeldTime = releaseTime - acquireTime;
+    if (lockWarningThreshold - lockHeldTime < 0) {
+      long now;
+      long localLastLogTs;
+      do {
+        now = clock.monotonicNow();
+        localLastLogTs = lastLogTimestamp.get();
+        long deltaSinceLastLog = now - localLastLogTs;
+        // check should print log or not
+        if (deltaSinceLastLog - minLoggingGap < 0) {
+          warningsSuppressed.incrementAndGet();
+          return;
+        }
+      } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
+      long suppressed = warningsSuppressed.getAndSet(0);
+      logWarning(lockHeldTime, suppressed);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 596ab75..1f0d975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.*;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -62,7 +63,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.InstrumentedLock;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -280,7 +281,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-    this.datasetLock = new AutoCloseableLock();
+    this.datasetLock = new AutoCloseableLock(
+        new InstrumentedLock(getClass().getName(), LOG,
+          conf.getTimeDuration(
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS),
+          300));
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 19ae973..6984f6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3046,4 +3046,11 @@
       Keytab based login can be enabled with dfs.balancer.keytab.enabled.
     </description>
   </property>
+
+<property>
+  <name>dfs.lock.suppress.warning.interval</name>
+  <value>10s</value>
+  <description>Instrumentation reporting long critical sections will suppress
+    consecutive warnings within this interval.</description>
+</property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
new file mode 100644
index 0000000..1d1a42b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+  static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+  @Rule public TestName name = new TestName();
+
+  /**
+   * Test exclusive access of the lock.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testMultipleThread() throws Exception {
+    String testname = name.getMethodName();
+    final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+    lock.lock();
+    try {
+      Thread competingThread = new Thread() {
+        @Override
+        public void run() {
+          assertFalse(lock.tryLock());
+        }
+      };
+      competingThread.start();
+      competingThread.join();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Test the correctness with try-with-resource syntax.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testTryWithResourceSyntax() throws Exception {
+    String testname = name.getMethodName();
+    final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
+    final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
+      @Override
+      public void lock() {
+        super.lock();
+        lockThread.set(Thread.currentThread());
+      }
+      @Override
+      public void unlock() {
+        super.unlock();
+        lockThread.set(null);
+      }
+    };
+    AutoCloseableLock acl = new AutoCloseableLock(lock);
+    try (AutoCloseable localLock = acl.acquire()) {
+      assertEquals(acl, localLock);
+      Thread competingThread = new Thread() {
+        @Override
+        public void run() {
+          assertNotEquals(Thread.currentThread(), lockThread.get());
+          assertFalse(lock.tryLock());
+        }
+      };
+      competingThread.start();
+      competingThread.join();
+      assertEquals(Thread.currentThread(), lockThread.get());
+    }
+    assertNull(lockThread.get());
+  }
+
+  /**
+   * Test the lock logs warning when lock held time is greater than threshold
+   * and not log warning otherwise.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testLockLongHoldingReport() throws Exception {
+    String testname = name.getMethodName();
+    final AtomicLong time = new AtomicLong(0);
+    Timer mclock = new Timer() {
+      @Override
+      public long monotonicNow() {
+        return time.get();
+      }
+    };
+    Lock mlock = mock(Lock.class);
+
+    final AtomicLong wlogged = new AtomicLong(0);
+    final AtomicLong wsuppresed = new AtomicLong(0);
+    InstrumentedLock lock = new InstrumentedLock(
+        testname, LOG, mlock, 2000, 300, mclock) {
+      @Override
+      void logWarning(long lockHeldTime, long suppressed) {
+        wlogged.incrementAndGet();
+        wsuppresed.set(suppressed);
+      }
+    };
+
+    // do not log warning when the lock held time is short
+    lock.lock();   // t = 0
+    time.set(200);
+    lock.unlock(); // t = 200
+    assertEquals(0, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    lock.lock();   // t = 200
+    time.set(700);
+    lock.unlock(); // t = 700
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    // although the lock held time is greater than the threshold,
+    // the log warning is suppressed due to the logging gap
+    // (not recorded in wsuppresed until next log message)
+    lock.lock();   // t = 700
+    time.set(1100);
+    lock.unlock(); // t = 1100
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    // log a warning message when the lock held time is greater than the
+    // threshold and the logging time gap is satisfied. Also should display
+    // suppressed previous warnings.
+    time.set(2400);
+    lock.lock();   // t = 2400
+    time.set(2800);
+    lock.unlock(); // t = 2800
+    assertEquals(2, wlogged.get());
+    assertEquals(1, wsuppresed.get());
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message