hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1555021 [3/15] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache...
Date Fri, 03 Jan 2014 07:27:01 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java Fri Jan  3 07:26:52 2014
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hdfs.protocol;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import javax.annotation.Nullable;
@@ -32,14 +30,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.util.XMLUtils;
-import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
-import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
-import org.apache.hadoop.io.Text;
-import org.xml.sax.ContentHandler;
-import org.xml.sax.SAXException;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
 
 /**
  * CachePoolInfo describes a cache pool.
@@ -52,6 +43,20 @@ import org.xml.sax.SAXException;
 public class CachePoolInfo {
   public static final Log LOG = LogFactory.getLog(CachePoolInfo.class);
 
+  /**
+   * Indicates that the pool does not have a maximum relative expiry.
+   */
+  public static final long RELATIVE_EXPIRY_NEVER =
+      Expiration.MAX_RELATIVE_EXPIRY_MS;
+  /**
+   * Default max relative expiry for cache pools.
+   */
+  public static final long DEFAULT_MAX_RELATIVE_EXPIRY =
+      RELATIVE_EXPIRY_NEVER;
+
+  public static final long LIMIT_UNLIMITED = Long.MAX_VALUE;
+  public static final long DEFAULT_LIMIT = LIMIT_UNLIMITED;
+
   final String poolName;
 
   @Nullable
