hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1076896 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date Fri, 04 Mar 2011 00:01:46 GMT
Author: suresh
Date: Fri Mar  4 00:01:45 2011
New Revision: 1076896

URL: http://svn.apache.org/viewvc?rev=1076896&view=rev
Log:
HDFS-1717. Federation: FSDataset volumeMap access is not synchronized correctly. Contributed
by Suresh Srinivas.  

Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1076896&r1=1076895&r2=1076896&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Fri Mar  4 00:01:45 2011
@@ -182,7 +182,7 @@ Trunk (unreleased changes)
     HDFS-1715. Federation: warning/error not generated when datanode sees 
     inconsistent/different Cluster ID between namenodes (boryas)
 
-    HDFS-1716: Federation: Add decommission tests for federated namenodes.
+    HDFS-1716. Federation: Add decommission tests for federated namenodes.
     (suresh)
 
     HDFS-1713. Federation: Prevent DataBlockScanner from running in tight loop.
@@ -191,6 +191,9 @@ Trunk (unreleased changes)
     HDFS-1721. Federation: Configuration for principal names should not be 
     namenode specific. (jitendra)
 
+    HDFS-1717. Federation: FSDataset volumeMap access is not synchronized
+    correctly. (suresh)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1076896&r1=1076895&r2=1076896&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Mar  4 00:01:45 2011
@@ -983,7 +983,7 @@ public class FSDataset implements FSCons
    * @param blockId
    * @return
    */
-  synchronized ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
+  ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
     ReplicaInfo r = volumeMap.get(bpid, blockId);
     if(r == null)
       return null;
@@ -1041,17 +1041,17 @@ public class FSDataset implements FSCons
     return f;
   }
     
-  FSVolumeSet volumes;
-  private int maxBlocksPerDir = 0;
-  ReplicasMap volumeMap = new ReplicasMap();
-  static  Random random = new Random();
-  FSDatasetAsyncDiskService asyncDiskService;
-  private int validVolsRequired;
+  final FSVolumeSet volumes;
+  private final int maxBlocksPerDir;
+  final ReplicasMap volumeMap;
+  static final Random random = new Random();
+  final FSDatasetAsyncDiskService asyncDiskService;
+  private final int validVolsRequired;
 
   // Used for synchronizing access to usage stats
-  private Object statsLock = new Object();
+  private final Object statsLock = new Object();
 
-  boolean supportAppends = true;
+  final boolean supportAppends;
 
   /**
    * An FSDataset has a directory where it loads its data files.
@@ -1078,6 +1078,8 @@ public class FSDataset implements FSCons
       DataNode.LOG.info("FSDataset added volume - "
           + storage.getStorageDir(idx).getCurrentDir());
     }
+    volumeMap = new ReplicasMap(this);
+    
     volumes = new FSVolumeSet(volArray);
     volumes.getVolumeMap(volumeMap);
 
@@ -1185,25 +1187,38 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * Get the meta info of a block stored in volumeMap
-   * @param b block
-   * @return the meta replica information
+   * Get the meta info of a block stored in volumeMap. To find a block,
+   * block pool Id, block Id and generation stamp must match.
+   * @param b extended block
+   * @return the meta replica information; null if block was not found
+   * @throws ReplicaNotFoundException if no entry is in the map or 
+   *                        there is a generation stamp mismatch
    */
