hadoop-hdfs-commits mailing list archives

From: sur...@apache.org
Subject: svn commit: r1078092 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date: Fri, 04 Mar 2011 18:38:01 GMT
Author: suresh
Date: Fri Mar  4 18:38:01 2011
New Revision: 1078092

URL: http://svn.apache.org/viewvc?rev=1078092&view=rev
Log:
HDFS-1720. Federation: FSVolumeSet volumes is not synchronized correctly. (suresh)
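
For context on the fix: FSVolumeSet exposed a mutable FSVolume[] that checkDirs() rewrote in place while scanner and reporting threads iterated it without locking. The patch below replaces the array with a volatile reference to an unmodifiable List that is never mutated, only swapped while holding the set's own lock. A minimal sketch of that copy-on-write idiom, independent of the HDFS classes (the CopyOnWriteSet name and remove() criterion are placeholders, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class CopyOnWriteSet<T> {
      // Readers take a lock-free snapshot; writers publish a brand-new
      // unmodifiable list while holding "this".
      private volatile List<T> elements;

      CopyOnWriteSet(T[] initial) {
        elements = Collections.unmodifiableList(Arrays.asList(initial));
      }

      List<T> get() {
        return elements; // consistent snapshot, no locking needed
      }

      synchronized void remove(T victim) {
        List<T> copy = new ArrayList<T>(elements); // mutate a private copy
        copy.remove(victim);
        elements = Collections.unmodifiableList(copy); // atomic publish
      }
    }

Readers never observe a half-updated list; the price is one list copy per modification, which is cheap here because volumes are removed only on disk failure.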


Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1078092&r1=1078091&r2=1078092&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Fri Mar  4 18:38:01 2011
@@ -203,6 +203,9 @@ Trunk (unreleased changes)
     HDFS-1719. Federation: Fix TestDFSRemove that fails intermittently.
     (suresh)
 
+    HDFS-1720. Federation: FSVolumeSet volumes is not synchronized correctly.
+    (suresh)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1078092&r1=1078091&r2=1078092&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Mar  4 18:38:01 2011
@@ -221,8 +221,8 @@ class BlockPoolSliceScanner {
      * otherwise, pick the first directory.
      */
     File dir = null;
-    FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
-    for (FSDataset.FSVolume vol : volumes) {
+    List<FSVolume> volumes = dataset.volumes.getVolumes();
+    for (FSDataset.FSVolume vol : dataset.volumes.getVolumes()) {
       File bpDir = vol.getBlockPoolSlice(blockPoolId).getDirectory();
       if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {
         dir = bpDir;
@@ -230,7 +230,7 @@ class BlockPoolSliceScanner {
       }
     }
     if (dir == null) {
-      dir = volumes[0].getBlockPoolSlice(blockPoolId).getDirectory();
+      dir = volumes.get(0).getBlockPoolSlice(blockPoolId).getDirectory();
     }
     
     try {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1078092&r1=1078091&r2=1078092&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Mar  4 18:38:01 2011
@@ -136,7 +136,7 @@ public class DataBlockScanner implements
               .iterator();
           while (bpidIterator.hasNext()) {
             String bpid = bpidIterator.next();
-            for (FSDataset.FSVolume vol : dataset.volumes.volumes) {
+            for (FSDataset.FSVolume vol : dataset.volumes.getVolumes()) {
               try {
                 File currFile = BlockPoolSliceScanner.getCurrentFile(vol, bpid);
                 if (currFile.exists()) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1078092&r1=1078091&r2=1078092&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Mar  4 18:38:01 2011
@@ -411,18 +411,18 @@ public class DirectoryScanner implements
   /** Get lists of blocks on the disk sorted by blockId, per blockpool */
   private Map<String, ScanInfo[]> getDiskReport() {
     // First get list of data directories
-    FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
+    List<FSVolume> volumes = dataset.volumes.getVolumes();
     ArrayList<ScanInfoPerBlockPool> dirReports =
-      new ArrayList<ScanInfoPerBlockPool>(volumes.length);
+      new ArrayList<ScanInfoPerBlockPool>(volumes.size());
     
     Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
       new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
-    for (int i = 0; i < volumes.length; i++) {
-      if (!dataset.volumes.isValid(volumes[i])) { // volume is still valid
+    for (int i = 0; i < volumes.size(); i++) {
+      if (!dataset.volumes.isValid(volumes.get(i))) { // volume is still valid
         dirReports.add(i, null);
       } else {
         ReportCompiler reportCompiler =
-          new ReportCompiler(volumes[i]);
+          new ReportCompiler(volumes.get(i));
         Future<ScanInfoPerBlockPool> result = 
           reportCompileThreadPool.submit(reportCompiler);
         compilersInProgress.put(i, result);
@@ -442,8 +442,8 @@ public class DirectoryScanner implements
 
     // Compile consolidated report for all the volumes
     ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
-    for (int i = 0; i < volumes.length; i++) {
-      if (dataset.volumes.isValid(volumes[i])) { // volume is still valid
+    for (int i = 0; i < volumes.size(); i++) {
+      if (dataset.volumes.isValid(volumes.get(i))) { // volume is still valid
         list.addAll(dirReports.get(i));
       }
     }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1078092&r1=1078091&r2=1078092&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Mar  4 18:38:01 2011
@@ -31,6 +31,7 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -721,96 +722,116 @@ public class FSDataset implements FSCons
   }
     
   static class FSVolumeSet {
-    FSVolume[] volumes = null;
-    int curVolume = 0;
+    /*
+     * Read access to this unmodifiable list is not synchronized.
+     * This list is replaced on modification holding "this" lock.
+     */
+    private volatile List<FSVolume> volumes = null;
+    private int curVolume = 0; // Synchronized using "this"
       
     FSVolumeSet(FSVolume[] volumes) {
-      this.volumes = volumes;
+      List<FSVolume> list = Arrays.asList(volumes);
+      this.volumes = Collections.unmodifiableList(list);
     }
     
     private int numberOfVolumes() {
-      return volumes.length;
+      return volumes.size();
     }
       
-    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
-      
-      if(volumes.length < 1) {
-        throw new DiskOutOfSpaceException("No more available volumes");
-      }
-      
-      // since volumes could've been removed because of the failure
-      // make sure we are not out of bounds
-      if(curVolume >= volumes.length) {
-        curVolume = 0;
-      }
-      
-      int startVolume = curVolume;
-      
-      while (true) {
-        FSVolume volume = volumes[curVolume];
-        curVolume = (curVolume + 1) % volumes.length;
-        if (volume.getAvailable() > blockSize) { return volume; }
-        if (curVolume == startVolume) {
-          throw new DiskOutOfSpaceException("Insufficient space for an additional block");
+    /** 
+     * Get next volume. Synchronized to ensure {@link #curVolume} is updated
+     * by a single thread and next volume is chosen with no concurrent
+     * update to {@link #volumes}.
+     * @param blockSize free space needed on the volume
+     * @return next volume to store the block in.
+     */
+    FSVolume getNextVolume(long blockSize) throws IOException {
+      synchronized(this) {
+        if(volumes.size() < 1) {
+          throw new DiskOutOfSpaceException("No more available volumes");
+        }
+        // since volumes could've been removed because of the failure
+        // make sure we are not out of bounds
+        if(curVolume >= volumes.size()) {
+          curVolume = 0;
+        }
+        
+        int startVolume = curVolume;
+        
+        while (true) {
+          FSVolume volume = volumes.get(curVolume);
+          curVolume = (curVolume + 1) % volumes.size();
+          if (volume.getAvailable() > blockSize) { return volume; }
+          if (curVolume == startVolume) {
+            throw new DiskOutOfSpaceException(
+                "Insufficient space for an additional block");
+          }
         }
       }
     }
       
-    long getDfsUsed() throws IOException {
+    private long getDfsUsed() throws IOException {
       long dfsUsed = 0L;
-      for (int idx = 0; idx < volumes.length; idx++) {
-        dfsUsed += volumes[idx].getDfsUsed();
+      for (FSVolume vol : volumes) {
+        dfsUsed += vol.getDfsUsed();
       }
       return dfsUsed;
     }
 
-    long getBlockPoolUsed(String bpid) throws IOException {
+    private long getBlockPoolUsed(String bpid) throws IOException {
       long dfsUsed = 0L;
-      for (int idx = 0; idx < volumes.length; idx++) {
-        dfsUsed += volumes[idx].getBlockPoolUsed(bpid);
+      for (FSVolume vol : volumes) {
+        dfsUsed += vol.getBlockPoolUsed(bpid);
       }
       return dfsUsed;
     }
 
-    long getCapacity() throws IOException {
+    private long getCapacity() throws IOException {
       long capacity = 0L;
-      for (int idx = 0; idx < volumes.length; idx++) {
-        capacity += volumes[idx].getCapacity();
+      for (FSVolume vol : volumes) {
+        capacity += vol.getCapacity();
       }
       return capacity;
     }
       
-    long getRemaining() throws IOException {
+    private long getRemaining() throws IOException {
       long remaining = 0L;
-      for (int idx = 0; idx < volumes.length; idx++) {
-        remaining += volumes[idx].getAvailable();
+      for (FSVolume vol : volumes) {
+        remaining += vol.getAvailable();
       }
       return remaining;
     }
       
-    synchronized void getVolumeMap(ReplicasMap volumeMap) throws IOException {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].getVolumeMap(volumeMap);
+    private void getVolumeMap(ReplicasMap volumeMap)
+        throws IOException {
+      for (FSVolume vol : volumes) {
+        vol.getVolumeMap(volumeMap);
       }
     }
     
-    synchronized void getVolumeMap(String bpid, ReplicasMap volumeMap)
+    private void getVolumeMap(String bpid, ReplicasMap volumeMap)
         throws IOException {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].getVolumeMap(bpid, volumeMap);
+      for (FSVolume vol : volumes) {
+        vol.getVolumeMap(bpid, volumeMap);
       }
     }
       
     /**
      * Calls {@link FSVolume#checkDirs()} on each volume, removing any
      * volumes from the active list that result in a DiskErrorException.
+     * 
+     * This method is synchronized to allow only one instance of checkDirs() 
+     * call
      * @return list of all the removed volumes.
      */
-    synchronized List<FSVolume> checkDirs() {
-      ArrayList<FSVolume> removedVols = null;  
+    private synchronized List<FSVolume> checkDirs() {
+      ArrayList<FSVolume> removedVols = null;
       
-      for (int idx = 0; idx < volumes.length; idx++) {
-        FSVolume fsv = volumes[idx];
+      // Make a copy of volumes for performing modification 
+      List<FSVolume> volumeList = new ArrayList<FSVolume>(getVolumes());
+      
+      for (int idx = 0; idx < volumeList.size(); idx++) {
+        FSVolume fsv = volumeList.get(idx);
         try {
           fsv.checkDirs();
         } catch (DiskErrorException e) {
@@ -818,21 +839,20 @@ public class FSDataset implements FSCons
           if (removedVols == null) {
             removedVols = new ArrayList<FSVolume>(1);
           }
-          removedVols.add(volumes[idx]);
-          volumes[idx] = null; // Remove the volume
+          removedVols.add(volumeList.get(idx));
+          volumeList.set(idx, null); // Remove the volume
         }
       }
       
       // Remove null volumes from the volumes array
       if (removedVols != null && removedVols.size() > 0) {
-        FSVolume newVols[] = new FSVolume[volumes.length - removedVols.size()];
-        int i = 0;
-        for (FSVolume vol : volumes) {
+        List<FSVolume> newVols = new ArrayList<FSVolume>();
+        for (FSVolume vol : volumeList) {
           if (vol != null) {
-            newVols[i++] = vol;
+            newVols.add(vol);
           }
         }
-        volumes = newVols; // Replace array of volumes
+        volumes = Collections.unmodifiableList(newVols); // Replace volume list
         DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
             + removedVols.size() + " volumes. List of current volumes: "
             + this);
@@ -842,29 +862,39 @@ public class FSDataset implements FSCons
     }
       
     public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (int idx = 0; idx < volumes.length; idx++) {
-        sb.append(volumes[idx].toString());
-        if (idx != volumes.length - 1) { sb.append(","); }
-      }
-      return sb.toString();
+      return volumes.toString();
     }
 
-    public boolean isValid(FSVolume volume) {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        if (volumes[idx] == volume) {
+    boolean isValid(FSVolume volume) {
+      for (FSVolume vol : volumes) {
+        if (vol == volume) {
           return true;
         }
       }
       return false;
     }
 
-    public void addBlockPool(String bpid, Configuration conf)
+    private void addBlockPool(String bpid, Configuration conf)
         throws IOException {
       for (FSVolume v : volumes) {
         v.addBlockPool(bpid, conf);
       }
     }
+    
+    /**
+     * @return unmodifiable list of volumes
+     */
+    public List<FSVolume> getVolumes() {
+      return volumes;
+    }
+
+    private void shutdown() {
+      for (FSVolume volume : volumes) {
+        if(volume != null) {
+          volume.shutdown();
+        }
+      }
+    }
   }
   
   //////////////////////////////////////////////////////
@@ -2018,11 +2048,7 @@ public class FSDataset implements FSCons
     }
     
     if(volumes != null) {
-      for (FSVolume volume : volumes.volumes) {
-        if(volume != null) {
-          volume.shutdown();
-        }
-      }
+      volumes.shutdown();
     }
   }
 
@@ -2393,28 +2419,23 @@ public class FSDataset implements FSCons
     }
   }  
   
-  synchronized Collection<VolumeInfo> getVolumeInfo() {
+  Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
-    synchronized(volumes.volumes) {
-      for (FSVolume volume : volumes.volumes) {
-        long used = 0;
-        try {
-          used = volume.getDfsUsed();
-        } catch (IOException e) {
-          DataNode.LOG.warn(e.getMessage());
-        }
-        
-        long free= 0;
-        try {
-          free = volume.getAvailable();
-        } catch (IOException e) {
-          DataNode.LOG.warn(e.getMessage());
-        }
-        
-        info.add(new VolumeInfo(volume.toString(), used, free, 
-            volume.getReserved()));
+    for (FSVolume volume : volumes.volumes) {
+      long used = 0;
+      long free = 0;
+      try {
+        used = volume.getDfsUsed();
+        free = volume.getAvailable();
+      } catch (IOException e) {
+        DataNode.LOG.warn(e.getMessage());
+        used = 0;
+        free = 0;
       }
-      return info;
+      
+      info.add(new VolumeInfo(volume.toString(), used, free, 
+          volume.getReserved()));
     }
+    return info;
   }
 }
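
The call sites updated in this commit (BlockPoolSliceScanner, DataBlockScanner, DirectoryScanner, and the two tests below) all follow the same consumer-side discipline: capture getVolumes() once and keep using that snapshot, because a concurrent checkDirs() may publish a replacement list at any moment. A hedged sketch of a typical reader; the dataset field and scanVolume() helper are stand-ins for the real call sites, not new API:

    // One snapshot per operation: a concurrent checkDirs() that drops a
    // failed volume swaps the list reference but cannot mutate this one.
    List<FSVolume> snapshot = dataset.volumes.getVolumes();
    for (FSVolume vol : snapshot) {
      scanVolume(vol); // hypothetical per-volume work
    }
    // Calling getVolumes() again may return a different list, so indexed
    // access such as snapshot.get(0) should reuse the same snapshot.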

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java?rev=1078092&r1=1078091&r2=1078092&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java Fri Mar  4 18:38:01 2011
@@ -98,7 +98,7 @@ public class TestDatanodeRestart {
       out.write(writeBuf);
       out.hflush();
       DataNode dn = cluster.getDataNodes().get(0);
-      for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
+      for (FSVolume volume : ((FSDataset)dn.data).volumes.getVolumes()) {
         File currentDir = volume.getDir().getParentFile();
         File rbwDir = new File(currentDir, "rbw");
         for (File file : rbwDir.listFiles()) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1078092&r1=1078091&r2=1078092&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Fri Mar  4 18:38:01 2011
@@ -22,6 +22,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -141,10 +142,10 @@ public class TestDirectoryScanner extend
 
   /** Create a block file in a random volume*/
   private long createBlockFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File finalizedDir = volumes[index].getBlockPoolSlice(bpid).getFinalizedDir();
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
     File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());
@@ -154,10 +155,10 @@ public class TestDirectoryScanner extend
 
   /** Create a metafile in a random volume*/
   private long createMetaFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File finalizedDir = volumes[index].getBlockPoolSlice(bpid).getFinalizedDir();
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
     File file = new File(finalizedDir, getMetaFile(id));
     if (file.createNewFile()) {
       LOG.info("Created metafile " + file.getName());
@@ -167,10 +168,10 @@ public class TestDirectoryScanner extend
 
   /** Create block file and corresponding metafile in a random volume */
   private long createBlockMetaFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File finalizedDir = volumes[index].getBlockPoolSlice(bpid).getFinalizedDir();
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
     File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());


