hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject hadoop git commit: HDFS-9420. Add DataModels for DiskBalancer. Contributed by Anu Engineer
Date Tue, 24 Nov 2015 03:08:52 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-1312 411e2b2e7 -> 00537b8fb


HDFS-9420. Add DataModels for DiskBalancer. Contributed by Anu Engineer


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00537b8f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00537b8f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00537b8f

Branch: refs/heads/HDFS-1312
Commit: 00537b8fbacfeb8490bc07eaece001e1051860b9
Parents: 411e2b2
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Mon Nov 23 19:07:42 2015 -0800
Committer: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Committed: Mon Nov 23 19:07:42 2015 -0800

----------------------------------------------------------------------
 .../hadoop-hdfs/HDFS-1312_CHANGES.txt           |   5 +
 .../connectors/ClusterConnector.java            |  44 +++
 .../diskbalancer/connectors/package-info.java   |  29 ++
 .../datamodel/DiskBalancerCluster.java          | 249 ++++++++++++++
 .../datamodel/DiskBalancerDataNode.java         | 269 +++++++++++++++
 .../datamodel/DiskBalancerVolume.java           | 330 +++++++++++++++++++
 .../datamodel/DiskBalancerVolumeSet.java        | 325 ++++++++++++++++++
 .../diskbalancer/datamodel/package-info.java    |  31 ++
 .../hdfs/server/diskbalancer/package-info.java  |  36 ++
 .../diskbalancer/DiskBalancerTestUtil.java      | 227 +++++++++++++
 .../server/diskbalancer/TestDataModels.java     | 224 +++++++++++++
 .../diskbalancer/connectors/NullConnector.java  |  59 ++++
 12 files changed, 1828 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