-  private ReplicaInfo getReplicaInfo(ExtendedBlock b) {
-    return volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+  private ReplicaInfo getReplicaInfo(ExtendedBlock b)
+      throws ReplicaNotFoundException {
+    ReplicaInfo info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+    if (info == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    return info;
   }
   
   /**
-   * Get the meta info of a block stored in volumeMap
-   * @param b block
-   * @return the meta replica information
-   * @throws IOException if no entry is in the map or 
+   * Get the meta info of a block stored in volumeMap. Block is looked up
+   * without matching the generation stamp.
+   * @param bpid block pool Id
+   * @param blkid block Id
+   * @return the meta replica information; null if block was not found
+   * @throws ReplicaNotFoundException if no entry is in the map or 
    *                        there is a generation stamp mismatch
    */
-  private ReplicaInfo getReplicaInfo(String bpid, Block b) throws IOException {
-    ReplicaInfo info = volumeMap.get(bpid, b);
+  private ReplicaInfo getReplicaInfo(String bpid, long blkid)
+      throws ReplicaNotFoundException {
+    ReplicaInfo info = volumeMap.get(bpid, blkid);
     if (info == null) {
-      throw new IOException("Block " + b + " does not exist in volumeMap.");
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
     }
     return info;
   }
@@ -1240,12 +1255,8 @@ public class FSDataset implements FSCons
    *           is not in any snapshot.
    */
   public boolean unlinkBlock(ExtendedBlock block, int numLinks) throws IOException {
-    ReplicaInfo info = null;
-
-    synchronized (this) {
-      info = getReplicaInfo(block);
-    }
-   return info.unlinkBlock(numLinks);
+    ReplicaInfo info = getReplicaInfo(block);
+    return info.unlinkBlock(numLinks);
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
@@ -1328,10 +1339,6 @@ public class FSDataset implements FSCons
           " should be greater than the replica " + b + "'s generation stamp");
     }
     ReplicaInfo replicaInfo = getReplicaInfo(b);
-    if (replicaInfo == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }  
     DataNode.LOG.info("Appending to replica " + replicaInfo);
     if (replicaInfo.getState() != ReplicaState.FINALIZED) {
       throw new ReplicaNotFoundException(
@@ -1413,10 +1420,6 @@ public class FSDataset implements FSCons
   private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
       long expectedBlockLen) throws IOException {
     ReplicaInfo replicaInfo = getReplicaInfo(b);
-    if (replicaInfo == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }
     
     // check state
     if (replicaInfo.getState() != ReplicaState.FINALIZED &&
@@ -1461,6 +1464,7 @@ public class FSDataset implements FSCons
     
     return replicaInfo;
   }
+  
   @Override  // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
@@ -1523,7 +1527,8 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+        b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
       " already exists in state " + replicaInfo.getState() +
@@ -1545,11 +1550,7 @@ public class FSDataset implements FSCons
       throws IOException {
     DataNode.LOG.info("Recover the RBW replica " + b);
 
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
-    if (replicaInfo == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }
+    ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
     
     // check the replica's state
     if (replicaInfo.getState() != ReplicaState.RBW) {
@@ -1626,7 +1627,10 @@ public class FSDataset implements FSCons
 
   synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException
{
     if ( vol == null ) {
-      vol = getReplicaInfo(bpid, blk).getVolume();
+      ReplicaInfo replica = volumeMap.get(bpid, blk);
+      if (replica != null) {
+        vol = volumeMap.get(bpid, blk).getVolume();
+      }
       if ( vol == null ) {
         throw new IOException("Could not find volume for block " + blk);
       }
@@ -1684,7 +1688,8 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+        b.getLocalBlock());
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY)
{
       // remove from volumeMap
       volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
@@ -1728,7 +1733,6 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public BlockListAsLongs getBlockReport(String bpid) {
-    // TODO:FEDERATION volumeMap.size() has not been synchronized - old code
     int size =  volumeMap.size(bpid);
     ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
     ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
@@ -1761,24 +1765,6 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * Get the block list from in-memory blockmap for a block pool.
-   * 
-   * Note if <deepcopy> is false, reference to the block in the volumeMap is
-   * returned. This block should not be changed. Suitable synchronization using
-   * {@link FSDataset} is needed to handle concurrent modification to the block.
-   */
-  synchronized Block[] getBlockList(String bpid, boolean deepcopy) {
-    Block[] list = volumeMap.replicas(bpid).toArray(
-        new Block[volumeMap.size(bpid)]);
-    if (deepcopy) {
-      for (int i = 0; i < list.length; i++) {
-        list[i] = new Block(list[i]);
-      }
-    }
-    return list;
-  }
-
-  /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   synchronized List<Block> getFinalizedBlocks(String bpid) {
@@ -1797,7 +1783,9 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public boolean isValidBlock(ExtendedBlock b) {
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+        b.getLocalBlock());
+    
     if (replicaInfo == null || 
         replicaInfo.getState() != ReplicaState.FINALIZED) {
       return false;
@@ -2209,7 +2197,6 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   @Deprecated
   public ReplicaInfo getReplica(String bpid, long blockId) {
-    assert(Thread.holdsLock(this));
     return volumeMap.get(bpid, blockId);
   }
 
@@ -2363,10 +2350,8 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
-    if (replica == null) {
-      throw new ReplicaNotFoundException(block);
-    }
+    final Replica replica = getReplicaInfo(block.getBlockPoolId(), 
+        block.getBlockId());
     if (replica.getGenerationStamp() < block.getGenerationStamp()) {
       throw new IOException(
           "replica.getGenerationStamp() < block.getGenerationStamp(), block="

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java?rev=1076896&r1=1076895&r2=1076896&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java Fri Mar  4 00:01:45 2011
@@ -21,15 +21,32 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
 
+/**
+ * Maintains the replicas map. 
+ */
 class ReplicasMap {
+  // Object using which this class is synchronized
+  private final Object mutex;
+  
   // Map of block pool Id to another map of block Id to ReplicaInfo.
   private Map<String, Map<Long, ReplicaInfo>> map = 
     new HashMap<String, Map<Long, ReplicaInfo>>();
   
-  synchronized String[] getBlockPoolList() {
-    return map.keySet().toArray(new String[map.keySet().size()]);   
+  ReplicasMap(Object mutex) {
+    if (mutex == null) {
+      throw new HadoopIllegalArgumentException(
+          "Object to synchronize on cannot be null");
+    }
+    this.mutex = mutex;
+  }
+  
+  String[] getBlockPoolList() {
+    synchronized(mutex) {
+      return map.keySet().toArray(new String[map.keySet().size()]);   
+    }
   }
   
   private void checkBlockPool(String bpid) {
@@ -72,8 +89,10 @@ class ReplicasMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    Map<Long, ReplicaInfo> m = map.get(bpid);
-    return m != null ? m.get(blockId) : null;
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      return m != null ? m.get(blockId) : null;
+    }
   }
   
   /**
@@ -87,13 +106,15 @@ class ReplicasMap {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    Map<Long, ReplicaInfo> m = map.get(bpid);
-    if (m == null) {
-      // Add an entry for block pool if it does not exist already
-      m = new HashMap<Long, ReplicaInfo>();
-      map.put(bpid, m);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
+        // Add an entry for block pool if it does not exist already
+        m = new HashMap<Long, ReplicaInfo>();
+        map.put(bpid, m);
+      }
+      return  m.put(replicaInfo.getBlockId(), replicaInfo);
     }
-    return  m.put(replicaInfo.getBlockId(), replicaInfo);
   }
   
   /**
@@ -107,14 +128,16 @@ class ReplicasMap {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    Map<Long, ReplicaInfo> m = map.get(bpid);
-    if (m != null) {
-      Long key = Long.valueOf(block.getBlockId());
-      ReplicaInfo replicaInfo = m.get(key);
-      if (replicaInfo != null &&
-          block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-        return m.remove(key);
-      } 
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m != null) {
+        Long key = Long.valueOf(block.getBlockId());
+        ReplicaInfo replicaInfo = m.get(key);
+        if (replicaInfo != null &&
+            block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+          return m.remove(key);
+        } 
+      }
     }
     
     return null;
@@ -128,9 +151,11 @@ class ReplicasMap {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    Map<Long, ReplicaInfo> m = map.get(bpid);
-    if (m != null) {
-      return m.remove(blockId);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m != null) {
+        return m.remove(blockId);
+      }
     }
     return null;
   }
@@ -141,27 +166,46 @@ class ReplicasMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    Map<Long, ReplicaInfo> m = map.get(bpid);
-    return m != null ? m.size() : 0;
+    Map<Long, ReplicaInfo> m = null;
+    synchronized(mutex) {
+      m = map.get(bpid);
+      return m != null ? m.size() : 0;
+    }
   }
   
   /**
    * Get a collection of the replicas for given block pool
+   * This method is <b>not synchronized</b>. It needs to be synchronized
+   * externally using the mutex, both for getting the replicas
+   * values from the map and iterating over it. Mutex can be accessed using
+   * {@link #getMutext()} method.
+   * 
    * @param bpid block pool id
-   * @return a collection of the replicas
+   * @return a collection of the replicas belonging to the block pool
    */
   Collection<ReplicaInfo> replicas(String bpid) {
-    Map<Long, ReplicaInfo> m = map.get(bpid);
+    Map<Long, ReplicaInfo> m = null;
+    m = map.get(bpid);
     return m != null ? m.values() : null;
   }
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
-    Map<Long, ReplicaInfo> m = map.get(bpid);
-    if (m == null) {
-      // Add an entry for block pool if it does not exist already
-      m = new HashMap<Long, ReplicaInfo>();
-      map.put(bpid, m);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
+        // Add an entry for block pool if it does not exist already
+        m = new HashMap<Long, ReplicaInfo>();
+        map.put(bpid, m);
+      }
     }
   }
+  
+  /**
+   * Give access to mutex used for synchronizing ReplicasMap
+   * @return object used as lock
+   */
+  Object getMutext() {
+    return mutex;
+  }
 }
\ No newline at end of file

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1076896&r1=1076895&r2=1076896&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Fri Mar  4 00:01:45 2011
@@ -140,7 +140,7 @@ public class TestInterDatanodeProtocol {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicasMap map = new ReplicasMap();
+    final ReplicasMap map = new ReplicasMap(this);
     String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java?rev=1076896&r1=1076895&r2=1076896&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java Fri Mar  4 00:01:45 2011
@@ -26,7 +26,7 @@ import org.junit.Test;
  * Unit test for ReplicasMap class
  */
 public class TestReplicasMap {
-  private static final ReplicasMap map = new ReplicasMap();
+  private static final ReplicasMap map = new ReplicasMap(TestReplicasMap.class);
   private static final String bpid = "BP-TEST";
   private static final  Block block = new Block(1234, 1234, 1234);
   



Mime
View raw message