@@ -64,16 +69,26 @@ public class CachePoolInfo {
   FsPermission mode;
 
   @Nullable
-  Integer weight;
+  Long limit;
+
+  @Nullable
+  Long maxRelativeExpiryMs;
 
   public CachePoolInfo(String poolName) {
     this.poolName = poolName;
   }
-  
+
+  /**
+   * @return Name of the pool.
+   */
   public String getPoolName() {
     return poolName;
   }
 
+  /**
+   * @return The owner of the pool. Along with the group and mode, determines
+   *         who has access to view and modify the pool.
+   */
   public String getOwnerName() {
     return ownerName;
   }
@@ -83,6 +98,10 @@ public class CachePoolInfo {
     return this;
   }
 
+  /**
+   * @return The group of the pool. Along with the owner and mode, determines
+   *         who has access to view and modify the pool.
+   */
   public String getGroupName() {
     return groupName;
   }
@@ -91,7 +110,11 @@ public class CachePoolInfo {
     this.groupName = groupName;
     return this;
   }
-  
+
+  /**
+   * @return Unix-style permissions of the pool. Along with the owner and group,
+   *         determines who has access to view and modify the pool.
+   */
   public FsPermission getMode() {
     return mode;
   }
@@ -101,12 +124,36 @@ public class CachePoolInfo {
     return this;
   }
 
-  public Integer getWeight() {
-    return weight;
+  /**
+   * @return The maximum aggregate number of bytes that can be cached by
+   *         directives in this pool.
+   */
+  public Long getLimit() {
+    return limit;
+  }
+
+  public CachePoolInfo setLimit(Long bytes) {
+    this.limit = bytes;
+    return this;
   }
 
-  public CachePoolInfo setWeight(Integer weight) {
-    this.weight = weight;
+  /**
+   * @return The maximum relative expiration of directives of this pool in
+   *         milliseconds
+   */
+  public Long getMaxRelativeExpiryMs() {
+    return maxRelativeExpiryMs;
+  }
+
+  /**
+   * Set the maximum relative expiration of directives of this pool in
+   * milliseconds.
+   * 
+   * @param ms in milliseconds
+   * @return This builder, for call chaining.
+   */
+  public CachePoolInfo setMaxRelativeExpiryMs(Long ms) {
+    this.maxRelativeExpiryMs = ms;
     return this;
   }
 
@@ -117,7 +164,8 @@ public class CachePoolInfo {
       append(", groupName:").append(groupName).
       append(", mode:").append((mode == null) ? "null" :
           String.format("0%03o", mode.toShort())).
-      append(", weight:").append(weight).
+      append(", limit:").append(limit).
+      append(", maxRelativeExpiryMs:").append(maxRelativeExpiryMs).
       append("}").toString();
   }
   
@@ -134,7 +182,8 @@ public class CachePoolInfo {
         append(ownerName, other.ownerName).
         append(groupName, other.groupName).
         append(mode, other.mode).
-        append(weight, other.weight).
+        append(limit, other.limit).
+        append(maxRelativeExpiryMs, other.maxRelativeExpiryMs).
         isEquals();
   }
 
@@ -145,7 +194,8 @@ public class CachePoolInfo {
         append(ownerName).
         append(groupName).
         append(mode).
-        append(weight).
+        append(limit).
+        append(maxRelativeExpiryMs).
         hashCode();
   }
 
@@ -153,8 +203,17 @@ public class CachePoolInfo {
     if (info == null) {
       throw new InvalidRequestException("CachePoolInfo is null");
     }
-    if ((info.getWeight() != null) && (info.getWeight() < 0)) {
-      throw new InvalidRequestException("CachePool weight is negative.");
+    if ((info.getLimit() != null) && (info.getLimit() < 0)) {
+      throw new InvalidRequestException("Limit is negative.");
+    }
+    if (info.getMaxRelativeExpiryMs() != null) {
+      long maxRelativeExpiryMs = info.getMaxRelativeExpiryMs();
+      if (maxRelativeExpiryMs < 0l) {
+        throw new InvalidRequestException("Max relative expiry is negative.");
+      }
+      if (maxRelativeExpiryMs > Expiration.MAX_RELATIVE_EXPIRY_MS) {
+        throw new InvalidRequestException("Max relative expiry is too big.");
+      }
     }
     validateName(info.poolName);
   }
@@ -167,66 +226,4 @@ public class CachePoolInfo {
       throw new IOException("invalid empty cache pool name");
     }
   }
-
-  public static CachePoolInfo readFrom(DataInput in) throws IOException {
-    String poolName = Text.readString(in);
-    CachePoolInfo info = new CachePoolInfo(poolName);
-    if (in.readBoolean()) {
-      info.setOwnerName(Text.readString(in));
-    }
-    if (in.readBoolean())  {
-      info.setGroupName(Text.readString(in));
-    }
-    if (in.readBoolean()) {
-      info.setMode(FsPermission.read(in));
-    }
-    if (in.readBoolean()) {
-      info.setWeight(in.readInt());
-    }
-    return info;
-  }
-
-  public void writeTo(DataOutput out) throws IOException {
-    Text.writeString(out, poolName);
-    boolean hasOwner, hasGroup, hasMode, hasWeight;
-    hasOwner = ownerName != null;
-    hasGroup = groupName != null;
-    hasMode = mode != null;
-    hasWeight = weight != null;
-    out.writeBoolean(hasOwner);
-    if (hasOwner) {
-      Text.writeString(out, ownerName);
-    }
-    out.writeBoolean(hasGroup);
-    if (hasGroup) {
-      Text.writeString(out, groupName);
-    }
-    out.writeBoolean(hasMode);
-    if (hasMode) {
-      mode.write(out);
-    }
-    out.writeBoolean(hasWeight);
-    if (hasWeight) {
-      out.writeInt(weight);
-    }
-  }
-
-  public void writeXmlTo(ContentHandler contentHandler) throws SAXException {
-    XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
-    PermissionStatus perm = new PermissionStatus(ownerName,
-        groupName, mode);
-    FSEditLogOp.permissionStatusToXml(contentHandler, perm);
-    XMLUtils.addSaxString(contentHandler, "WEIGHT", Integer.toString(weight));
-  }
-
-  public static CachePoolInfo readXmlFrom(Stanza st) throws InvalidXmlException {
-    String poolName = st.getValue("POOLNAME");
-    PermissionStatus perm = FSEditLogOp.permissionStatusFromXml(st);
-    int weight = Integer.parseInt(st.getValue("WEIGHT"));
-    return new CachePoolInfo(poolName).
-        setOwnerName(perm.getUserName()).
-        setGroupName(perm.getGroupName()).
-        setMode(perm.getPermission()).
-        setWeight(weight);
-  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java Fri Jan  3 07:26:52 2014
@@ -30,6 +30,7 @@ public class CachePoolStats {
   public static class Builder {
     private long bytesNeeded;
     private long bytesCached;
+    private long bytesOverlimit;
     private long filesNeeded;
     private long filesCached;
 
@@ -46,6 +47,11 @@ public class CachePoolStats {
       return this;
     }
 
+    public Builder setBytesOverlimit(long bytesOverlimit) {
+      this.bytesOverlimit = bytesOverlimit;
+      return this;
+    }
+
     public Builder setFilesNeeded(long filesNeeded) {
       this.filesNeeded = filesNeeded;
       return this;
@@ -57,20 +63,22 @@ public class CachePoolStats {
     }
 
     public CachePoolStats build() {
-      return new CachePoolStats(bytesNeeded, bytesCached, filesNeeded,
-          filesCached);
+      return new CachePoolStats(bytesNeeded, bytesCached, bytesOverlimit,
+          filesNeeded, filesCached);
     }
   };
 
   private final long bytesNeeded;
   private final long bytesCached;
+  private final long bytesOverlimit;
   private final long filesNeeded;
   private final long filesCached;
 
-  private CachePoolStats(long bytesNeeded, long bytesCached, long filesNeeded,
-      long filesCached) {
+  private CachePoolStats(long bytesNeeded, long bytesCached,
+      long bytesOverlimit, long filesNeeded, long filesCached) {
     this.bytesNeeded = bytesNeeded;
     this.bytesCached = bytesCached;
+    this.bytesOverlimit = bytesOverlimit;
     this.filesNeeded = filesNeeded;
     this.filesCached = filesCached;
   }
@@ -83,6 +91,10 @@ public class CachePoolStats {
     return bytesCached;
   }
 
+  public long getBytesOverlimit() {
+    return bytesOverlimit;
+  }
+
   public long getFilesNeeded() {
     return filesNeeded;
   }
@@ -95,6 +107,7 @@ public class CachePoolStats {
     return new StringBuilder().append("{").
       append("bytesNeeded:").append(bytesNeeded).
       append(", bytesCached:").append(bytesCached).
+      append(", bytesOverlimit:").append(bytesOverlimit).
       append(", filesNeeded:").append(filesNeeded).
       append(", filesCached:").append(filesCached).
       append("}").toString();

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Jan  3 07:26:52 2014
@@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.protocol;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -354,7 +356,8 @@ public interface ClientProtocol {
    */
   @Idempotent
   public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final DatanodeInfo[] existings, final String[] existingStorageIDs,
+      final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName
       ) throws AccessControlException, FileNotFoundException,
           SafeModeException, UnresolvedLinkException, IOException;
@@ -983,7 +986,7 @@ public interface ClientProtocol {
    */
   @AtMostOnce
   public void updatePipeline(String clientName, ExtendedBlock oldBlock, 
-      ExtendedBlock newBlock, DatanodeID[] newNodes)
+      ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
       throws IOException;
 
   /**
@@ -1099,23 +1102,24 @@ public interface ClientProtocol {
    * Add a CacheDirective to the CacheManager.
    * 
    * @param directive A CacheDirectiveInfo to be added
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @return A CacheDirectiveInfo associated with the added directive
    * @throws IOException if the directive could not be added
    */
   @AtMostOnce
-  public long addCacheDirective(
-      CacheDirectiveInfo directive) throws IOException;
+  public long addCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException;
 
   /**
    * Modify a CacheDirective in the CacheManager.
    * 
-   * @return directive The directive to modify.  Must contain 
-   *                   a directive ID.
+   * @return directive The directive to modify. Must contain a directive ID.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @throws IOException if the directive could not be modified
    */
   @AtMostOnce
-  public void modifyCacheDirective(
-      CacheDirectiveInfo directive) throws IOException;
+  public void modifyCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException;
 
   /**
    * Remove a CacheDirectiveInfo from the CacheManager.

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Jan  3 07:26:52 2014
@@ -21,6 +21,8 @@ package org.apache.hadoop.hdfs.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class represents the primary identifier for a Datanode.
  * Datanodes are identified by how they can be contacted (hostname
@@ -40,37 +42,46 @@ public class DatanodeID implements Compa
   private String ipAddr;     // IP address
   private String hostName;   // hostname claimed by datanode
   private String peerHostName; // hostname from the actual connection
-  private String storageID;  // unique per cluster storageID
   private int xferPort;      // data streaming port
   private int infoPort;      // info server port
   private int infoSecurePort; // info server port
   private int ipcPort;       // IPC server port
 
+  /**
+   * UUID identifying a given datanode. For upgraded Datanodes this is the
+   * same as the StorageID that was previously used by this Datanode. 
+   * For newly formatted Datanodes it is a UUID.
+   */
+  private String datanodeUuid = null;
+
   public DatanodeID(DatanodeID from) {
     this(from.getIpAddr(),
         from.getHostName(),
-        from.getStorageID(),
+        from.getDatanodeUuid(),
         from.getXferPort(),
         from.getInfoPort(),
         from.getInfoSecurePort(),
         from.getIpcPort());
     this.peerHostName = from.getPeerHostName();
   }
-  
+
   /**
    * Create a DatanodeID
    * @param ipAddr IP
    * @param hostName hostname
-   * @param storageID data storage ID
+   * @param datanodeUuid data node ID, UUID for new Datanodes, may be the
+   *                     storage ID for pre-UUID datanodes. NULL if unknown
+   *                     e.g. if this is a new datanode. A new UUID will
+   *                     be assigned by the namenode.
    * @param xferPort data transfer port
    * @param infoPort info server port 
    * @param ipcPort ipc server port
    */
-  public DatanodeID(String ipAddr, String hostName, String storageID,
+  public DatanodeID(String ipAddr, String hostName, String datanodeUuid,
       int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
     this.ipAddr = ipAddr;
     this.hostName = hostName;
-    this.storageID = storageID;
+    this.datanodeUuid = checkDatanodeUuid(datanodeUuid);
     this.xferPort = xferPort;
     this.infoPort = infoPort;
     this.infoSecurePort = infoSecurePort;
@@ -85,8 +96,24 @@ public class DatanodeID implements Compa
     this.peerHostName = peerHostName;
   }
   
-  public void setStorageID(String storageID) {
-    this.storageID = storageID;
+  /**
+   * @return data node ID.
+   */
+  public String getDatanodeUuid() {
+    return datanodeUuid;
+  }
+
+  @VisibleForTesting
+  public void setDatanodeUuidForTesting(String datanodeUuid) {
+    this.datanodeUuid = datanodeUuid;
+  }
+
+  private String checkDatanodeUuid(String uuid) {
+    if (uuid == null || uuid.isEmpty()) {
+      return null;
+    } else {
+      return uuid;
+    }
   }
 
   /**
@@ -169,13 +196,6 @@ public class DatanodeID implements Compa
   }
 
   /**
-   * @return data storage ID.
-   */
-  public String getStorageID() {
-    return storageID;
-  }
-
-  /**
    * @return xferPort (the port for data streaming)
    */
   public int getXferPort() {
@@ -212,12 +232,12 @@ public class DatanodeID implements Compa
       return false;
     }
     return (getXferAddr().equals(((DatanodeID)to).getXferAddr()) &&
-            storageID.equals(((DatanodeID)to).getStorageID()));
+        datanodeUuid.equals(((DatanodeID)to).getDatanodeUuid()));
   }
   
   @Override
   public int hashCode() {
-    return getXferAddr().hashCode()^ storageID.hashCode();
+    return getXferAddr().hashCode()^ datanodeUuid.hashCode();
   }
   
   @Override

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Jan  3 07:26:52 2014
@@ -115,7 +115,7 @@ public class DatanodeInfo extends Datano
       final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
       final long lastUpdate, final int xceiverCount,
       final AdminStates adminState) {
-    this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(),
+    this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getDatanodeUuid(),
         nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
         nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
         cacheCapacity, cacheUsed, lastUpdate, xceiverCount, location,
@@ -124,13 +124,13 @@ public class DatanodeInfo extends Datano
 
   /** Constructor */
   public DatanodeInfo(final String ipAddr, final String hostName,
-      final String storageID, final int xferPort, final int infoPort,
+      final String datanodeUuid, final int xferPort, final int infoPort,
       final int infoSecurePort, final int ipcPort,
       final long capacity, final long dfsUsed, final long remaining,
       final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
       final long lastUpdate, final int xceiverCount,
       final String networkLocation, final AdminStates adminState) {
-    super(ipAddr, hostName, storageID, xferPort, infoPort,
+    super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
             infoSecurePort, ipcPort);
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Fri Jan  3 07:26:52 2014
@@ -107,7 +107,10 @@ public class LayoutVersion {
         "block IDs in the edits log and image files"),
     EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to " 
         + "enable rebuilding retry cache in case of HA failover"),
-    CACHING(-48, "Support for cache pools and path-based caching");
+    CACHING(-48, "Support for cache pools and path-based caching"),
+    ADD_DATANODE_AND_STORAGE_UUIDS(-49, "Replace StorageID with DatanodeUuid."
+        + " Use distinct StorageUuid per storage directory.");
+
     
     final int lv;
     final int ancestorLV;
@@ -248,3 +251,4 @@ public class LayoutVersion {
     throw new AssertionError("All layout versions are reserved.");
   }
 }
+

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Jan  3 07:26:52 2014
@@ -21,7 +21,9 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
@@ -40,6 +42,10 @@ public class LocatedBlock {
   private ExtendedBlock b;
   private long offset;  // offset of the first byte of the block in the file
   private DatanodeInfo[] locs;
+  /** Storage ID for each replica */
+  private String[] storageIDs;
+  // Storage type for each replica, if reported.
+  private StorageType[] storageTypes;
   // corrupt flag is true if all of the replicas of a block are corrupt.
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
@@ -54,20 +60,34 @@ public class LocatedBlock {
   private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
-    this(b, locs, -1); // startOffset is unknown
-  }
-
-  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
-    this(b, locs, startOffset, false);
+    this(b, locs, -1, false); // startOffset is unknown
   }
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset, 
                       boolean corrupt) {
-    this(b, locs, startOffset, corrupt, EMPTY_LOCS);
+    this(b, locs, null, null, startOffset, corrupt, EMPTY_LOCS);
+  }
+
+  public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages) {
+    this(b, storages, -1, false); // startOffset is unknown
   }
 
-  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
-      boolean corrupt, DatanodeInfo[] cachedLocs) {
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
+                      String[] storageIDs, StorageType[] storageTypes) {
+    this(b, locs, storageIDs, storageTypes, -1, false, EMPTY_LOCS);
+  }
+
+  public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
+      long startOffset, boolean corrupt) {
+    this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
+        DatanodeStorageInfo.toStorageIDs(storages),
+        DatanodeStorageInfo.toStorageTypes(storages),
+        startOffset, corrupt, EMPTY_LOCS); // startOffset is unknown
+  }
+
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
+                      StorageType[] storageTypes, long startOffset,
+                      boolean corrupt, DatanodeInfo[] cachedLocs) {
     this.b = b;
     this.offset = startOffset;
     this.corrupt = corrupt;
@@ -76,6 +96,8 @@ public class LocatedBlock {
     } else {
       this.locs = locs;
     }
+    this.storageIDs = storageIDs;
+    this.storageTypes = storageTypes;
     Preconditions.checkArgument(cachedLocs != null,
         "cachedLocs should not be null, use a different constructor");
     if (cachedLocs.length == 0) {
@@ -100,7 +122,15 @@ public class LocatedBlock {
   public DatanodeInfo[] getLocations() {
     return locs;
   }
+
+  public StorageType[] getStorageTypes() {
+    return storageTypes;
+  }
   
+  public String[] getStorageIDs() {
+    return storageIDs;
+  }
+
   public long getStartOffset() {
     return offset;
   }
@@ -161,3 +191,4 @@ public class LocatedBlock {
         + "}";
   }
 }
+

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java Fri Jan  3 07:26:52 2014
@@ -51,7 +51,7 @@ public class UnregisteredNodeException e
    */
   public UnregisteredNodeException(DatanodeID nodeID, DatanodeInfo storedNode) {
     super("Data node " + nodeID + " is attempting to report storage ID " 
-          + nodeID.getStorageID() + ". Node " 
+          + nodeID.getDatanodeUuid() + ". Node "
           + storedNode + " is expected to serve this storage.");
   }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Jan  3 07:26:52 2014
@@ -320,7 +320,7 @@ public class ClientNamenodeProtocolServe
     try {
       HdfsFileStatus result = server.create(req.getSrc(),
           PBHelper.convert(req.getMasked()), req.getClientName(),
-          PBHelper.convert(req.getCreateFlag()), req.getCreateParent(),
+          PBHelper.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
           (short) req.getReplication(), req.getBlockSize());
 
       if (result != null) {
@@ -425,14 +425,17 @@ public class ClientNamenodeProtocolServe
       throws ServiceException {
     try {
       List<DatanodeInfoProto> existingList = req.getExistingsList();
+      List<String> existingStorageIDsList = req.getExistingStorageUuidsList();
       List<DatanodeInfoProto> excludesList = req.getExcludesList();
-      LocatedBlock result = server.getAdditionalDatanode(
-          req.getSrc(), PBHelper.convert(req.getBlk()),
+      LocatedBlock result = server.getAdditionalDatanode(req.getSrc(),
+          PBHelper.convert(req.getBlk()),
           PBHelper.convert(existingList.toArray(
               new DatanodeInfoProto[existingList.size()])),
+          existingStorageIDsList.toArray(
+              new String[existingStorageIDsList.size()]),
           PBHelper.convert(excludesList.toArray(
               new DatanodeInfoProto[excludesList.size()])), 
-              req.getNumAdditionalNodes(), req.getClientName());
+          req.getNumAdditionalNodes(), req.getClientName());
       return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
           PBHelper.convert(result))
           .build();
@@ -833,10 +836,12 @@ public class ClientNamenodeProtocolServe
       UpdatePipelineRequestProto req) throws ServiceException {
     try {
       List<DatanodeIDProto> newNodes = req.getNewNodesList();
-      server
-          .updatePipeline(req.getClientName(), PBHelper.convert(req
-              .getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper
-              .convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])));
+      List<String> newStorageIDs = req.getStorageIDsList();
+      server.updatePipeline(req.getClientName(),
+          PBHelper.convert(req.getOldBlock()),
+          PBHelper.convert(req.getNewBlock()),
+          PBHelper.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])),
+          newStorageIDs.toArray(new String[newStorageIDs.size()]));
       return VOID_UPDATEPIPELINE_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -1029,9 +1034,11 @@ public class ClientNamenodeProtocolServe
       RpcController controller, AddCacheDirectiveRequestProto request)
       throws ServiceException {
     try {
+      long id = server.addCacheDirective(
+          PBHelper.convert(request.getInfo()),
+          PBHelper.convertCacheFlags(request.getCacheFlags()));
       return AddCacheDirectiveResponseProto.newBuilder().
-              setId(server.addCacheDirective(
-                  PBHelper.convert(request.getInfo()))).build();
+              setId(id).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -1043,7 +1050,8 @@ public class ClientNamenodeProtocolServe
       throws ServiceException {
     try {
       server.modifyCacheDirective(
-          PBHelper.convert(request.getInfo()));
+          PBHelper.convert(request.getInfo()),
+          PBHelper.convertCacheFlags(request.getCacheFlags()));
       return ModifyCacheDirectiveResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Jan  3 07:26:52 2014
@@ -21,10 +21,12 @@ import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -351,7 +353,8 @@ public class ClientNamenodeProtocolTrans
 
   @Override
   public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk,
-      DatanodeInfo[] existings, DatanodeInfo[] excludes,
+      DatanodeInfo[] existings, String[] existingStorageIDs,
+      DatanodeInfo[] excludes,
       int numAdditionalNodes, String clientName) throws AccessControlException,
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -360,6 +363,7 @@ public class ClientNamenodeProtocolTrans
         .setSrc(src)
         .setBlk(PBHelper.convert(blk))
         .addAllExistings(PBHelper.convert(existings))
+        .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs))
         .addAllExcludes(PBHelper.convert(excludes))
         .setNumAdditionalNodes(numAdditionalNodes)
         .setClientName(clientName)
@@ -796,12 +800,13 @@ public class ClientNamenodeProtocolTrans
 
   @Override
   public void updatePipeline(String clientName, ExtendedBlock oldBlock,
-      ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException {
+      ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException {
     UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder()
         .setClientName(clientName)
         .setOldBlock(PBHelper.convert(oldBlock))
         .setNewBlock(PBHelper.convert(newBlock))
         .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes)))
+        .addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs))
         .build();
     try {
       rpcProxy.updatePipeline(null, req);
@@ -1000,24 +1005,32 @@ public class ClientNamenodeProtocolTrans
   }
 
   @Override
