hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1076406 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/server/balancer/ src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/
Date Wed, 02 Mar 2011 21:45:21 GMT
Author: szetszwo
Date: Wed Mar  2 21:45:20 2011
New Revision: 1076406

URL: http://svn.apache.org/viewvc?rev=1076406&view=rev
Log:
HDFS-1681. Balancer: support per pool and per node policies.

Added:
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1076406&r1=1076405&r2=1076406&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Wed Mar  2 21:45:20 2011
@@ -130,6 +130,7 @@ Trunk (unreleased changes)
     datanode should wait (keep connecting) untill NN comes up 
     with the right version (boryas)
 
+    HDFS-1681. Balancer: support per pool and per node policies.  (szetszwo)
 
   IMPROVEMENTS
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1076406&r1=1076405&r2=1076406&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Mar  2 21:45:20 2011
@@ -74,6 +74,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -200,6 +201,7 @@ public class Balancer implements Tool {
   private Configuration conf;
   private BlockPool theblockpool;
 
+  private BalancingPolicy policy;
   private double threshold = 10D;
   private boolean isBlockTokenEnabled;
   private boolean shouldRun;
@@ -231,8 +233,6 @@ public class Balancer implements Tool {
   
   private NetworkTopology cluster = new NetworkTopology();
   
-  private double avgUtilization = 0.0D;
-  
   final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
     Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@@ -248,18 +248,21 @@ public class Balancer implements Tool {
     final ClientProtocol client;
     final FileSystem fs;
 
-    private BlockPool(InetSocketAddress namenodeAddress, final String id,
-        Configuration conf) throws IOException {
+    private BlockPool(InetSocketAddress namenodeAddress, Configuration conf
+        ) throws IOException {
       this.namenodeAddress = namenodeAddress;
-      this.id = id;
       this.namenode = createNamenode(namenodeAddress, conf);
       this.client = DFSClient.createNamenode(conf);
       this.fs = FileSystem.get(conf);
+
+      final NamespaceInfo namespaceinfo = namenode.versionRequest();
+      this.id = namespaceinfo.getBlockPoolID();
     }
 
     @Override
     public String toString() {
       return getClass().getSimpleName() + "[namenodeAddress=" + namenodeAddress
+          + ", id=" + id
           + "]";
     }
   }
@@ -512,17 +515,13 @@ public class Balancer implements Tool {
     }
   }
   
-  /* Return the utilization of a datanode */
-  static private double getUtilization(DatanodeInfo datanode) {
-    return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
-  }
   
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode {
     final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
-    protected DatanodeInfo datanode;
-    private double utilization;
-    protected long maxSizeToMove;
+    final DatanodeInfo datanode;
+    final double utilization;
+    final long maxSize2Move;
     protected long scheduledSize = 0L;
     //  blocks being moved but not confirmed yet
     private List<PendingBlockMove> pendingBlocks = 
@@ -531,11 +530,12 @@ public class Balancer implements Tool {
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(
-        DatanodeInfo node, double avgUtil, double threshold) {
+    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
       datanode = node;
-      utilization = Balancer.getUtilization(node);
-        
+      utilization = policy.getUtilization(node);
+      final double avgUtil = policy.getAvgUtilization();
+      long maxSizeToMove;
+
       if (utilization >= avgUtil+threshold
           || utilization <= avgUtil-threshold) { 
         maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
@@ -546,7 +546,7 @@ public class Balancer implements Tool {
       if (utilization < avgUtil ) {
         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
       }
-      maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+      this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
     }
     
     /** Get the datanode */
@@ -566,12 +566,12 @@ public class Balancer implements Tool {
     
     /** Decide if still need to move more bytes */
     protected boolean isMoveQuotaFull() {
-      return scheduledSize<maxSizeToMove;
+      return scheduledSize<maxSize2Move;
     }
 
     /** Return the total number of bytes that need to be moved */
     protected long availableSizeToMove() {
-      return maxSizeToMove-scheduledSize;
+      return maxSize2Move-scheduledSize;
     }
     
     /* increment scheduled size */
@@ -629,8 +629,8 @@ public class Balancer implements Tool {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, double avgUtil, double threshold) {
-      super(node, avgUtil, threshold);
+    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+      super(node, policy, threshold);
     }
     
     /** Add a node task */
@@ -882,11 +882,12 @@ public class Balancer implements Tool {
    * namenode as a client and a secondary namenode and retry proxies
    * when connection fails.
    */
-  private void init(InetSocketAddress namenodeAddress,  final String id,
+  private void init(InetSocketAddress namenodeAddress, BalancingPolicy policy,
       double threshold) throws IOException {
+    this.policy = policy;
     this.threshold = threshold;
 
-    this.theblockpool = new BlockPool(namenodeAddress, id, conf);
+    this.theblockpool = new BlockPool(namenodeAddress, conf);
     ExportedBlockKeys keys = theblockpool.namenode.getBlockKeys();
     this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
     if (isBlockTokenEnabled) {
@@ -992,15 +993,13 @@ public class Balancer implements Tool {
    */
   private long initNodes(DatanodeInfo[] datanodes) {
     // compute average utilization
-    long totalCapacity=0L, totalUsedSpace=0L;
     for (DatanodeInfo datanode : datanodes) {
       if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
         continue; // ignore decommissioning or decommissioned nodes
       }
-      totalCapacity += datanode.getCapacity();
-      totalUsedSpace += datanode.getDfsUsed();
+      policy.accumulateSpaces(datanode);
     }
-    this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+    policy.initAvgUtilization();
 
     /*create network topology and all data node lists: 
      * overloaded, above-average, below-average, and underloaded
@@ -1015,26 +1014,27 @@ public class Balancer implements Tool {
       }
       cluster.add(datanode);
       BalancerDatanode datanodeS;
-      if (getUtilization(datanode) > avgUtilization) {
-        datanodeS = new Source(datanode, avgUtilization, threshold);
+      final double avg = policy.getAvgUtilization();
+      if (policy.getUtilization(datanode) > avg) {
+        datanodeS = new Source(datanode, policy, threshold);
         if (isAboveAvgUtilized(datanodeS)) {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
         } else {
           assert(isOverUtilized(datanodeS)) :
             datanodeS.getName()+ "is not an overUtilized node";
           this.overUtilizedDatanodes.add((Source)datanodeS);
-          overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+          overLoadedBytes += (long)((datanodeS.utilization-avg
               -threshold)*datanodeS.datanode.getCapacity()/100.0);
         }
       } else {
-        datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+        datanodeS = new BalancerDatanode(datanode, policy, threshold);
         if ( isBelowAvgUtilized(datanodeS)) {
           this.belowAvgUtilizedDatanodes.add(datanodeS);
         } else {
           assert (isUnderUtilized(datanodeS)) :
             datanodeS.getName()+ "is not an underUtilized node"; 
           this.underUtilizedDatanodes.add(datanodeS);
-          underLoadedBytes += (long)((avgUtilization-threshold-
+          underLoadedBytes += (long)((avg-threshold-
               datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
         }
       }
@@ -1441,7 +1441,7 @@ public class Balancer implements Tool {
     this.datanodes.clear();
     this.sources.clear();
     this.targets.clear();  
-    this.avgUtilization = 0.0D;
+    this.policy.reset();
     cleanGlobalBlockList();
     this.movedBlocks.cleanup();
   }
@@ -1461,26 +1461,28 @@ public class Balancer implements Tool {
   
   /* Return true if the given datanode is overUtilized */
   private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (avgUtilization+threshold);
+    return datanode.utilization > (policy.getAvgUtilization()+threshold);
   }
   
   /* Return true if the given datanode is above average utilized
    * but not overUtilized */
   private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    return (datanode.utilization <= (avgUtilization+threshold))
-        && (datanode.utilization > avgUtilization);
+    final double avg = policy.getAvgUtilization();
+    return (datanode.utilization <= (avg+threshold))
+        && (datanode.utilization > avg);
   }
   
   /* Return true if the given datanode is underUtilized */
   private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (avgUtilization-threshold);
+    return datanode.utilization < (policy.getAvgUtilization()-threshold);
   }
 
   /* Return true if the given datanode is below average utilized 
    * but not underUtilized */
   private boolean isBelowAvgUtilized(BalancerDatanode datanode) {
-        return (datanode.utilization >= (avgUtilization-threshold))
-                 && (datanode.utilization < avgUtilization);
+    final double avg = policy.getAvgUtilization();
+    return (datanode.utilization >= (avg-threshold))
+             && (datanode.utilization < avg);
   }
 
   // Exit status
@@ -1495,20 +1497,20 @@ public class Balancer implements Tool {
    * @throws Exception exception that occured during datanode balancing
    */
   public int run(String[] args) throws Exception {
-    return run(NameNode.getServiceAddress(conf, true), "TODO", parseArgs(args));
+    return run(NameNode.getServiceAddress(conf, true),
+        BalancingPolicy.Node.INSTANCE, parseArgs(args));
   }
 
-  int run(InetSocketAddress namenodeAddress, String blockpoolId,
+  int run(InetSocketAddress namenodeAddress, BalancingPolicy policy, 
       double threshold) throws IOException, InterruptedException {
     LOG.info("namenodeAddress = " + namenodeAddress);
-    LOG.info("blockpoolId     = " + blockpoolId);
     LOG.info("threshold       = " + threshold);
 
     long startTime = Util.now();
     OutputStream out = null;
     try {
       // initialize a balancer
-      init(namenodeAddress, blockpoolId, threshold);
+      init(namenodeAddress, policy, threshold);
       
       /* Check if there is another balancer running.
        * Exit if there is another one running.

Added: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1076406&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (added)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Wed Mar  2 21:45:20 2011
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * Balancing policy.
+ * Since a datanode may contain multiple block pools,
+ * {@link Pool} implies {@link Node}
+ * but NOT the other way around
+ */
+@InterfaceAudience.Private
+abstract class BalancingPolicy {
+  long totalCapacity;
+  long totalUsedSpace;
+  private double avgUtilization;
+
+  void reset() {
+    totalCapacity = 0L;
+    totalUsedSpace = 0L;
+    avgUtilization = 0.0;
+  }
+
+  /** Accumulate used space and capacity. */
+  abstract void accumulateSpaces(DatanodeInfo d);
+
+  void initAvgUtilization() {
+    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+  }
+  double getAvgUtilization() {
+    return avgUtilization;
+  }
+
+  /** Return the utilization of a datanode */
+  abstract double getUtilization(DatanodeInfo d);
+
+  /**
+   * Cluster is balanced if each node is balance.
+   */
+  static class Node extends BalancingPolicy {
+    static Node INSTANCE = new Node();
+    private Node() {}
+
+    @Override
+    void accumulateSpaces(DatanodeInfo d) {
+      totalCapacity += d.getCapacity();
+      totalUsedSpace += d.getDfsUsed();  
+    }
+    
+    @Override
+    double getUtilization(DatanodeInfo d) {
+      return d.getDfsUsed()*100.0/d.getCapacity();
+    }
+  }
+
+  /**
+   * Cluster is balanced if each pool in each node is balance.
+   */
+  static class Pool extends BalancingPolicy {
+    static Pool INSTANCE = new Pool();
+    private Pool() {}
+
+    @Override
+    void accumulateSpaces(DatanodeInfo d) {
+      totalCapacity += d.getCapacity();
+      totalUsedSpace += d.getBlockPoolUsed();  
+    }
+
+    @Override
+    double getUtilization(DatanodeInfo d) {
+      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    }
+  }
+}

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1076406&r1=1076405&r2=1076406&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed Mar  2 21:45:20 2011
@@ -251,9 +251,8 @@ public class TestBalancer extends TestCa
     // start rebalancing
     balancer = new Balancer(conf);
     final InetSocketAddress namenodeAddress = NameNode.getServiceAddress(conf, true);
-    final String blockpoolId = cluster.getNamesystem().getBlockPoolId();
     final double threshold = balancer.parseArgs(new String[0]);
-    balancer.run(namenodeAddress, blockpoolId, threshold);
+    balancer.run(namenodeAddress, BalancingPolicy.Node.INSTANCE, threshold);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
     boolean balanced;
@@ -286,9 +285,8 @@ public class TestBalancer extends TestCa
     balancer = new Balancer();
     balancer.setConf(conf);
     final InetSocketAddress namenodeAddress = NameNode.getServiceAddress(conf, true);
-    final String blockpoolId = cluster.getNamesystem().getBlockPoolId();
     final double threshold = balancer.parseArgs(new String[0]);
-    balancer.run(namenodeAddress, blockpoolId, threshold);
+    balancer.run(namenodeAddress, BalancingPolicy.Node.INSTANCE, threshold);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
     boolean balanced;



Mime
View raw message