From t...@apache.org
Subject svn commit: r1086458 - in /hadoop/hdfs/trunk: ./ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date Tue, 29 Mar 2011 01:09:32 GMT
Author: todd
Date: Tue Mar 29 01:09:32 2011
New Revision: 1086458

URL: http://svn.apache.org/viewvc?rev=1086458&view=rev
Log:
HDFS-1120. Make DataNode's block-to-device placement policy pluggable. Contributed by Harsh J Chouraria.

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/hdfs-default.xml
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1086458&r1=1086457&r2=1086458&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Mar 29 01:09:32 2011
@@ -89,6 +89,9 @@ Trunk (unreleased changes)
     HDFS-1789. Refactor frequently used codes from DFSOutputStream,
     BlockReceiver and DataXceiver.  (szetszwo)
 
+    HDFS-1120. Make DataNode's block-to-device placement policy pluggable.
+    (Harsh J Chouraria via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/hdfs-default.xml?rev=1086458&r1=1086457&r2=1086458&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/trunk/src/java/hdfs-default.xml Tue Mar 29 01:09:32 2011
@@ -345,6 +345,15 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.datanode.block.volume.choice.policy</name>
+  <value>org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy</value>
+  <description>The policy class used to determine which of the
+  datanode's available volumes a block should be written to. The default is
+  a simple round-robin policy that chooses volumes in cyclic order.
+  </description>
+</property>
+
+<property>
   <name>dfs.heartbeat.interval</name>
   <value>3</value>
   <description>Determines datanode heartbeat interval in seconds.</description>
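
To plug in a different strategy, an operator overrides this property in
hdfs-site.xml. A minimal sketch, assuming a hypothetical class
com.example.MostAvailableVolumesPolicy that implements
BlockVolumeChoosingPolicy and is on the DataNode's classpath:

    <property>
      <name>dfs.datanode.block.volume.choice.policy</name>
      <!-- Hypothetical custom implementation, not part of this commit. -->
      <value>com.example.MostAvailableVolumesPolicy</value>
    </property>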

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1086458&r1=1086457&r2=1086458&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Mar 29 01:09:32 2011
@@ -174,6 +174,9 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_DEFAULT = 2L<<40;
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
+  public static final String  DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy";
+  public static final String  DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY_DEFAULT =
+    "org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy";
   public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
   public static final long    DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java?rev=1086458&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java Tue Mar 29 01:09:32 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**************************************************
+ * BlockVolumeChoosingPolicy allows a DataNode to
+ * specify the policy to be used when choosing a
+ * volume to place an incoming block.
+ * 
+ ***************************************************/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface BlockVolumeChoosingPolicy {
+
+  /**
+   * Chooses a volume from the given list of FSVolumes on which to place
+   * a block of the given size, according to the policy's choice
+   * algorithm.
+   * 
+   * (Policies that maintain state must be thread-safe.)
+   * 
+   * @param volumes - the array of FSVolumes that are available.
+   * @param blockSize - the size of the block for which a volume is sought.
+   * @return the chosen volume to store the block.
+   * @throws IOException when disks are unavailable or are full.
+   */
+  public FSVolume chooseVolume(FSVolume[] volumes, long blockSize)
+    throws IOException;
+
+}
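
The interface is small enough that alternative strategies are easy to
sketch. As an illustration only (not part of this commit), a greedy policy
that always picks the volume with the most free space might look roughly
like the following; it reuses the FSVolume API from this patch, and the
class name is invented:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
    import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

    /** Hypothetical policy: choose the volume with the most free space. */
    public class MostAvailableVolumesPolicy implements BlockVolumeChoosingPolicy {

      @Override
      public FSVolume chooseVolume(FSVolume[] volumes, long blockSize)
          throws IOException {
        if (volumes.length < 1) {
          throw new DiskOutOfSpaceException("No more available volumes");
        }
        FSVolume best = null;
        long bestAvailable = -1;
        for (FSVolume volume : volumes) {
          long available = volume.getAvailable();
          // Consider only volumes that can actually hold the block.
          if (available > blockSize && available > bestAvailable) {
            best = volume;
            bestAvailable = available;
          }
        }
        if (best == null) {
          throw new DiskOutOfSpaceException(
              "Insufficient space for an additional block");
        }
        return best;
      }
    }

Because this sketch keeps no state between calls, it needs no
synchronization; stateful implementations such as the round-robin policy
below must be thread-safe, as the javadoc above requires.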

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1086458&r1=1086457&r2=1086458&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Mar 29 01:09:32 2011
@@ -61,6 +61,7 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.io.IOUtils;
@@ -530,9 +531,11 @@ public class FSDataset implements FSCons
   static class FSVolumeSet {
     FSVolume[] volumes = null;
     int curVolume = 0;
+    BlockVolumeChoosingPolicy blockChooser;
       
-    FSVolumeSet(FSVolume[] volumes) {
+    FSVolumeSet(FSVolume[] volumes, BlockVolumeChoosingPolicy blockChooser) {
       this.volumes = volumes;
+      this.blockChooser = blockChooser;
     }
     
     private int numberOfVolumes() {
@@ -540,27 +543,7 @@ public class FSDataset implements FSCons
     }
       
     synchronized FSVolume getNextVolume(long blockSize) throws IOException {
-      
-      if(volumes.length < 1) {
-        throw new DiskOutOfSpaceException("No more available volumes");
-      }
-      
-      // since volumes could've been removed because of the failure
-      // make sure we are not out of bounds
-      if(curVolume >= volumes.length) {
-        curVolume = 0;
-      }
-      
-      int startVolume = curVolume;
-      
-      while (true) {
-        FSVolume volume = volumes[curVolume];
-        curVolume = (curVolume + 1) % volumes.length;
-        if (volume.getAvailable() > blockSize) { return volume; }
-        if (curVolume == startVolume) {
-          throw new DiskOutOfSpaceException("Insufficient space for an additional block");
-        }
-      }
+      return blockChooser.chooseVolume(volumes, blockSize);
     }
       
     long getDfsUsed() throws IOException {
@@ -862,7 +845,13 @@ public class FSDataset implements FSCons
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
     }
-    volumes = new FSVolumeSet(volArray);
+    BlockVolumeChoosingPolicy blockChooserImpl =
+      (BlockVolumeChoosingPolicy) ReflectionUtils.newInstance(
+        conf.getClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
+            RoundRobinVolumesPolicy.class,
+            BlockVolumeChoosingPolicy.class),
+        conf);
+    volumes = new FSVolumeSet(volArray, blockChooserImpl);
     volumes.getVolumeMap(volumeMap);
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
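
Because the class is resolved via conf.getClass() with
RoundRobinVolumesPolicy.class as the fallback, behavior is unchanged when
the key is unset. A policy can presumably also be selected
programmatically, for example from a test; a fragment, reusing the
hypothetical MostAvailableVolumesPolicy sketched above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Any class implementing BlockVolumeChoosingPolicy is accepted; the
    // third argument checks the interface when the value is set.
    conf.setClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
        MostAvailableVolumesPolicy.class, BlockVolumeChoosingPolicy.class);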

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java?rev=1086458&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java Tue Mar 29 01:09:32 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+
+public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
+
+  int curVolume = 0;
+
+  @Override
+  public synchronized FSVolume chooseVolume(FSVolume[] volumes, long blockSize)
+      throws IOException {
+    if(volumes.length < 1) {
+      throw new DiskOutOfSpaceException("No more available volumes");
+    }
+    
+    // since volumes could have been removed because of a failure,
+    // make sure we are not out of bounds
+    if(curVolume >= volumes.length) {
+      curVolume = 0;
+    }
+    
+    int startVolume = curVolume;
+    
+    while (true) {
+      FSVolume volume = volumes[curVolume];
+      curVolume = (curVolume + 1) % volumes.length;
+      if (volume.getAvailable() > blockSize) { return volume; }
+      if (curVolume == startVolume) {
+        throw new DiskOutOfSpaceException("Insufficient space for an additional block");
+      }
+    }
+  }
+
+}

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java?rev=1086458&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java Tue Mar 29 01:09:32 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import junit.framework.Assert;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestRoundRobinVolumesPolicy {
+
+  // Test the Round-Robin block-volume choosing algorithm.
+  @Test
+  public void testRR() throws Exception {
+    FSVolume[] volumes = new FSVolume[2];
+
+    // First volume, with 100 bytes of space.
+    volumes[0] = Mockito.mock(FSVolume.class);
+    Mockito.when(volumes[0].getAvailable()).thenReturn(100L);
+
+    // Second volume, with 200 bytes of space.
+    volumes[1] = Mockito.mock(FSVolume.class);
+    Mockito.when(volumes[1].getAvailable()).thenReturn(200L);
+
+    RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance(
+        RoundRobinVolumesPolicy.class, null);
+    
+    // Test two rounds of round-robin choosing
+    Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0));
+
+    // The first volume has only 100 bytes free, so the policy should
+    // choose the second one when we ask for more than that.
+    Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 150));
+
+    // Verify that an IOException is raised when no volume can fit the block.
+    try {
+      policy.chooseVolume(volumes, Long.MAX_VALUE);
+      Assert.fail();
+    } catch (IOException e) {
+      // Passed.
+    }
+  }
+
+}