new file mode 100644
index 0000000..5a71032
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -0,0 +1,5 @@
+HDFS-1312 Change Log
+
+  NEW FEATURES
+
+    HDFS-9420. Add DataModels for DiskBalancer. (Anu Engineer via szetszwo)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java
new file mode 100644
index 0000000..3dbfec2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+
+import java.util.List;
+
+/**
+ * ClusterConnector interface hides all specifics about how we communicate with
+ * the HDFS cluster. This interface returns data in classes that diskbalancer
+ * understands.
+ */
+public interface ClusterConnector {
+
+  /**
+   * getNodes function returns a list of DiskBalancerDataNodes.
+   *
+   * @return List of DiskBalancerDataNodes
+   */
+  List<DiskBalancerDataNode> getNodes() throws Exception;
+
+  /**
+   * Returns info about the connector.
+   *
+   * @return String.
+   */
+  String getConnectorInfo();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
new file mode 100644
index 0000000..8164804
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+/**
+ * Connectors package is a set of logical connectors that connect
+ * to various data sources to read the hadoop cluster information.
+ *
+ * We currently have 1 connector in this package. It is:
+ *
+ * NullConnector - This is an in-memory connector that is useful in testing.
+ * We can create dataNodes on the fly, attach them to this connector and
+ * ask the diskBalancer Cluster to read data from this source.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
new file mode 100644
index 0000000..91f7eaa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * DiskBalancerCluster represents the nodes that we are working against.
+ * <p/>
+ * Please Note :
+ * <p/>
+ * Semantics of inclusionList and exclusionLists.
+ * <p/>
+ * If a non-empty inclusionList is specified then the diskBalancer assumes that
+ * the user is only interested in processing that list of nodes. This node list
+ * is checked against the exclusionList and only the nodes in inclusionList but
+ * not in exclusionList are processed.
+ * <p/>
+ * If inclusionList is empty, then we assume that all live nodes in the cluster
+ * are to be processed by diskBalancer. In that case diskBalancer will avoid
+ * any nodes specified in the exclusionList but will process all other nodes
+ * in the cluster.
+ * <p/>
+ * In other words, an empty inclusionList means all the nodes; otherwise
+ * only the given list is processed. The exclusionList is always honored.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DiskBalancerCluster {
+
+  static final Log LOG = LogFactory.getLog(DiskBalancerCluster.class);
+  private final Set<String> exclusionList;
+  private final Set<String> inclusionList;
+  private ClusterConnector clusterConnector;
+  private List<DiskBalancerDataNode> nodes;
+  private String outputpath;
+
+  @JsonIgnore
+  private List<DiskBalancerDataNode> nodesToProcess;
+  private float threshold;
+
+  /**
+   * Empty Constructor needed by Jackson.
+   */
+  public DiskBalancerCluster() {
+    nodes = new LinkedList<>();
+    exclusionList = new TreeSet<>();
+    inclusionList = new TreeSet<>();
+
+  }
+
+  /**
+   * Constructs a DiskBalancerCluster.
+   *
+   * @param connector - ClusterConnector
+   * @throws IOException
+   */
+  public DiskBalancerCluster(ClusterConnector connector) throws IOException {
+    Preconditions.checkNotNull(connector);
+    clusterConnector = connector;
+    exclusionList = new TreeSet<>();
+    inclusionList = new TreeSet<>();
+  }
+
+  /**
+   * Parses a Json string and converts to DiskBalancerCluster.
+   *
+   * @param json - Json String
+   * @return DiskBalancerCluster
+   * @throws IOException
+   */
+  public static DiskBalancerCluster parseJson(String json) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.readValue(json, DiskBalancerCluster.class);
+  }
+
+  /**
+   * readClusterInfo connects to the cluster and reads the nodes' data. This
+   * data is the basis for the rest of the computation in DiskBalancerCluster.
+   */
+  public void readClusterInfo() throws Exception {
+    Preconditions.checkNotNull(clusterConnector);
+    LOG.info("Using connector : " + clusterConnector.getConnectorInfo());
+    nodes = clusterConnector.getNodes();
+  }
+
+  /**
+   * Gets all DataNodes in the Cluster.
+   *
+   * @return List of DiskBalancerDataNodes
+   */
+  public List<DiskBalancerDataNode> getNodes() {
+    return nodes;
+  }
+
+  /**
+   * Sets the list of nodes of this cluster.
+   *
+   * @param clusterNodes List of Nodes
+   */
+  public void setNodes(List<DiskBalancerDataNode> clusterNodes) {
+    this.nodes = clusterNodes;
+  }
+
+  /**
+   * Returns the current ExclusionList.
+   *
+   * @return Set of nodes that are excluded from diskBalancer right now.
+   */
+  public Set<String> getExclusionList() {
+    return exclusionList;
+  }
+
+  /**
+   * sets the list of nodes to exclude from process of diskBalancer.
+   *
+   * @param excludedNodes - exclusionList of nodes.
+   */
+  public void setExclusionList(Set<String> excludedNodes) {
+    this.exclusionList.addAll(excludedNodes);
+  }
+
+  /**
+   * Returns the threshold value. This is used for indicating how much skew is
+   * acceptable. This is expressed as a percentage. For example to say 20% skew
+   * between volumes is acceptable set this value to 20.
+   *
+   * @return float
+   */
+  public float getThreshold() {
+    return threshold;
+  }
+
+  /**
+   * Sets the threshold value.
+   *
+   * @param thresholdPercent - float - in percentage
+   */
+  public void setThreshold(float thresholdPercent) {
+    Preconditions.checkState((thresholdPercent >= 0.0f) &&
+        (thresholdPercent <= 100.0f),  "A percentage value expected.");
+    this.threshold = thresholdPercent;
+  }
+
+  /**
+   * Gets the Inclusion list.
+   *
+   * @return Set of machines to be processed by diskBalancer.
+   */
+  public Set<String> getInclusionList() {
+    return inclusionList;
+  }
+
+  /**
+   * Sets the inclusionList.
+   *
+   * @param includeNodes - set of machines to be processed by diskBalancer.
+   */
+  public void setInclusionList(Set<String> includeNodes) {
+    this.inclusionList.addAll(includeNodes);
+  }
+
+  /**
+   * returns a serialized json string.
+   *
+   * @return String - json
+   * @throws IOException
+   */
+  public String toJson() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.writeValueAsString(this);
+  }
+
+  /**
+   * Returns the Nodes to Process which is the real list of nodes processed by
+   * diskBalancer.
+   *
+   * @return List of DiskBalancerDataNodes
+   */
+  @JsonIgnore
+  public List<DiskBalancerDataNode> getNodesToProcess() {
+    return nodesToProcess;
+  }
+
+  /**
+   * Sets the nodes to process.
+   *
+   * @param dnNodesToProcess - List of DataNodes to process
+   */
+  @JsonIgnore
+  public void setNodesToProcess(List<DiskBalancerDataNode> dnNodesToProcess) {
+    this.nodesToProcess = dnNodesToProcess;
+  }
+
+  /**
+   * Returns the output path for this cluster.
+   */
+  public String getOutput() {
+    return outputpath;
+  }
+
+  /**
+   * Sets the output path for this run.
+   *
+   * @param output - Path
+   */
+  public void setOutput(String output) {
+    this.outputpath = output;
+  }
+
+  /**
+   * Writes a snapshot of the cluster to the specified directory.
+   *
+   * @param snapShotName - name of the snapshot
+   */
+  public void createSnapshot(String snapShotName) throws IOException {
+    String json = this.toJson();
+    File outFile = new File(getOutput() + "/" + snapShotName);
+    FileUtils.writeStringToFile(outFile, json);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
new file mode 100644
index 0000000..87030db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * DiskBalancerDataNode represents a DataNode that exists in the cluster. It
+ * also contains a metric called nodeDataDensity which allows us to compare
+ * between a set of Nodes.
+ */
+public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
+  private float nodeDataDensity;
+  private Map<String, DiskBalancerVolumeSet> volumeSets;
+  private String dataNodeUUID;
+  private String dataNodeIP;
+  private int dataNodePort;
+  private String dataNodeName;
+  private int volumeCount;
+
+  /**
+   * Constructs an Empty Data Node.
+   */
+  public DiskBalancerDataNode() {
+  }
+
+  /**
+   * Constructs a DataNode.
+   *
+   * @param dataNodeID - Node ID
+   */
+  public DiskBalancerDataNode(String dataNodeID) {
+    this.dataNodeUUID = dataNodeID;
+    volumeSets = new HashMap<>();
+  }
+
+  /**
+   * Returns the IP address of this Node.
+   *
+   * @return IP Address string
+   */
+  public String getDataNodeIP() {
+    return dataNodeIP;
+  }
+
+  /**
+   * Sets the IP address of this Node.
+   *
+   * @param ipaddress - IP Address
+   */
+  public void setDataNodeIP(String ipaddress) {
+    this.dataNodeIP = ipaddress;
+  }
+
+  /**
+   * Returns the Port of this DataNode.
+   *
+   * @return Port Number
+   */
+  public int getDataNodePort() {
+    return dataNodePort;
+  }
+
+  /**
+   * Sets the DataNode Port number.
+   *
+   * @param port - Datanode Port Number
+   */
+  public void setDataNodePort(int port) {
+    this.dataNodePort = port;
+  }
+
+  /**
+   * Get DataNode DNS name.
+   *
+   * @return name of the node
+   */
+  public String getDataNodeName() {
+    return dataNodeName;
+  }
+
+  /**
+   * Sets node's DNS name.
+   *
+   * @param name - Data node name
+   */
+  public void setDataNodeName(String name) {
+    this.dataNodeName = name;
+  }
+
+  /**
+   * Returns the Volume sets on this node.
+   *
+   * @return a Map of VolumeSets
+   */
+  public Map<String, DiskBalancerVolumeSet> getVolumeSets() {
+    return volumeSets;
+  }
+
+  /**
+   * Returns datanode ID.
+   **/
+  public String getDataNodeUUID() {
+    return dataNodeUUID;
+  }
+
+  /**
+   * Sets Datanode UUID.
+   *
+   * @param nodeID - Node ID.
+   */
+  public void setDataNodeUUID(String nodeID) {
+    this.dataNodeUUID = nodeID;
+  }
+
+  /**
+   * Indicates whether some other object is "equal to" this one.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if ((obj == null) || (obj.getClass() != getClass())) {
+      return false;
+    }
+    DiskBalancerDataNode that = (DiskBalancerDataNode) obj;
+    return dataNodeUUID.equals(that.getDataNodeUUID());
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less than,
+   * equal to, or greater than the specified object.
+   *
+   * @param that the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(DiskBalancerDataNode that) {
+    Preconditions.checkNotNull(that);
+
+    if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+        < 0) {
+      return -1;
+    }
+
+    if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+        == 0) {
+      return 0;
+    }
+
+    if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+        > 0) {
+      return 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Returns a hash code value for the object. This method is supported for the
+   * benefit of hash tables such as those provided by {@link HashMap}.
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  /**
+   * returns NodeDataDensity Metric.
+   *
+   * @return float
+   */
+  public float getNodeDataDensity() {
+    return nodeDataDensity;
+  }
+
+  /**
+   * computes nodes data density.
+   * <p/>
+   * This metric allows us to compare different nodes and how well the data is
+   * spread across a set of volumes inside the node.
+   */
+  public void computeNodeDensity() {
+    float sum = 0;
+    int volcount = 0;
+    for (DiskBalancerVolumeSet vset : volumeSets.values()) {
+      for (DiskBalancerVolume vol : vset.getVolumes()) {
+        sum += Math.abs(vol.getVolumeDataDensity());
+        volcount++;
+      }
+    }
+    nodeDataDensity = sum;
+    this.volumeCount = volcount;
+
+  }
+
+  /**
+   * Computes if this node needs balancing at all.
+   *
+   * @param threshold - Percentage
+   * @return true or false
+   */
+  public boolean isBalancingNeeded(float threshold) {
+    for (DiskBalancerVolumeSet vSet : getVolumeSets().values()) {
+      if (vSet.isBalancingNeeded(threshold)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Adds a volume to the DataNode.
+   * <p/>
+   * It is assumed that we have one thread per node, hence this call is not
+   * synchronized and the map is not protected.
+   *
+   * @param volume - volume
+   */
+  public void addVolume(DiskBalancerVolume volume) throws Exception {
+    Preconditions.checkNotNull(volume, "volume cannot be null");
+    Preconditions.checkNotNull(volumeSets, "volume sets cannot be null");
+    Preconditions
+        .checkNotNull(volume.getStorageType(), "storage type cannot be null");
+
+    String volumeSetKey = volume.getStorageType();
+    DiskBalancerVolumeSet vSet;
+    if (volumeSets.containsKey(volumeSetKey)) {
+      vSet = volumeSets.get(volumeSetKey);
+    } else {
+      vSet = new DiskBalancerVolumeSet(volume.isTransient());
+      volumeSets.put(volumeSetKey, vSet);
+    }
+
+    vSet.addVolume(volume);
+    computeNodeDensity();
+  }
+
+  /**
+   * Returns how many volumes are in the DataNode.
+   *
+   * @return int
+   */
+  public int getVolumeCount() {
+    return volumeCount;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
new file mode 100644
index 0000000..a608248
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * DiskBalancerVolume represents a volume in the DataNode.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DiskBalancerVolume {
+  private String path;
+  private long capacity;
+  private String storageType;
+  private long used;
+  private long reserved;
+  private String uuid;
+  private boolean failed;
+  private boolean isTransient;
+  private float volumeDataDensity;
+  private boolean skip = false;
+  private boolean isReadOnly;
+
+  /**
+   * Constructs DiskBalancerVolume.
+   */
+  public DiskBalancerVolume() {
+  }
+
+  /**
+   * Parses a Json string and converts to DiskBalancerVolume.
+   *
+   * @param json - Json String
+   *
+   * @return DiskBalancerVolume
+   *
+   * @throws IOException
+   */
+  public static DiskBalancerVolume parseJson(String json) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.readValue(json, DiskBalancerVolume.class);
+  }
+
+  /**
+   * Gets this volume's data density.
+   * Please see DiskBalancerVolumeSet#computeVolumeDataDensity to see how
+   * this is computed.
+   *
+   * @return float.
+   */
+  public float getVolumeDataDensity() {
+    return volumeDataDensity;
+  }
+
+  /**
+   * Sets this volume's data density.
+   *
+   * @param volDataDensity - density
+   */
+  public void setVolumeDataDensity(float volDataDensity) {
+    this.volumeDataDensity = volDataDensity;
+  }
+
+  /**
+   * Indicates if the volume is Transient in nature.
+   *
+   * @return true or false.
+   */
+  public boolean isTransient() {
+    return isTransient;
+  }
+
+  /**
+   * Sets volumes transient nature.
+   *
+   * @param aTransient - bool
+   */
+  public void setTransient(boolean aTransient) {
+    this.isTransient = aTransient;
+  }
+
+  /**
+   * Compares two volumes and decides if it is the same volume.
+   *
+   * @param o Volume Object
+   *
+   * @return boolean
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    DiskBalancerVolume that = (DiskBalancerVolume) o;
+    return uuid.equals(that.uuid);
+  }
+
+  /**
+   * Computes hash code for a diskBalancerVolume.
+   *
+   * @return int
+   */
+  @Override
+  public int hashCode() {
+    return uuid.hashCode();
+  }
+
+  /**
+   * Capacity of this volume.
+   *
+   * @return long
+   */
+  public long getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Sets the capacity of this volume.
+   *
+   * @param totalCapacity long
+   */
+  public void setCapacity(long totalCapacity) {
+    this.capacity = totalCapacity;
+  }
+
+  /**
+   * Indicates if this is a failed volume.
+   *
+   * @return boolean
+   */
+  public boolean isFailed() {
+    return failed;
+  }
+
+  /**
+   * Sets the failed flag for this volume.
+   *
+   * @param fail boolean
+   */
+  public void setFailed(boolean fail) {
+    this.failed = fail;
+  }
+
+  /**
+   * Returns the path for this volume.
+   *
+   * @return String
+   */
+  public String getPath() {
+    return path;
+  }
+
+  /**
+   * Sets the path for this volume.
+   *
+   * @param volPath Path
+   */
+  public void setPath(String volPath) {
+    this.path = volPath;
+  }
+
+  /**
+   * Gets the reserved size for this volume.
+   *
+   * @return Long - Reserved size.
+   */
+  public long getReserved() {
+    return reserved;
+  }
+
+  /**
+   * Sets the reserved size.
+   *
+   * @param reservedSize -- Sets the reserved.
+   */
+  public void setReserved(long reservedSize) {
+    this.reserved = reservedSize;
+  }
+
+  /**
+   * Gets the StorageType.
+   *
+   * @return String StorageType.
+   */
+  public String getStorageType() {
+    return storageType;
+  }
+
+  /**
+   * Sets the StorageType.
+   *
+   * @param typeOfStorage - Storage Type String.
+   */
+  public void setStorageType(String typeOfStorage) {
+    this.storageType = typeOfStorage;
+  }
+
+  /**
+   * Gets the dfsUsed Size.
+   *
+   * @return - long - used space
+   */
+  public long getUsed() {
+    return used;
+  }
+
+  /**
+   * Sets the used space for this volume.
+   *
+   * @param dfsUsedSpace - dfsUsedSpace for this volume.
+   */
+  public void setUsed(long dfsUsedSpace) {
+    Preconditions.checkArgument(dfsUsedSpace < this.getCapacity());
+    this.used = dfsUsedSpace;
+  }
+
+  /**
+   * Gets the uuid for this volume.
+   *
+   * @return String - uuid of the volume
+   */
+  public String getUuid() {
+    return uuid;
+  }
+
+  /**
+   * Sets the uuid for this volume.
+   *
+   * @param id - String
+   */
+  public void setUuid(String id) {
+    this.uuid = id;
+  }
+
+  /**
+   * Returns effective capacity of a volume.
+   *
+   * @return long - capacity minus the reserved size.
+   */
+  @JsonIgnore
+  public long computeEffectiveCapacity() {
+    return getCapacity() - getReserved();
+  }
+
+  /**
+   * returns a Json String.
+   *
+   * @return String
+   *
+   * @throws IOException
+   */
+  public String toJson() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.writeValueAsString(this);
+  }
+
+  /**
+   * returns if we should skip this volume.
+   * @return true / false
+   */
+  public boolean isSkip() {
+    return skip;
+  }
+
+  /**
+   * Sets the Skip value for this volume.
+   * @param skipValue bool
+   */
+  public void setSkip(boolean skipValue) {
+    this.skip = skipValue;
+  }
+
+  /**
+   * Returns the used space of a disk as a fraction of its capacity.
+   * This is useful in debugging disk usage.
+   * @return float
+   */
+  public float computeUsedPercentage() {
+    return (float) (getUsed()) / (float) (getCapacity());
+  }
+
+  /**
+   * Sets whether this volume is transient.
+   * @param transientValue - boolean
+   */
+  public void setIsTransient(boolean transientValue) {
+    this.isTransient = transientValue;
+  }
+
+  /**
+   * Tells us if this volume is read-only.
+   * @return true / false
+   */
+  public boolean isReadOnly() {
+    return isReadOnly;
+  }
+
+  /**
+   * Sets this volume as read only.
+   * @param readOnly - boolean
+   */
+  public void setReadOnly(boolean readOnly) {
+    isReadOnly = readOnly;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
new file mode 100644
index 0000000..15c21ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonProperty;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+/**
+ * DiskBalancerVolumeSet is a collection of storage devices on the
+ * data node which are of similar StorageType.
+ *
+ * Besides the plain set of volumes it maintains a queue of the same
+ * volumes ordered by descending volumeDataDensity. That queue is rebuilt
+ * from scratch by {@link #computeVolumeDataDensity()}.
+ */
+@JsonIgnoreProperties({"sortedQueue", "volumeCount", "idealUsed"})
+public class DiskBalancerVolumeSet {
+  static final Log LOG = LogFactory.getLog(DiskBalancerVolumeSet.class);
+  // Initial capacity used when allocating the backing HashSet of volumes.
+  private final int maxDisks = 256;
+
+  @JsonProperty("transient")
+  private boolean isTransient;
+  private Set<DiskBalancerVolume> volumes;
+
+  // Volumes ordered by MinHeap (highest volumeDataDensity first).
+  @JsonIgnore
+  private TreeSet<DiskBalancerVolume> sortedQueue;
+  private String storageType;
+  private String setID;
+
+  // totalUsed / totalCapacity as of the last computeVolumeDataDensity() call.
+  private float idealUsed;
+
+
+  /**
+   * Constructs Empty DiskBalancerVolumeSet.
+   * This is needed by jackson
+   *
+   * The collections are allocated here as well, so that an instance
+   * created through this constructor can be used (addVolume,
+   * getVolumeCount, computeVolumeDataDensity) without hitting a
+   * NullPointerException.
+   */
+  public DiskBalancerVolumeSet() {
+    volumes = new HashSet<>(maxDisks);
+    sortedQueue = new TreeSet<>(new MinHeap());
+    setID = UUID.randomUUID().toString();
+  }
+
+  /**
+   * Constructs a DiskBalancerVolumeSet.
+   *
+   * The storage type is left unset; it is taken from the first volume
+   * that is added.
+   *
+   * @param isTransient - boolean
+   */
+  public DiskBalancerVolumeSet(boolean isTransient) {
+    this.isTransient = isTransient;
+    volumes = new HashSet<>(maxDisks);
+    sortedQueue = new TreeSet<>(new MinHeap());
+    this.storageType = null;
+    setID = UUID.randomUUID().toString();
+  }
+
+  /**
+   * Constructs a new DiskBalancerVolumeSet from an existing one.
+   *
+   * The transient flag and storage type are copied, and the volume set
+   * is copied shallowly (the same volume objects are referenced). The
+   * sorted queue starts out empty and a new set ID is generated.
+   *
+   * @param volumeSet - volume set to copy from
+   */
+  public DiskBalancerVolumeSet(DiskBalancerVolumeSet volumeSet) {
+    this.isTransient = volumeSet.isTransient();
+    this.storageType = volumeSet.storageType;
+    this.volumes = new HashSet<>(volumeSet.volumes);
+    sortedQueue = new TreeSet<>(new MinHeap());
+    setID = UUID.randomUUID().toString();
+  }
+
+  /**
+   * Tells us if this volumeSet is transient.
+   *
+   * @return - true or false
+   */
+  @JsonProperty("transient")
+  public boolean isTransient() {
+    return isTransient;
+  }
+
+  /**
+   * Set the transient properties for this volumeSet.
+   *
+   * @param transientValue - Boolean
+   */
+  @JsonProperty("transient")
+  public void setTransient(boolean transientValue) {
+    this.isTransient = transientValue;
+  }
+
+  /**
+   * Computes Volume Data Density. Adding a new volume changes
+   * the volumeDataDensity for all volumes. So we throw away
+   * our priority queue and recompute everything.
+   *
+   * we discard failed volumes from this computation. Volumes whose
+   * effective capacity is negative are logged, flagged as skipped and
+   * left out as well.
+   *
+   * totalCapacity = totalCapacity of this volumeSet
+   * totalUsed = totalDfsUsed for this volumeSet
+   * idealUsed = totalUsed / totalCapacity
+   * dfsUsedRatio = dfsUsedOnAVolume / Capacity On that Volume
+   * volumeDataDensity = idealUsed - dfsUsedRatio
+   */
+  public void computeVolumeDataDensity() {
+    long totalCapacity = 0;
+    long totalUsed = 0;
+    sortedQueue.clear();
+
+    // when we plan to re-distribute data we need to make
+    // sure that we skip failed volumes.
+    for (DiskBalancerVolume volume : volumes) {
+      if (!volume.isFailed() && !volume.isSkip()) {
+
+        if (volume.computeEffectiveCapacity() < 0) {
+          skipMisConfiguredVolume(volume);
+          continue;
+        }
+
+        totalCapacity += volume.computeEffectiveCapacity();
+        totalUsed += volume.getUsed();
+      }
+    }
+
+    // idealUsed is only refreshed when there is capacity to divide by.
+    if (totalCapacity != 0) {
+      this.idealUsed = totalUsed / (float) totalCapacity;
+    }
+
+    // Volumes flagged as skipped by the loop above are excluded here too.
+    for (DiskBalancerVolume volume : volumes) {
+      if (!volume.isFailed() && !volume.isSkip()) {
+        float dfsUsedRatio =
+            volume.getUsed() / (float) volume.computeEffectiveCapacity();
+        volume.setVolumeDataDensity(this.idealUsed - dfsUsedRatio);
+        sortedQueue.add(volume);
+      }
+    }
+  }
+
+  /**
+   * Logs the capacity figures of a volume whose effective capacity is
+   * negative and sets its skip flag.
+   *
+   * @param volume - volume to flag as skipped
+   */
+  private void skipMisConfiguredVolume(DiskBalancerVolume volume) {
+    //probably points to some sort of mis-configuration. Log this and skip
+    // processing this volume.
+    String errMessage = String.format("Real capacity is negative. " +
+                                          "This usually points to some " +
+                                          "kind of mis-configuration.%n" +
+                                          "Capacity : %d Reserved : %d " +
+                                          "realCap = capacity - " +
+                                          "reserved = %d.%n" +
+                                          "Skipping this volume from " +
+                                          "all processing. type : %s id" +
+                                          " :%s",
+                                      volume.getCapacity(),
+                                      volume.getReserved(),
+                                      volume.computeEffectiveCapacity(),
+                                      volume.getStorageType(),
+                                      volume.getUuid());
+
+    LOG.fatal(errMessage);
+    volume.setSkip(true);
+  }
+
+  /**
+   * Returns the number of volumes in the Volume Set.
+   *
+   * @return int
+   */
+  @JsonIgnore
+  public int getVolumeCount() {
+    return volumes.size();
+  }
+
+  /**
+   * Get Storage Type.
+   *
+   * @return String
+   */
+  public String getStorageType() {
+    return storageType;
+  }
+
+  /**
+   * Set Storage Type.
+   * @param typeOfStorage -- StorageType
+   */
+  public void setStorageType(String typeOfStorage) {
+    this.storageType = typeOfStorage;
+  }
+
+  /**
+   * adds a given volume into this volume set and recomputes the
+   * volumeDataDensity of all volumes.
+   *
+   * The first volume added to a set without a storage type defines the
+   * storage type of the set; later volumes must match it.
+   *
+   * @param volume - volume to add.
+   *
+   * @throws Exception
+   */
+  public void addVolume(DiskBalancerVolume volume) throws Exception {
+    Preconditions.checkNotNull(volume, "volume cannot be null");
+    Preconditions.checkState(isTransient() == volume.isTransient(),
+                             "Mismatch in volumeSet and volume's transient " +
+                                 "properties.");
+
+
+    if (this.storageType == null) {
+      Preconditions.checkState(volumes.isEmpty(), "Storage Type is Null but"
+          + " volume size is " + volumes.size());
+      this.storageType = volume.getStorageType();
+    } else {
+      Preconditions.checkState(this.storageType.equals(volume.getStorageType()),
+                               "Adding wrong type of disk to this volume set");
+    }
+    volumes.add(volume);
+    computeVolumeDataDensity();
+
+  }
+
+  /**
+   * Returns a list diskVolumes that are part of this volume set.
+   * The list is a new copy; changing it does not change this set.
+   *
+   * @return List
+   */
+  public List<DiskBalancerVolume> getVolumes() {
+    return new ArrayList<>(volumes);
+  }
+
+
+  /**
+   * Returns the queue of volumes ordered by descending volumeDataDensity.
+   * This is the internal queue itself, not a copy.
+   *
+   * @return TreeSet
+   */
+  @JsonIgnore
+  public TreeSet<DiskBalancerVolume> getSortedQueue() {
+    return sortedQueue;
+  }
+
+  /**
+   * Computes whether we need to do any balancing on this volume Set at all.
+   * It checks if any disks are out of threshold value
+   *
+   * @param thresholdPercentage - threshold - in percentage
+   *
+   * @return true if balancing is needed false otherwise.
+   */
+  public boolean isBalancingNeeded(float thresholdPercentage) {
+    // volumeDataDensity is a ratio, so bring the threshold to the same scale.
+    float threshold = thresholdPercentage / 100.0f;
+
+    if(volumes == null || volumes.size() <= 1) {
+      // there is nothing we can do with a single volume.
+      // so no planning needed.
+      return false;
+    }
+
+    for (DiskBalancerVolume vol : volumes) {
+      boolean notSkip = !vol.isFailed() && !vol.isTransient() && !vol.isSkip();
+      if ((Math.abs(vol.getVolumeDataDensity()) > threshold) && notSkip) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Remove a volume from the current set.
+   *
+   * This call does not recompute the volumeDataDensity. It has to be
+   * done manually after this call.
+   *
+   * @param volume - Volume to remove
+   */
+  public void removeVolume(DiskBalancerVolume volume) {
+    volumes.remove(volume);
+    sortedQueue.remove(volume);
+  }
+
+  /**
+   * Get Volume Set ID.
+   * @return String
+   */
+  public String getSetID() {
+    return setID;
+  }
+
+  /**
+   * Set VolumeSet ID.
+   * @param volID String
+   */
+  public void setSetID(String volID) {
+    this.setID = volID;
+  }
+
+  /**
+   * Gets the idealUsed for this volume set.
+   *
+   * @return float - value stored by the last computeVolumeDataDensity call
+   */
+
+  @JsonIgnore
+  public float getIdealUsed() {
+    return this.idealUsed;
+  }
+
+  /**
+   * Orders volumes by descending volumeDataDensity. Volumes with the same
+   * density are further ordered by UUID: a TreeSet treats elements that
+   * compare as 0 as duplicates, so without the tie-break a second volume
+   * with an equal density would be dropped from the sorted queue.
+   */
+  static class MinHeap implements Comparator<DiskBalancerVolume>, Serializable {
+
+    /**
+     * Compares its two arguments for order.  Returns a negative integer,
+     * zero, or a positive integer as the first argument is less than, equal
+     * to, or greater than the second.
+     */
+    @Override
+    public int compare(DiskBalancerVolume first, DiskBalancerVolume second) {
+      int byDensity = Float
+          .compare(second.getVolumeDataDensity(), first.getVolumeDataDensity());
+      if (byDensity != 0) {
+        return byDensity;
+      }
+      return compareUuid(first.getUuid(), second.getUuid());
+    }
+
+    /**
+     * Null-safe String comparison; a null UUID sorts before any other.
+     */
+    private static int compareUuid(String left, String right) {
+      if (left == null) {
+        return (right == null) ? 0 : -1;
+      }
+      if (right == null) {
+        return 1;
+      }
+      return left.compareTo(right);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java
new file mode 100644
index 0000000..f72e283
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+/**
+ * Disk Balancer Data Model is the Data Model for the cluster that
+ * Disk Balancer is working against. This information is read
+ * directly from NameNode or from a user supplied json model file.
+ *
+ * Here is the overview of the model maintained by diskBalancer.
+ *
+ * DiskBalancerCluster is a list of DiskBalancerDataNodes.
+ * DiskBalancerDataNodes is a collection of DiskBalancerVolumeSets
+ * DiskBalancerVolumeSets is a collection of DiskBalancerVolumes
+ * DiskBalancerVolumes represents actual volumes on DataNodes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java
new file mode 100644
index 0000000..4bec98f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+/**
+ * Disk Balancer connects to a {@link org.apache.hadoop.hdfs.server.datanode
+ * .DataNode} and attempts to spread data across all volumes evenly.
+ *
+ * This is achieved by :
+ *
+ * 1) Calculating the average data that should be on a set of volumes grouped
+ * by the type. For example, how much data should be on each volume of SSDs on a
+ * machine.
+ *
+ * 2) Once we know the average data that is expected to be on a volume we
+ * move data from volumes with higher than average load to volumes with
+ * less than average load.
+ *
+ * 3) Disk Balancer operates against data nodes which are live and operational.
+ *
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
new file mode 100644
index 0000000..5e3f4bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
+import org.apache.hadoop.util.Time;
+
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Helper class to create various cluster configrations at run time.
+ */
+public class DiskBalancerTestUtil {
+  // we modeling disks here, hence HDD style units
+  public static final long GB = 1000000000L;
+  public static final long TB = 1000000000000L;
+  private static int[] diskSizes =
+      {1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200, 300, 400, 500, 600, 700, 800, 900};
+  Random rand;
+  private String stringTable =
+      "ABCDEDFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0987654321";
+
+  /**
+   * Constructs a util class.
+   */
+  public DiskBalancerTestUtil() {
+    this.rand = new Random(Time.monotonicNow());
+  }
+
+  /**
+   * Returns a random string.
+   *
+   * @param length - Number of chars in the string
+   *
+   * @return random String
+   */
+  private String getRandomName(int length) {
+    StringBuilder name = new StringBuilder();
+    for (int x = 0; x < length; x++) {
+      name.append(stringTable.charAt(rand.nextInt(stringTable.length())));
+    }
+    return name.toString();
+  }
+
+  /**
+   * Returns a Random Storage Type.
+   *
+   * @return - StorageType
+   */
+  private StorageType getRandomStorageType() {
+    return StorageType.parseStorageType(rand.nextInt(3));
+  }
+
+  /**
+   * Returns random capacity, if the size is smaller than 10
+   * they are TBs otherwise the size is assigned to GB range.
+   *
+   * @return Long - Disk Size
+   */
+  private long getRandomCapacity() {
+    int size = diskSizes[rand.nextInt(diskSizes.length)];
+    if (size < 10) {
+      return size * TB;
+    } else {
+      return size * GB;
+    }
+  }
+
+  /**
+   * Some value under 20% in these tests.
+   */
+  private long getRandomReserved(long capacity) {
+    double rcap = capacity * 0.2d;
+    double randDouble = rand.nextDouble();
+    double temp = randDouble * rcap;
+    return (new Double(temp)).longValue();
+
+  }
+
+  /**
+   * Some value less that capacity - reserved.
+   */
+  private long getRandomDfsUsed(long capacity, long reserved) {
+    double rcap = capacity - reserved;
+    double randDouble = rand.nextDouble();
+    double temp = randDouble * rcap;
+    return (new Double(temp)).longValue();
+  }
+
+  /**
+   * Creates a Random Volume of a specific storageType.
+   *
+   * @return Volume
+   */
+  public DiskBalancerVolume createRandomVolume() {
+    return createRandomVolume(getRandomStorageType());
+  }
+
+  /**
+   * Creates a Random Volume for testing purpose.
+   *
+   * @param type - StorageType
+   *
+   * @return DiskBalancerVolume
+   */
+  public DiskBalancerVolume createRandomVolume(StorageType type) {
+    DiskBalancerVolume volume = new DiskBalancerVolume();
+    volume.setPath("/tmp/disk/" + getRandomName(10));
+    volume.setStorageType(type.toString());
+    volume.setTransient(type.isTransient());
+
+    volume.setCapacity(getRandomCapacity());
+    volume.setReserved(getRandomReserved(volume.getCapacity()));
+    volume
+        .setUsed(getRandomDfsUsed(volume.getCapacity(), volume.getReserved()));
+    volume.setUuid(UUID.randomUUID().toString());
+    return volume;
+  }
+
+  /**
+   * Creates a RandomVolumeSet.
+   *
+   * @param type -Storage Type
+   * @param diskCount - How many disks you need.
+   *
+   * @return volumeSet
+   *
+   * @throws Exception
+   */
+  public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
+                                                     int diskCount)
+      throws Exception {
+
+    Preconditions.checkState(diskCount > 0);
+    DiskBalancerVolumeSet volumeSet =
+        new DiskBalancerVolumeSet(type.isTransient());
+    for (int x = 0; x < diskCount; x++) {
+      volumeSet.addVolume(createRandomVolume(type));
+    }
+    assert (volumeSet.getVolumeCount() == diskCount);
+    return volumeSet;
+  }
+
+  /**
+   * Creates a RandomDataNode.
+   *
+   * @param diskTypes - Storage types needed in the Node
+   * @param diskCount - Disk count - that many disks of each type is created
+   *
+   * @return DataNode
+   *
+   * @throws Exception
+   */
+  public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
+                                                   int diskCount)
+      throws Exception {
+    Preconditions.checkState(diskTypes.length > 0);
+    Preconditions.checkState(diskCount > 0);
+
+    DiskBalancerDataNode node =
+        new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+    for (StorageType t : diskTypes) {
+      DiskBalancerVolumeSet vSet = createRandomVolumeSet(t, diskCount);
+      for (DiskBalancerVolume v : vSet.getVolumes()) {
+        node.addVolume(v);
+      }
+    }
+    return node;
+  }
+
+  /**
+   * Creates a RandomCluster.
+   *
+   * @param dataNodeCount - How many nodes you need
+   * @param diskTypes - StorageTypes you need in each node
+   * @param diskCount - How many disks you need of each type.
+   *
+   * @return Cluster
+   *
+   * @throws Exception
+   */
+  public DiskBalancerCluster createRandCluster(int dataNodeCount,
+                                               StorageType[] diskTypes,
+                                               int diskCount)
+
+      throws Exception {
+    Preconditions.checkState(diskTypes.length > 0);
+    Preconditions.checkState(diskCount > 0);
+    Preconditions.checkState(dataNodeCount > 0);
+    NullConnector nullConnector = new NullConnector();
+    DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
+
+    // once we add these nodes into the connector, cluster will read them
+    // from the connector.
+    for (int x = 0; x < dataNodeCount; x++) {
+      nullConnector.addNode(createRandomDataNode(diskTypes, diskCount));
+    }
+
+    // with this call we have populated the cluster info
+    cluster.readClusterInfo();
+    return cluster;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java
new file mode 100644
index 0000000..3507c96
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.UUID;
+
+/**
+ * Tests for the disk balancer data model classes, driven by randomly
+ * generated volumes, volume sets, data nodes and clusters.
+ */
+public class TestDataModels {
+  /**
+   * A random volume must carry its identifying fields and consistent
+   * capacity, reserved and used figures.
+   */
+  @Test
+  public void TestCreateRandomVolume() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerVolume vol = util.createRandomVolume(StorageType.DISK);
+    Assert.assertNotNull(vol.getUuid());
+    Assert.assertNotNull(vol.getPath());
+    Assert.assertNotNull(vol.getStorageType());
+    Assert.assertFalse(vol.isFailed());
+    Assert.assertFalse(vol.isTransient());
+    Assert.assertTrue(vol.getCapacity() > 0);
+    Assert.assertTrue((vol.getCapacity() - vol.getReserved()) > 0);
+    Assert.assertTrue((vol.getReserved() + vol.getUsed()) < vol.getCapacity());
+  }
+
+  /**
+   * A random volume set must hold the requested number of volumes of the
+   * requested storage type.
+   */
+  @Test
+  public void TestCreateRandomVolumeSet() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerVolumeSet vSet =
+        util.createRandomVolumeSet(StorageType.SSD, 10);
+    Assert.assertEquals(10, vSet.getVolumeCount());
+    Assert.assertEquals(StorageType.SSD.toString(),
+        vSet.getVolumes().get(0).getStorageType());
+
+  }
+
+  /**
+   * A random data node must report a node data density.
+   */
+  @Test
+  public void TestCreateRandomDataNode() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerDataNode node = util.createRandomDataNode(
+        new StorageType[]{StorageType.DISK, StorageType.RAM_DISK}, 10);
+    Assert.assertNotNull(node.getNodeDataDensity());
+  }
+
+  /**
+   * The sorted queue of a volume set must yield the same volumes whether
+   * it is walked from the front or, reversed, from the back, and the
+   * volumes must come out in non-increasing volumeDataDensity order.
+   */
+  @Test
+  public void TestDiskQueues() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerDataNode node = util.createRandomDataNode(
+        new StorageType[]{StorageType.DISK, StorageType.RAM_DISK}, 3);
+    TreeSet<DiskBalancerVolume> sortedQueue =
+        node.getVolumeSets().get(StorageType.DISK.toString()).getSortedQueue();
+
+    // Collect every element of the queue once per direction. Reading
+    // sortedQueue.first() repeatedly would only ever see one volume.
+    List<DiskBalancerVolume> reverseList =
+        new LinkedList<>(sortedQueue.descendingSet());
+    List<DiskBalancerVolume> highList = new LinkedList<>(sortedQueue);
+    int queueSize = sortedQueue.size();
+    Collections.reverse(reverseList);
+
+    double previousDensity = Double.POSITIVE_INFINITY;
+    for (int x = 0; x < queueSize; x++) {
+
+      Assert.assertEquals(reverseList.get(x).getCapacity(),
+          highList.get(x).getCapacity());
+      Assert.assertEquals(reverseList.get(x).getReserved(),
+          highList.get(x).getReserved());
+      Assert.assertEquals(reverseList.get(x).getUsed(),
+          highList.get(x).getUsed());
+
+      // the queue is ordered with the highest density first.
+      double density = highList.get(x).getVolumeDataDensity();
+      Assert.assertTrue(density <= previousDensity);
+      previousDensity = density;
+    }
+  }
+
+  /**
+   * Two volumes with identical figures do not need balancing.
+   */
+  @Test
+  public void TestNoBalancingNeededEvenDataSpread() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerDataNode node =
+        new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+    // create two disks which have exactly same data and isBalancing should
+    // say we don't need to balance.
+    DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
+    v1.setCapacity(DiskBalancerTestUtil.TB);
+    v1.setReserved(100 * DiskBalancerTestUtil.GB);
+    v1.setUsed(500 * DiskBalancerTestUtil.GB);
+
+    DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
+    v2.setCapacity(DiskBalancerTestUtil.TB);
+    v2.setReserved(100 * DiskBalancerTestUtil.GB);
+    v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+    node.addVolume(v1);
+    node.addVolume(v2);
+
+    for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+      Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
+    }
+  }
+
+  /**
+   * Unevenly filled transient volumes do not need balancing.
+   */
+  @Test
+  public void TestNoBalancingNeededTransientDisks() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerDataNode node =
+        new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+    // create two disks which have different data sizes, but
+    // transient. isBalancing should say no balancing needed.
+    DiskBalancerVolume v1 = util.createRandomVolume(StorageType.RAM_DISK);
+    v1.setCapacity(DiskBalancerTestUtil.TB);
+    v1.setReserved(100 * DiskBalancerTestUtil.GB);
+    v1.setUsed(1 * DiskBalancerTestUtil.GB);
+
+    DiskBalancerVolume v2 = util.createRandomVolume(StorageType.RAM_DISK);
+    v2.setCapacity(DiskBalancerTestUtil.TB);
+    v2.setReserved(100 * DiskBalancerTestUtil.GB);
+    v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+    node.addVolume(v1);
+    node.addVolume(v2);
+
+    for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+      Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
+    }
+  }
+
+  /**
+   * A set with one failed and one healthy volume does not need balancing.
+   */
+  @Test
+  public void TestNoBalancingNeededFailedDisks() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerDataNode node =
+        new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+    // create two disks which are normal disks, but fail
+    // one of them. VolumeSet should say no balancing needed.
+    DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
+    v1.setCapacity(DiskBalancerTestUtil.TB);
+    v1.setReserved(100 * DiskBalancerTestUtil.GB);
+    v1.setUsed(1 * DiskBalancerTestUtil.GB);
+    v1.setFailed(true);
+
+    DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
+    v2.setCapacity(DiskBalancerTestUtil.TB);
+    v2.setReserved(100 * DiskBalancerTestUtil.GB);
+    v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+    node.addVolume(v1);
+    node.addVolume(v2);
+
+    for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+      Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
+    }
+  }
+
+  /**
+   * An empty volume next to a half full one needs balancing.
+   */
+  @Test
+  public void TestNeedBalancingUnevenDataSpread() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerDataNode node =
+        new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+    DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
+    v1.setCapacity(DiskBalancerTestUtil.TB);
+    v1.setReserved(100 * DiskBalancerTestUtil.GB);
+    v1.setUsed(0);
+
+    DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
+    v2.setCapacity(DiskBalancerTestUtil.TB);
+    v2.setReserved(100 * DiskBalancerTestUtil.GB);
+    v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+    node.addVolume(v1);
+    node.addVolume(v2);
+
+    for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+      Assert.assertTrue(vsets.isBalancingNeeded(10.0f));
+    }
+  }
+
+  /**
+   * A volume must survive a JSON round trip unchanged.
+   */
+  @Test
+  public void TestVolumeSerialize() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+    DiskBalancerVolume volume = util.createRandomVolume(StorageType.DISK);
+    String originalString = volume.toJson();
+    DiskBalancerVolume parsedVolume =
+        DiskBalancerVolume.parseJson(originalString);
+    String parsedString = parsedVolume.toJson();
+    Assert.assertEquals(originalString, parsedString);
+  }
+
+  /**
+   * A cluster must survive a JSON round trip with the same nodes.
+   */
+  @Test
+  public void TestClusterSerialize() throws Exception {
+    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+
+    // Create a Cluster with 3 datanodes, 3 disk types and 3 disks in each type
+    // that is 9 disks in each machine.
+    DiskBalancerCluster cluster = util.createRandCluster(3, new StorageType[]{
+        StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD}, 3);
+
+    DiskBalancerCluster newCluster =
+        DiskBalancerCluster.parseJson(cluster.toJson());
+    Assert.assertEquals(cluster.getNodes(), newCluster.getNodes());
+    Assert
+        .assertEquals(cluster.getNodes().size(), newCluster.getNodes().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00537b8f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java
new file mode 100644
index 0000000..3f78530
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A connector that keeps its data nodes purely in memory. Nodes are
+ * supplied through {@link #addNode}, which makes it handy for tests.
+ */
+public class NullConnector implements ClusterConnector {
+  private final List<DiskBalancerDataNode> clusterNodes = new LinkedList<>();
+
+  /**
+   * Returns the nodes that were added to this connector so far.
+   * The internal list itself is returned, not a copy.
+   *
+   * @return List of DiskBalancerDataNodes
+   */
+  @Override
+  public List<DiskBalancerDataNode> getNodes() throws Exception {
+    return this.clusterNodes;
+  }
+
+  /**
+   * Describes this connector.
+   *
+   * @return String - a fixed description text.
+   */
+  @Override
+  public String getConnectorInfo() {
+    return "Null Connector : No persistence, in-memory connector";
+  }
+
+  /**
+   * Appends a node to the in-memory list of this connector.
+   *
+   * @param node - Node to add
+   */
+  public void addNode(DiskBalancerDataNode node) {
+    this.clusterNodes.add(node);
+  }
+}


Mime
View raw message