-  public long addCacheDirective(
-      CacheDirectiveInfo directive) throws IOException {
+  public long addCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException {
     try {
-      return rpcProxy.addCacheDirective(null, 
-              AddCacheDirectiveRequestProto.newBuilder().
-                  setInfo(PBHelper.convert(directive)).build()).getId();
+      AddCacheDirectiveRequestProto.Builder builder =
+          AddCacheDirectiveRequestProto.newBuilder().
+              setInfo(PBHelper.convert(directive));
+      if (!flags.isEmpty()) {
+        builder.setCacheFlags(PBHelper.convertCacheFlags(flags));
+      }
+      return rpcProxy.addCacheDirective(null, builder.build()).getId();
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
   @Override
-  public void modifyCacheDirective(
-      CacheDirectiveInfo directive) throws IOException {
+  public void modifyCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException {
     try {
-      rpcProxy.modifyCacheDirective(null,
+      ModifyCacheDirectiveRequestProto.Builder builder =
           ModifyCacheDirectiveRequestProto.newBuilder().
-              setInfo(PBHelper.convert(directive)).build());
+              setInfo(PBHelper.convert(directive));
+      if (!flags.isEmpty()) {
+        builder.setCacheFlags(PBHelper.convertCacheFlags(flags));
+      }
+      rpcProxy.modifyCacheDirective(null, builder.build());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Fri Jan  3 07:26:52 2014
@@ -245,7 +245,7 @@ public class DatanodeProtocolClientSideT
     for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) {
       StorageReceivedDeletedBlocksProto.Builder repBuilder = 
           StorageReceivedDeletedBlocksProto.newBuilder();
-      repBuilder.setStorageID(storageBlock.getStorageID());
+      repBuilder.setStorageUuid(storageBlock.getStorageID());
       for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) {
         repBuilder.addBlocks(PBHelper.convert(rdBlock));
       }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java Fri Jan  3 07:26:52 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
@@ -102,14 +101,8 @@ public class DatanodeProtocolServerSideT
       HeartbeatRequestProto request) throws ServiceException {
     HeartbeatResponse response;
     try {
-      List<StorageReportProto> list = request.getReportsList();
-      StorageReport[] report = new StorageReport[list.size()];
-      int i = 0;
-      for (StorageReportProto p : list) {
-        report[i++] = new StorageReport(p.getStorageID(), p.getFailed(),
-            p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
-            p.getBlockPoolUsed());
-      }
+      final StorageReport[] report = PBHelper.convertStorageReports(
+          request.getReportsList());
       response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
           report, request.getCacheCapacity(), request.getCacheUsed(),
           request.getXmitsInProgress(),
@@ -198,7 +191,7 @@ public class DatanodeProtocolServerSideT
       for (int j = 0; j < list.size(); j++) {
         rdBlocks[j] = PBHelper.convert(list.get(j));
       }
-      info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageID(), rdBlocks);
+      info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageUuid(), rdBlocks);
     }
     try {
       impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java Fri Jan  3 07:26:52 2014
@@ -82,6 +82,6 @@ public class InterDatanodeProtocolServer
       throw new ServiceException(e);
     }
     return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
-        .setStorageID(storageID).build();
+        .setStorageUuid(storageID).build();
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java Fri Jan  3 07:26:52 2014
@@ -109,7 +109,7 @@ public class InterDatanodeProtocolTransl
         .setNewLength(newLength).setRecoveryId(recoveryId).build();
     try {
       return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
-          ).getStorageID();
+          ).getStorageUuid();
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Jan  3 07:26:52 2014
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -35,6 +36,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
@@ -52,17 +54,18 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
@@ -122,6 +125,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryListingProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -242,17 +247,20 @@ public class PBHelper {
 
   // DatanodeId
   public static DatanodeID convert(DatanodeIDProto dn) {
-    return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getStorageID(),
+    return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getDatanodeUuid(),
         dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn
         .getInfoSecurePort() : 0, dn.getIpcPort());
   }
 
   public static DatanodeIDProto convert(DatanodeID dn) {
+    // For wire compatibility with older versions we transmit the StorageID
+    // which is the same as the DatanodeUuid. Since StorageID is a required
+    // field we pass the empty string if the DatanodeUuid is not yet known.
     return DatanodeIDProto.newBuilder()
         .setIpAddr(dn.getIpAddr())
         .setHostName(dn.getHostName())
-        .setStorageID(dn.getStorageID())
         .setXferPort(dn.getXferPort())
+        .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
         .setInfoPort(dn.getInfoPort())
         .setInfoSecurePort(dn.getInfoSecurePort())
         .setIpcPort(dn.getIpcPort()).build();
@@ -294,12 +302,16 @@ public class PBHelper {
   public static BlockWithLocationsProto convert(BlockWithLocations blk) {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
-        .addAllStorageIDs(Arrays.asList(blk.getStorageIDs())).build();
+        .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
+        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
-    return new BlockWithLocations(convert(b.getBlock()), b.getStorageIDsList()
-        .toArray(new String[0]));
+    final List<String> datanodeUuids = b.getDatanodeUuidsList();
+    final List<String> storageUuids = b.getStorageUuidsList();
+    return new BlockWithLocations(convert(b.getBlock()),
+        datanodeUuids.toArray(new String[datanodeUuids.size()]),
+        storageUuids.toArray(new String[storageUuids.size()]));
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@@ -499,21 +511,7 @@ public class PBHelper {
   
   static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
     if (di == null) return null;
-    DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
-    if (di.getNetworkLocation() != null) {
-      builder.setLocation(di.getNetworkLocation());
-    }
-        
-    return builder.
-     setId(PBHelper.convert((DatanodeID) di)).
-     setCapacity(di.getCapacity()).
-     setDfsUsed(di.getDfsUsed()).
-     setRemaining(di.getRemaining()).
-     setBlockPoolUsed(di.getBlockPoolUsed()).
-     setLastUpdate(di.getLastUpdate()).
-     setXceiverCount(di.getXceiverCount()).
-     setAdminState(PBHelper.convert(di.getAdminState())).
-     build();     
+    return convert(di);
   }
   
   
@@ -557,15 +555,20 @@ public class PBHelper {
   
   public static DatanodeInfoProto convert(DatanodeInfo info) {
     DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
-    builder.setBlockPoolUsed(info.getBlockPoolUsed());
-    builder.setAdminState(PBHelper.convert(info.getAdminState()));
-    builder.setCapacity(info.getCapacity())
-        .setDfsUsed(info.getDfsUsed())
+    if (info.getNetworkLocation() != null) {
+      builder.setLocation(info.getNetworkLocation());
+    }
+    builder
         .setId(PBHelper.convert((DatanodeID)info))
-        .setLastUpdate(info.getLastUpdate())
-        .setLocation(info.getNetworkLocation())
+        .setCapacity(info.getCapacity())
+        .setDfsUsed(info.getDfsUsed())
         .setRemaining(info.getRemaining())
+        .setBlockPoolUsed(info.getBlockPoolUsed())
+        .setCacheCapacity(info.getCacheCapacity())
+        .setCacheUsed(info.getCacheUsed())
+        .setLastUpdate(info.getLastUpdate())
         .setXceiverCount(info.getXceiverCount())
+        .setAdminState(PBHelper.convert(info.getAdminState()))
         .build();
     return builder.build();
   }
@@ -601,6 +604,17 @@ public class PBHelper {
         "Found additional cached replica locations that are not in the set of"
         + " storage-backed locations!");
 
+    StorageType[] storageTypes = b.getStorageTypes();
+    if (storageTypes != null) {
+      for (int i = 0; i < storageTypes.length; ++i) {
+        builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
+      }
+    }
+    final String[] storageIDs = b.getStorageIDs();
+    if (storageIDs != null) {
+      builder.addAllStorageIDs(Arrays.asList(storageIDs));
+    }
+
     return builder.setB(PBHelper.convert(b.getBlock()))
         .setBlockToken(PBHelper.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
@@ -613,6 +627,25 @@ public class PBHelper {
     for (int i = 0; i < locs.size(); i++) {
       targets[i] = PBHelper.convert(locs.get(i));
     }
+
+    final int storageTypesCount = proto.getStorageTypesCount();
+    final StorageType[] storageTypes;
+    if (storageTypesCount == 0) {
+      storageTypes = null;
+    } else {
+      Preconditions.checkState(storageTypesCount == locs.size());
+      storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
+    }
+
+    final int storageIDsCount = proto.getStorageIDsCount();
+    final String[] storageIDs;
+    if (storageIDsCount == 0) {
+      storageIDs = null;
+    } else {
+      Preconditions.checkState(storageIDsCount == locs.size());
+      storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
+    }
+
     // Set values from the isCached list, re-using references from loc
     List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
     List<Boolean> isCachedList = proto.getIsCachedList();
@@ -623,7 +656,7 @@ public class PBHelper {
     }
 
     LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
-        proto.getOffset(), proto.getCorrupt(),
+        storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
         cachedLocs.toArray(new DatanodeInfo[0]));
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
 
@@ -766,7 +799,8 @@ public class PBHelper {
     for (int i = 0; i < blocks.length; i++) {
       builder.addBlocks(PBHelper.convert(blocks[i]));
     }
-    builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
+    builder.addAllTargets(convert(cmd.getTargets()))
+           .addAllTargetStorageUuids(convert(cmd.getTargetStorageIDs()));
     return builder.build();
   }
   
@@ -799,6 +833,15 @@ public class PBHelper {
     return Arrays.asList(ret);
   }
 
+  private static List<StorageUuidsProto> convert(String[][] targetStorageUuids) {
+    StorageUuidsProto[] ret = new StorageUuidsProto[targetStorageUuids.length];
+    for (int i = 0; i < targetStorageUuids.length; i++) {
+      ret[i] = StorageUuidsProto.newBuilder()
+          .addAllStorageUuids(Arrays.asList(targetStorageUuids[i])).build();
+    }
+    return Arrays.asList(ret);
+  }
+
   public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
     DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
     if (datanodeCommand == null) {
@@ -878,6 +921,14 @@ public class PBHelper {
     for (int i = 0; i < targetList.size(); i++) {
       targets[i] = PBHelper.convert(targetList.get(i));
     }
+
+    List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
+    String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
+    for(int i = 0; i < targetStorageIDs.length; i++) {
+      List<String> storageIDs = targetStorageUuidsList.get(i).getStorageUuidsList();
+      targetStorageIDs[i] = storageIDs.toArray(new String[storageIDs.size()]);
+    }
+
     int action = DatanodeProtocol.DNA_UNKNOWN;
     switch (blkCmd.getAction()) {
     case TRANSFER:
@@ -892,7 +943,8 @@ public class PBHelper {
     default:
       throw new AssertionError("Unknown action type: " + blkCmd.getAction());
     }
-    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
+        targetStorageIDs);
   }
 
   public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
@@ -1123,7 +1175,7 @@ public class PBHelper {
     return value;
   }
   
-  public static EnumSetWritable<CreateFlag> convert(int flag) {
+  public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
     EnumSet<CreateFlag> result = 
        EnumSet.noneOf(CreateFlag.class);   
     if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
@@ -1138,7 +1190,23 @@ public class PBHelper {
     }
     return new EnumSetWritable<CreateFlag>(result);
   }
-  
+
+  public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
+    int value = 0;
+    if (flags.contains(CacheFlag.FORCE)) {
+      value |= CacheFlagProto.FORCE.getNumber();
+    }
+    return value;
+  }
+
+  public static EnumSet<CacheFlag> convertCacheFlags(int flags) {
+    EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class);
+    if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) {
+      result.add(CacheFlag.FORCE);
+    }
+    return result;
+  }
+
   public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
     if (fs == null)
       return null;
@@ -1422,11 +1490,12 @@ public class PBHelper {
 
   public static DatanodeStorageProto convert(DatanodeStorage s) {
     return DatanodeStorageProto.newBuilder()
-        .setState(PBHelper.convert(s.getState()))
-        .setStorageID(s.getStorageID()).build();
+        .setState(PBHelper.convertState(s.getState()))
+        .setStorageType(PBHelper.convertStorageType(s.getStorageType()))
+        .setStorageUuid(s.getStorageID()).build();
   }
 
-  private static StorageState convert(State state) {
+  private static StorageState convertState(State state) {
     switch(state) {
     case READ_ONLY:
       return StorageState.READ_ONLY;
@@ -1436,11 +1505,26 @@ public class PBHelper {
     }
   }
 
+  private static StorageTypeProto convertStorageType(
+      StorageType type) {
+    switch(type) {
+    case DISK:
+      return StorageTypeProto.DISK;
+    case SSD:
+      return StorageTypeProto.SSD;
+    default:
+      throw new IllegalStateException(
+          "BUG: StorageType not found, type=" + type);
+    }
+  }
+
   public static DatanodeStorage convert(DatanodeStorageProto s) {
-    return new DatanodeStorage(s.getStorageID(), PBHelper.convert(s.getState()));
+    return new DatanodeStorage(s.getStorageUuid(),
+                               PBHelper.convertState(s.getState()),
+                               PBHelper.convertType(s.getStorageType()));
   }
 
-  private static State convert(StorageState state) {
+  private static State convertState(StorageState state) {
     switch(state) {
     case READ_ONLY:
       return DatanodeStorage.State.READ_ONLY;
@@ -1450,14 +1534,50 @@ public class PBHelper {
     }
   }
 
+  private static StorageType convertType(StorageTypeProto type) {
+    switch(type) {
+      case DISK:
+        return StorageType.DISK;
+      case SSD:
+        return StorageType.SSD;
+      default:
+        throw new IllegalStateException(
+            "BUG: StorageTypeProto not found, type=" + type);
+    }
+  }
+
+  private static StorageType[] convertStorageTypeProtos(
+      List<StorageTypeProto> storageTypesList) {
+    final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
+    for (int i = 0; i < storageTypes.length; ++i) {
+      storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
+    }
+    return storageTypes;
+  }
+
   public static StorageReportProto convert(StorageReport r) {
     StorageReportProto.Builder builder = StorageReportProto.newBuilder()
         .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
-        .setStorageID(r.getStorageID());
+        .setStorageUuid(r.getStorageID());
     return builder.build();
   }
 
+  public static StorageReport convert(StorageReportProto p) {
+    return new StorageReport(p.getStorageUuid(), p.getFailed(),
+        p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
+        p.getBlockPoolUsed());
+  }
+
+  public static StorageReport[] convertStorageReports(
+      List<StorageReportProto> list) {
+    final StorageReport[] report = new StorageReport[list.size()];
+    for (int i = 0; i < report.length; i++) {
+      report[i] = convert(list.get(i));
+    }
+    return report;
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
@@ -1684,8 +1804,11 @@ public class PBHelper {
     if (info.getMode() != null) {
       builder.setMode(info.getMode().toShort());
     }
-    if (info.getWeight() != null) {
-      builder.setWeight(info.getWeight());
+    if (info.getLimit() != null) {
+      builder.setLimit(info.getLimit());
+    }
+    if (info.getMaxRelativeExpiryMs() != null) {
+      builder.setMaxRelativeExpiry(info.getMaxRelativeExpiryMs());
     }
     return builder.build();
   }
@@ -1703,8 +1826,11 @@ public class PBHelper {
     if (proto.hasMode()) {
       info.setMode(new FsPermission((short)proto.getMode()));
     }
-    if (proto.hasWeight()) {
-      info.setWeight(proto.getWeight());
+    if (proto.hasLimit())  {
+      info.setLimit(proto.getLimit());
+    }
+    if (proto.hasMaxRelativeExpiry()) {
+      info.setMaxRelativeExpiryMs(proto.getMaxRelativeExpiry());
     }
     return info;
   }
@@ -1713,6 +1839,7 @@ public class PBHelper {
     CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder();
     builder.setBytesNeeded(stats.getBytesNeeded());
     builder.setBytesCached(stats.getBytesCached());
+    builder.setBytesOverlimit(stats.getBytesOverlimit());
     builder.setFilesNeeded(stats.getFilesNeeded());
     builder.setFilesCached(stats.getFilesCached());
     return builder.build();
@@ -1722,6 +1849,7 @@ public class PBHelper {
     CachePoolStats.Builder builder = new CachePoolStats.Builder();
     builder.setBytesNeeded(proto.getBytesNeeded());
     builder.setBytesCached(proto.getBytesCached());
+    builder.setBytesOverlimit(proto.getBytesOverlimit());
     builder.setFilesNeeded(proto.getFilesNeeded());
     builder.setFilesCached(proto.getFilesCached());
     return builder.build();
@@ -1756,3 +1884,4 @@ public class PBHelper {
     return new ExactSizeInputStream(input, size);
   }
 }
+

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Jan  3 07:26:52 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.BufferedInputStream;
@@ -221,9 +220,9 @@ public class Balancer {
   private Map<Block, BalancerBlock> globalBlockList
                  = new HashMap<Block, BalancerBlock>();
   private MovedBlocks movedBlocks = new MovedBlocks();
-  // Map storage IDs to BalancerDatanodes
-  private Map<String, BalancerDatanode> datanodes
-                 = new HashMap<String, BalancerDatanode>();
+  /** Map (datanodeUuid -> BalancerDatanodes) */
+  private final Map<String, BalancerDatanode> datanodeMap
+      = new HashMap<String, BalancerDatanode>();
   
   private NetworkTopology cluster;
 
@@ -241,6 +240,14 @@ public class Balancer {
     private PendingBlockMove() {
     }
     
+    @Override
+    public String toString() {
+      final Block b = block.getBlock();
+      return b + " with size=" + b.getNumBytes() + " from "
+          + source.getDisplayName() + " to " + target.getDisplayName()
+          + " through " + proxySource.getDisplayName();
+    }
+
     /* choose a block & a proxy source for this pendingMove 
      * whose source & target have already been chosen.
      * 
@@ -272,11 +279,7 @@ public class Balancer {
             if ( chooseProxySource() ) {
               movedBlocks.add(block);
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Decided to move block "+ block.getBlockId()
-                    +" with a length of "+StringUtils.byteDesc(block.getNumBytes())
-                    + " bytes from " + source.getDisplayName()
-                    + " to " + target.getDisplayName()
-                    + " using proxy source " + proxySource.getDisplayName() );
+                LOG.debug("Decided to move " + this);
               }
               return true;
             }
@@ -353,17 +356,9 @@ public class Balancer {
         sendRequest(out);
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
-        LOG.info( "Moving block " + block.getBlock().getBlockId() +
-              " from "+ source.getDisplayName() + " to " +
-              target.getDisplayName() + " through " +
-              proxySource.getDisplayName() +
-              " is succeeded." );
+        LOG.info("Successfully moved " + this);
       } catch (IOException e) {
-        LOG.warn("Error moving block "+block.getBlockId()+
-            " from " + source.getDisplayName() + " to " +
-            target.getDisplayName() + " through " +
-            proxySource.getDisplayName() +
-            ": "+e.getMessage());
+        LOG.warn("Failed to move " + this + ": " + e.getMessage());
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -415,9 +410,7 @@ public class Balancer {
         @Override
         public void run() {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting moving "+ block.getBlockId() +
-                " from " + proxySource.getDisplayName() + " to " +
-                target.getDisplayName());
+            LOG.debug("Start moving " + PendingBlockMove.this);
           }
           dispatch();
         }
@@ -464,11 +457,6 @@ public class Balancer {
       return block;
     }
     
-    /* Return the block id */
-    private long getBlockId() {
-      return block.getBlockId();
-    }
-    
     /* Return the length of the block */
     private long getNumBytes() {
       return block.getNumBytes();
@@ -552,7 +540,7 @@ public class Balancer {
     
     /* Get the storage id of the datanode */
     protected String getStorageID() {
-      return datanode.getStorageID();
+      return datanode.getDatanodeUuid();
     }
     
     /** Decide if still need to move more bytes */
@@ -675,10 +663,10 @@ public class Balancer {
         
           synchronized (block) {
             // update locations
-            for ( String storageID : blk.getStorageIDs() ) {
-              BalancerDatanode datanode = datanodes.get(storageID);
+            for (String datanodeUuid : blk.getDatanodeUuids()) {
+              final BalancerDatanode d = datanodeMap.get(datanodeUuid);
               if (datanode != null) { // not an unknown datanode
-                block.addLocation(datanode);
+                block.addLocation(d);
               }
             }
           }
@@ -852,16 +840,6 @@ public class Balancer {
                         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
   }
   
-  /* Shuffle datanode array */
-  static private void shuffleArray(DatanodeInfo[] datanodes) {
-    for (int i=datanodes.length; i>1; i--) {
-      int randomIndex = DFSUtil.getRandom().nextInt(i);
-      DatanodeInfo tmp = datanodes[randomIndex];
-      datanodes[randomIndex] = datanodes[i-1];
-      datanodes[i-1] = tmp;
-    }
-  }
-  
   /* Given a data node set, build a network topology and decide
    * over-utilized datanodes, above average utilized datanodes, 
    * below average utilized datanodes, and underutilized datanodes. 
@@ -891,8 +869,7 @@ public class Balancer {
      * an increasing order or a decreasing order.
      */  
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
-    shuffleArray(datanodes);
-    for (DatanodeInfo datanode : datanodes) {
+    for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
       if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
         continue; // ignore decommissioning or decommissioned nodes
       }
@@ -923,13 +900,13 @@ public class Balancer {
               datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
         }
       }
-      this.datanodes.put(datanode.getStorageID(), datanodeS);
+      datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
     }
 
     //logging
     logNodes();
     
-    assert (this.datanodes.size() == 
+    assert (this.datanodeMap.size() == 
       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
       aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
       : "Mismatched number of datanodes";
@@ -1001,9 +978,9 @@ public class Balancer {
     // At last, match all remaining nodes
     chooseNodes(ANY_OTHER);
     
-    assert (datanodes.size() >= sources.size()+targets.size())
+    assert (datanodeMap.size() >= sources.size()+targets.size())
       : "Mismatched number of datanodes (" +
-      datanodes.size() + " total, " +
+      datanodeMap.size() + " total, " +
       sources.size() + " sources, " +
       targets.size() + " targets)";
 
@@ -1304,7 +1281,7 @@ public class Balancer {
     this.aboveAvgUtilizedDatanodes.clear();
     this.belowAvgUtilizedDatanodes.clear();
     this.underUtilizedDatanodes.clear();
-    this.datanodes.clear();
+    this.datanodeMap.clear();
     this.sources.clear();
     this.targets.clear();  
     this.policy.reset();

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Fri Jan  3 07:26:52 2014
@@ -75,7 +75,7 @@ public interface BlockCollection {
    * and set the locations.
    */
   public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
-      DatanodeDescriptor[] locations) throws IOException;
+      DatanodeStorageInfo[] targets) throws IOException;
 
   /**
    * @return whether the block collection is under construction.

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Fri Jan  3 07:26:52 2014
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.util.LightWeightGSet;
 
@@ -39,11 +40,11 @@ public class BlockInfo extends Block imp
   private LightWeightGSet.LinkedElement nextLinkedElement;
 
   /**
-   * This array contains triplets of references. For each i-th datanode the
-   * block belongs to triplets[3*i] is the reference to the DatanodeDescriptor
-   * and triplets[3*i+1] and triplets[3*i+2] are references to the previous and
-   * the next blocks, respectively, in the list of blocks belonging to this
-   * data-node.
+   * This array contains triplets of references. For each i-th storage, the
+   * block belongs to triplets[3*i] is the reference to the
+   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
+   * references to the previous and the next blocks, respectively, in the list
+   * of blocks belonging to this storage.
    * 
    * Using previous and next in Object triplets is done instead of a
    * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
@@ -86,9 +87,14 @@ public class BlockInfo extends Block imp
   }
 
   public DatanodeDescriptor getDatanode(int index) {
+    DatanodeStorageInfo storage = getStorageInfo(index);
+    return storage == null ? null : storage.getDatanodeDescriptor();
+  }
+
+  DatanodeStorageInfo getStorageInfo(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    return (DatanodeDescriptor)triplets[index*3];
+    return (DatanodeStorageInfo)triplets[index*3];
   }
 
   private BlockInfo getPrevious(int index) {
@@ -111,14 +117,10 @@ public class BlockInfo extends Block imp
     return info;
   }
 
-  private void setDatanode(int index, DatanodeDescriptor node, BlockInfo previous,
-      BlockInfo next) {
+  private void setStorageInfo(int index, DatanodeStorageInfo storage) {
     assert this.triplets != null : "BlockInfo is not initialized";
-    int i = index * 3;
-    assert index >= 0 && i+2 < triplets.length : "Index is out of bound";
-    triplets[i] = node;
-    triplets[i+1] = previous;
-    triplets[i+2] = next;
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    triplets[index*3] = storage;
   }
 
   /**
@@ -190,22 +192,34 @@ public class BlockInfo extends Block imp
   }
 
   /**
-   * Add data-node this block belongs to.
+   * Add a {@link DatanodeStorageInfo} location for a block
    */
-  public boolean addNode(DatanodeDescriptor node) {
-    if(findDatanode(node) >= 0) // the node is already there
-      return false;
+  boolean addStorage(DatanodeStorageInfo storage) {
+    boolean added = true;
+    int idx = findDatanode(storage.getDatanodeDescriptor());
+    if(idx >= 0) {
+      if (getStorageInfo(idx) == storage) { // the storage is already there
+        return false;
+      } else {
+        // The block is on the DN but belongs to a different storage.
+        // Update our state.
+        removeStorage(storage);
+        added = false;      // Just updating storage. Return false.
+      }
+    }
     // find the last null node
     int lastNode = ensureCapacity(1);
-    setDatanode(lastNode, node, null, null);
-    return true;
+    setStorageInfo(lastNode, storage);
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
+    return added;
   }
 
   /**
-   * Remove data-node from the block.
+   * Remove {@link DatanodeStorageInfo} location for a block
    */
-  public boolean removeNode(DatanodeDescriptor node) {
-    int dnIndex = findDatanode(node);
+  boolean removeStorage(DatanodeStorageInfo storage) {
+    int dnIndex = findStorageInfo(storage);
     if(dnIndex < 0) // the node is not found
       return false;
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
@@ -213,10 +227,13 @@ public class BlockInfo extends Block imp
     // find the last not null node
     int lastNode = numNodes()-1; 
     // replace current node triplet by the lastNode one 
-    setDatanode(dnIndex, getDatanode(lastNode), getPrevious(lastNode),
-        getNext(lastNode));
+    setStorageInfo(dnIndex, getStorageInfo(lastNode));
+    setNext(dnIndex, getNext(lastNode)); 
+    setPrevious(dnIndex, getPrevious(lastNode)); 
     // set the last triplet to null
-    setDatanode(lastNode, null, null, null);
+    setStorageInfo(lastNode, null);
+    setNext(lastNode, null); 
+    setPrevious(lastNode, null); 
     return true;
   }
 
@@ -236,37 +253,70 @@ public class BlockInfo extends Block imp
     }
     return -1;
   }
+  /**
+   * Find specified DatanodeStorageInfo.
+   * @param dn
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeInfo dn) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == null)
+        break;
+      if(cur.getDatanodeDescriptor() == dn)
+        return idx;
+    }
+    return -1;
+  }
+  
+  /**
+   * Find specified DatanodeStorageInfo.
+   * @param storageInfo
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeStorageInfo storageInfo) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == storageInfo)
+        return idx;
+      if(cur == null)
+        break;
+    }
+    return -1;
+  }
 
   /**
    * Insert this block into the head of the list of blocks 
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If the head is null then form a new list.
    * @return current block as the new head of the list.
    */
-  public BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) {
-    int dnIndex = this.findDatanode(dn);
+  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
+    int dnIndex = this.findStorageInfo(storage);
     assert dnIndex >= 0 : "Data node is not found: current";
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
             "Block is already in the list and cannot be inserted.";
     this.setPrevious(dnIndex, null);
     this.setNext(dnIndex, head);
     if(head != null)
-      head.setPrevious(head.findDatanode(dn), this);
+      head.setPrevious(head.findStorageInfo(storage), this);
     return this;
   }
 
   /**
    * Remove this block from the list of blocks 
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If this block is the head of the list then return the next block as 
    * the new head.
    * @return the new head of the list or null if the list becomes
-   * empty after deletion.
+   * empy after deletion.
    */
-  public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) {
+  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
     if(head == null)
       return null;
-    int dnIndex = this.findDatanode(dn);
+    int dnIndex = this.findStorageInfo(storage);
     if(dnIndex < 0) // this block is not on the data-node list
       return head;
 
@@ -275,9 +325,9 @@ public class BlockInfo extends Block imp
     this.setNext(dnIndex, null);
     this.setPrevious(dnIndex, null);
     if(prev != null)
-      prev.setNext(prev.findDatanode(dn), next);
+      prev.setNext(prev.findStorageInfo(storage), next);
     if(next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
+      next.setPrevious(next.findStorageInfo(storage), prev);
     if(this == head)  // removing the head
       head = next;
     return head;
@@ -289,7 +339,7 @@ public class BlockInfo extends Block imp
    *
    * @return the new head of the list.
    */
-  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn,
+  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
       int curIndex, int headIndex) {
     if (head == this) {
       return this;
@@ -298,9 +348,9 @@ public class BlockInfo extends Block imp
     BlockInfo prev = this.setPrevious(curIndex, null);
 
     head.setPrevious(headIndex, this);
-    prev.setNext(prev.findDatanode(dn), next);
+    prev.setNext(prev.findStorageInfo(storage), next);
     if (next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
+      next.setPrevious(next.findStorageInfo(storage), prev);
     return this;
   }
 
@@ -328,10 +378,10 @@ public class BlockInfo extends Block imp
    * @return BlockInfoUnderConstruction -  an under construction block.
    */
   public BlockInfoUnderConstruction convertToBlockUnderConstruction(
-      BlockUCState s, DatanodeDescriptor[] targets) {
+      BlockUCState s, DatanodeStorageInfo[] targets) {
     if(isComplete()) {
-      return new BlockInfoUnderConstruction(
-          this, getBlockCollection().getBlockReplication(), s, targets);
+      return new BlockInfoUnderConstruction(this,
+          getBlockCollection().getBlockReplication(), s, targets);
     }
     // the block is already under construction
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Fri Jan  3 07:26:52 2014
@@ -63,12 +63,12 @@ public class BlockInfoUnderConstruction 
    * corresponding replicas.
    */
   static class ReplicaUnderConstruction extends Block {
-    private DatanodeDescriptor expectedLocation;
+    private final DatanodeStorageInfo expectedLocation;
     private ReplicaState state;
     private boolean chosenAsPrimary;
 
     ReplicaUnderConstruction(Block block,
-                             DatanodeDescriptor target,
+                             DatanodeStorageInfo target,
                              ReplicaState state) {
       super(block);
       this.expectedLocation = target;
@@ -82,7 +82,7 @@ public class BlockInfoUnderConstruction 
      * It is not guaranteed, but expected, that the data-node actually has
      * the replica.
      */
-    DatanodeDescriptor getExpectedLocation() {
+    private DatanodeStorageInfo getExpectedStorageLocation() {
       return expectedLocation;
     }
 
@@ -118,7 +118,7 @@ public class BlockInfoUnderConstruction 
      * Is data-node the replica belongs to alive.
      */
     boolean isAlive() {
-      return expectedLocation.isAlive;
+      return expectedLocation.getDatanodeDescriptor().isAlive;
     }
 
     @Override // Block
@@ -162,7 +162,7 @@ public class BlockInfoUnderConstruction 
    */
   public BlockInfoUnderConstruction(Block blk, int replication,
                              BlockUCState state,
-                             DatanodeDescriptor[] targets) {
+                             DatanodeStorageInfo[] targets) {
     super(blk, replication);
     assert getBlockUCState() != BlockUCState.COMPLETE :
       "BlockInfoUnderConstruction cannot be in COMPLETE state";
@@ -186,7 +186,7 @@ public class BlockInfoUnderConstruction 
   }
 
   /** Set expected locations */
-  public void setExpectedLocations(DatanodeDescriptor[] targets) {
+  public void setExpectedLocations(DatanodeStorageInfo[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
     this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
     for(int i = 0; i < numLocations; i++)
@@ -198,12 +198,12 @@ public class BlockInfoUnderConstruction 
    * Create array of expected replica locations
    * (as has been assigned by chooseTargets()).
    */
-  public DatanodeDescriptor[] getExpectedLocations() {
+  public DatanodeStorageInfo[] getExpectedStorageLocations() {
     int numLocations = replicas == null ? 0 : replicas.size();
-    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
+    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
     for(int i = 0; i < numLocations; i++)
-      locations[i] = replicas.get(i).getExpectedLocation();
-    return locations;
+      storages[i] = replicas.get(i).getExpectedStorageLocation();
+    return storages;
   }
 
   /** Get the number of expected locations */
@@ -244,9 +244,9 @@ public class BlockInfoUnderConstruction 
     // The replica list is unchanged.
     for (ReplicaUnderConstruction r : replicas) {
       if (genStamp != r.getGenerationStamp()) {
-        r.getExpectedLocation().removeBlock(this);
+        r.getExpectedStorageLocation().removeBlock(this);
         NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
-            + "from location: " + r.getExpectedLocation());
+            + "from location: " + r.getExpectedStorageLocation());
       }
     }
   }
@@ -302,31 +302,44 @@ public class BlockInfoUnderConstruction 
       if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
         continue;
       }
-      if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
-        primary = replicas.get(i);
+      final ReplicaUnderConstruction ruc = replicas.get(i);
+      final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate(); 
+      if (lastUpdate > mostRecentLastUpdate) {
         primaryNodeIndex = i;
-        mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
+        primary = ruc;
+        mostRecentLastUpdate = lastUpdate;
       }
     }
     if (primary != null) {
-      primary.getExpectedLocation().addBlockToBeRecovered(this);
+      primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
       primary.setChosenAsPrimary(true);
       NameNode.blockStateChangeLog.info("BLOCK* " + this
         + " recovery started, primary=" + primary);
     }
   }
 
-  void addReplicaIfNotPresent(DatanodeDescriptor dn,
+  void addReplicaIfNotPresent(DatanodeStorageInfo storage,
                      Block block,
                      ReplicaState rState) {
-    for (ReplicaUnderConstruction r : replicas) {
-      if (r.getExpectedLocation() == dn) {
+    Iterator<ReplicaUnderConstruction> it = replicas.iterator();
+    while (it.hasNext()) {
+      ReplicaUnderConstruction r = it.next();
+      if(r.getExpectedStorageLocation() == storage) {
         // Record the gen stamp from the report
         r.setGenerationStamp(block.getGenerationStamp());
         return;
+      } else if (r.getExpectedStorageLocation().getDatanodeDescriptor() ==
+          storage.getDatanodeDescriptor()) {
+
+        // The Datanode reported that the block is on a different storage
+        // than the one chosen by BlockPlacementPolicy. This can occur as
+        // we allow Datanodes to choose the target storage. Update our
+        // state by removing the stale entry and adding a new one.
+        it.remove();
+        break;
       }
     }
-    replicas.add(new ReplicaUnderConstruction(block, dn, rState));
+    replicas.add(new ReplicaUnderConstruction(block, storage, rState));
   }
 
   @Override // BlockInfo



Mime
View raw message