hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [49/50] [abbrv] hadoop git commit: HADOOP-10048. LocalDirAllocator should avoid holding locks while accessing the filesystem. Contributed by Jason Lowe.
Date Tue, 07 Jun 2016 20:26:41 GMT
HADOOP-10048. LocalDirAllocator should avoid holding locks while accessing the filesystem.
Contributed by Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c14c1b29
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c14c1b29
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c14c1b29

Branch: refs/heads/YARN-4757
Commit: c14c1b298e29e799f7c8f15ff24d7eba6e0cd39b
Parents: e620530
Author: Junping Du <junping_du@apache.org>
Authored: Tue Jun 7 09:18:58 2016 -0700
Committer: Junping Du <junping_du@apache.org>
Committed: Tue Jun 7 09:18:58 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/LocalDirAllocator.java | 153 ++++++++++++-------
 1 file changed, 94 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c14c1b29/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index 70cf87d..b14e1f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.*;
-
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -247,74 +248,101 @@ public class LocalDirAllocator {
     private final Log LOG =
       LogFactory.getLog(AllocatorPerContext.class);
 
-    private int dirNumLastAccessed;
     private Random dirIndexRandomizer = new Random();
-    private FileSystem localFS;
-    private DF[] dirDF = new DF[0];
     private String contextCfgItemName;
-    private String[] localDirs = new String[0];
-    private String savedLocalDirs = "";
+
+    // NOTE: the context must be accessed via a local reference as it
+    //       may be updated at any time to reference a different context
+    private AtomicReference<Context> currentContext;
+
+    private static class Context {
+      private AtomicInteger dirNumLastAccessed = new AtomicInteger(0);
+      private FileSystem localFS;
+      private DF[] dirDF;
+      private Path[] localDirs;
+      private String savedLocalDirs;
+
+      public int getAndIncrDirNumLastAccessed() {
+        return getAndIncrDirNumLastAccessed(1);
+      }
+
+      public int getAndIncrDirNumLastAccessed(int delta) {
+        if (localDirs.length < 2 || delta == 0) {
+          return dirNumLastAccessed.get();
+        }
+        int oldval, newval;
+        do {
+          oldval = dirNumLastAccessed.get();
+          newval = (oldval + delta) % localDirs.length;
+        } while (!dirNumLastAccessed.compareAndSet(oldval, newval));
+        return oldval;
+      }
+    }
 
     public AllocatorPerContext(String contextCfgItemName) {
       this.contextCfgItemName = contextCfgItemName;
+      this.currentContext = new AtomicReference<Context>(new Context());
     }
 
     /** This method gets called everytime before any read/write to make sure
      * that any change to localDirs is reflected immediately.
      */
-    private synchronized void confChanged(Configuration conf) 
+    private Context confChanged(Configuration conf)
         throws IOException {
+      Context ctx = currentContext.get();
       String newLocalDirs = conf.get(contextCfgItemName);
       if (null == newLocalDirs) {
         throw new IOException(contextCfgItemName + " not configured");
       }
-      if (!newLocalDirs.equals(savedLocalDirs)) {
-        localDirs = StringUtils.getTrimmedStrings(newLocalDirs);
-        localFS = FileSystem.getLocal(conf);
-        int numDirs = localDirs.length;
-        ArrayList<String> dirs = new ArrayList<String>(numDirs);
+      if (!newLocalDirs.equals(ctx.savedLocalDirs)) {
+        ctx = new Context();
+        String[] dirStrings = StringUtils.getTrimmedStrings(newLocalDirs);
+        ctx.localFS = FileSystem.getLocal(conf);
+        int numDirs = dirStrings.length;
+        ArrayList<Path> dirs = new ArrayList<Path>(numDirs);
         ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
         for (int i = 0; i < numDirs; i++) {
           try {
             // filter problematic directories
-            Path tmpDir = new Path(localDirs[i]);
-            if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) {
+            Path tmpDir = new Path(dirStrings[i]);
+            if(ctx.localFS.mkdirs(tmpDir)|| ctx.localFS.exists(tmpDir)) {
               try {
-
                 File tmpFile = tmpDir.isAbsolute()
-                  ? new File(localFS.makeQualified(tmpDir).toUri())
-                  : new File(localDirs[i]);
+                    ? new File(ctx.localFS.makeQualified(tmpDir).toUri())
+                    : new File(dirStrings[i]);
 
                 DiskChecker.checkDir(tmpFile);
-                dirs.add(tmpFile.getPath());
+                dirs.add(new Path(tmpFile.getPath()));
                 dfList.add(new DF(tmpFile, 30000));
-
               } catch (DiskErrorException de) {
-                LOG.warn( localDirs[i] + " is not writable\n", de);
+                LOG.warn(dirStrings[i] + " is not writable\n", de);
               }
             } else {
-              LOG.warn( "Failed to create " + localDirs[i]);
+              LOG.warn("Failed to create " + dirStrings[i]);
             }
           } catch (IOException ie) { 
-            LOG.warn( "Failed to create " + localDirs[i] + ": " +
+            LOG.warn("Failed to create " + dirStrings[i] + ": " +
                 ie.getMessage() + "\n", ie);
           } //ignore
         }
-        localDirs = dirs.toArray(new String[dirs.size()]);
-        dirDF = dfList.toArray(new DF[dirs.size()]);
-        savedLocalDirs = newLocalDirs;
-        
+        ctx.localDirs = dirs.toArray(new Path[dirs.size()]);
+        ctx.dirDF = dfList.toArray(new DF[dirs.size()]);
+        ctx.savedLocalDirs = newLocalDirs;
+
         if (dirs.size() > 0) {
           // randomize the first disk picked in the round-robin selection
-          dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size());
+          ctx.dirNumLastAccessed.set(dirIndexRandomizer.nextInt(dirs.size()));
         }
+
+        currentContext.set(ctx);
       }
+
+      return ctx;
     }
 
-    private Path createPath(String path, 
+    private Path createPath(Path dir, String path,
         boolean checkWrite) throws IOException {
-      Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
-                                    path);
+      Path file = new Path(dir, path);
       if (checkWrite) {
         //check whether we are able to create a directory here. If the disk
         //happens to be RDONLY we will fail
@@ -334,7 +362,7 @@ public class LocalDirAllocator {
      * @return the current directory index.
      */
     int getCurrentDirectoryIndex() {
-      return dirNumLastAccessed;
+      return currentContext.get().dirNumLastAccessed.get();
     }
 
     /** Get a path from the local FS. If size is known, we go
@@ -344,10 +372,10 @@ public class LocalDirAllocator {
      *  If size is not known, use roulette selection -- pick directories
      *  with probability proportional to their available space.
      */
-    public synchronized Path getLocalPathForWrite(String pathStr, long size, 
+    public Path getLocalPathForWrite(String pathStr, long size,
         Configuration conf, boolean checkWrite) throws IOException {
-      confChanged(conf);
-      int numDirs = localDirs.length;
+      Context ctx = confChanged(conf);
+      int numDirs = ctx.localDirs.length;
       int numDirsSearched = 0;
       //remove the leading slash from the path (to make sure that the uri
       //resolution results in a valid path on the dir being checked)
@@ -358,12 +386,12 @@ public class LocalDirAllocator {
       
       if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability 
                     //proportional to available size
-        long[] availableOnDisk = new long[dirDF.length];
+        long[] availableOnDisk = new long[ctx.dirDF.length];
         long totalAvailable = 0;
         
             //build the "roulette wheel"
-        for(int i =0; i < dirDF.length; ++i) {
-          availableOnDisk[i] = dirDF[i].getAvailable();
+        for(int i =0; i < ctx.dirDF.length; ++i) {
+          availableOnDisk[i] = ctx.dirDF[i].getAvailable();
           totalAvailable += availableOnDisk[i];
         }
 
@@ -380,8 +408,8 @@ public class LocalDirAllocator {
             randomPosition -= availableOnDisk[dir];
             dir++;
           }
-          dirNumLastAccessed = dir;
-          returnPath = createPath(pathStr, checkWrite);
+          ctx.dirNumLastAccessed.set(dir);
+          returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
           if (returnPath == null) {
             totalAvailable -= availableOnDisk[dir];
             availableOnDisk[dir] = 0; // skip this disk
@@ -389,15 +417,21 @@ public class LocalDirAllocator {
           }
         }
       } else {
-        while (numDirsSearched < numDirs && returnPath == null) {
-          long capacity = dirDF[dirNumLastAccessed].getAvailable();
+        int dirNum = ctx.getAndIncrDirNumLastAccessed();
+        while (numDirsSearched < numDirs) {
+          long capacity = ctx.dirDF[dirNum].getAvailable();
           if (capacity > size) {
-            returnPath = createPath(pathStr, checkWrite);
+            returnPath =
+                createPath(ctx.localDirs[dirNum], pathStr, checkWrite);
+            if (returnPath != null) {
+              ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
+              break;
+            }
           }
-          dirNumLastAccessed++;
-          dirNumLastAccessed = dirNumLastAccessed % numDirs; 
+          dirNum++;
+          dirNum = dirNum % numDirs;
           numDirsSearched++;
-        } 
+        }
       }
       if (returnPath != null) {
         return returnPath;
@@ -432,10 +466,10 @@ public class LocalDirAllocator {
      *  configured dirs for the file's existence and return the complete
      *  path to the file when we find one 
      */
-    public synchronized Path getLocalPathToRead(String pathStr, 
+    public Path getLocalPathToRead(String pathStr,
         Configuration conf) throws IOException {
-      confChanged(conf);
-      int numDirs = localDirs.length;
+      Context ctx = confChanged(conf);
+      int numDirs = ctx.localDirs.length;
       int numDirsSearched = 0;
       //remove the leading slash from the path (to make sure that the uri
       //resolution results in a valid path on the dir being checked)
@@ -443,8 +477,8 @@ public class LocalDirAllocator {
         pathStr = pathStr.substring(1);
       }
       while (numDirsSearched < numDirs) {
-        Path file = new Path(localDirs[numDirsSearched], pathStr);
-        if (localFS.exists(file)) {
+        Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
+        if (ctx.localFS.exists(file)) {
           return file;
         }
         numDirsSearched++;
@@ -459,10 +493,10 @@ public class LocalDirAllocator {
       private final FileSystem fs;
       private final String pathStr;
       private int i = 0;
-      private final String[] rootDirs;
+      private final Path[] rootDirs;
       private Path next = null;
 
-      private PathIterator(FileSystem fs, String pathStr, String[] rootDirs)
+      private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs)
           throws IOException {
         this.fs = fs;
         this.pathStr = pathStr;
@@ -517,21 +551,22 @@ public class LocalDirAllocator {
      * @return all of the paths that exist under any of the roots
      * @throws IOException
      */
-    synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
+    Iterable<Path> getAllLocalPathsToRead(String pathStr,
         Configuration conf) throws IOException {
-      confChanged(conf);
+      Context ctx = confChanged(conf);
       if (pathStr.startsWith("/")) {
         pathStr = pathStr.substring(1);
       }
-      return new PathIterator(localFS, pathStr, localDirs);
+      return new PathIterator(ctx.localFS, pathStr, ctx.localDirs);
     }
 
     /** We search through all the configured dirs for the file's existence
      *  and return true when we find one 
      */
-    public synchronized boolean ifExists(String pathStr,Configuration conf) {
+    public boolean ifExists(String pathStr, Configuration conf) {
+      Context ctx = currentContext.get();
       try {
-        int numDirs = localDirs.length;
+        int numDirs = ctx.localDirs.length;
         int numDirsSearched = 0;
         //remove the leading slash from the path (to make sure that the uri
         //resolution results in a valid path on the dir being checked)
@@ -539,8 +574,8 @@ public class LocalDirAllocator {
           pathStr = pathStr.substring(1);
         }
         while (numDirsSearched < numDirs) {
-          Path file = new Path(localDirs[numDirsSearched], pathStr);
-          if (localFS.exists(file)) {
+          Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
+          if (ctx.localFS.exists(file)) {
             return true;
           }
           numDirsSearched++;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message