hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject hadoop git commit: HDFS-6133. Add a feature for replica pinning so that a pinned replica will not be moved by Balancer/Mover. Contributed by zhaoyunjiong
Date Wed, 11 Feb 2015 23:16:51 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 2ca76df21 -> 65a6cf47e


HDFS-6133. Add a feature for replica pinning so that a pinned replica will not be moved by
Balancer/Mover.  Contributed by zhaoyunjiong

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/65a6cf47
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/65a6cf47
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/65a6cf47

Branch: refs/heads/branch-2
Commit: 65a6cf47ec2c5c6a64b32a042eb115c8e2d615b2
Parents: 2ca76df
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Wed Feb 11 15:09:29 2015 -0800
Committer: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Committed: Wed Feb 11 15:16:34 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 ++
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 22 +++++-
 .../hadoop/hdfs/DistributedFileSystem.java      |  7 +-
 .../datatransfer/DataTransferProtocol.java      |  6 +-
 .../hdfs/protocol/datatransfer/Receiver.java    | 10 +--
 .../hdfs/protocol/datatransfer/Sender.java      |  8 ++-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 21 ++++++
 .../hdfs/server/datanode/BlockReceiver.java     | 12 +++-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  2 +-
 .../hdfs/server/datanode/DataXceiver.java       | 34 +++++++---
 .../server/datanode/fsdataset/FsDatasetSpi.java | 13 ++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 40 +++++++++++
 .../src/main/proto/datatransfer.proto           |  3 +
 .../src/main/resources/hdfs-default.xml         |  6 ++
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     | 27 ++++++--
 .../hadoop/hdfs/TestDataTransferProtocol.java   |  2 +-
 .../hdfs/server/balancer/TestBalancer.java      | 71 +++++++++++++++++---
 .../server/datanode/SimulatedFSDataset.java     | 11 +++
 .../hdfs/server/datanode/TestDiskError.java     |  2 +-
 .../extdataset/ExternalDatasetImpl.java         |  9 +++
 21 files changed, 274 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 923e8f8..7fe580e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -30,6 +30,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
     changes. (Xiaoyu Yao via Arpit Agarwal)
 
+    HDFS-6133. Add a feature for replica pinning so that a pinned replica
+    will not be moved by Balancer/Mover.  (zhaoyunjiong via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d1a2b24..9c87b73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -781,4 +781,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // 10 days
   public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       TimeUnit.DAYS.toMillis(10);
+  public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED = 
+    "dfs.datanode.block-pinning.enabled";
+  public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
+    false;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 601560c..243d263 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1442,11 +1442,13 @@ public class DFSOutputStream extends FSOutputSummer
           ExtendedBlock blockCopy = new ExtendedBlock(block);
           blockCopy.setNumBytes(blockSize);
 
+          boolean[] targetPinnings = getPinnings(nodes);
           // send the request
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS,
-              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
+              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+            (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1534,6 +1536,24 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
+    private boolean[] getPinnings(DatanodeInfo[] nodes) {
+      if (favoredNodes == null) {
+        return null;
+      } else {
+        boolean[] pinnings = new boolean[nodes.length];
+        for (int i = 0; i < nodes.length; i++) {
+          pinnings[i] = false;
+          for (int j = 0; j < favoredNodes.length; j++) {
+            if (nodes[i].getXferAddrWithHostname().equals(favoredNodes[j])) {
+              pinnings[i] = true;
+              break;
+            }
+          }
+        }
+        return pinnings;
+      }
+    }
+
     private LocatedBlock locateFollowingBlock(long start,
         DatanodeInfo[] excludedNodes)  throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 6775896..46933c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -346,9 +346,10 @@ public class DistributedFileSystem extends FileSystem {
    * Progressable)} with the addition of favoredNodes that is a hint to 
    * where the namenode should place the file blocks.
    * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. HDFS could move the blocks during balancing or
-   * replication, to move the blocks from favored nodes. A value of null means
-   * no favored nodes for this create
+   * at the creation time only. And with favored nodes, blocks will be pinned
+   * on the datanodes to prevent balancing move the block. HDFS could move the
+   * blocks during replication, to move the blocks from favored nodes. A value
+   * of null means no favored nodes for this create
    */
   public HdfsDataOutputStream create(final Path f,
       final FsPermission permission, final boolean overwrite,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index f6b99e6..5fc263e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -92,6 +92,8 @@ public interface DataTransferProtocol {
    * @param minBytesRcvd minimum number of bytes received.
    * @param maxBytesRcvd maximum number of bytes received.
    * @param latestGenerationStamp the latest generation stamp of the block.
+   * @param pinning whether to pin the block, so Balancer won't move it.
+   * @param targetPinnings whether to pin the block on target datanode
    */
   public void writeBlock(final ExtendedBlock blk,
       final StorageType storageType, 
@@ -107,7 +109,9 @@ public interface DataTransferProtocol {
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException;
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index ffbfc21..7994027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -149,10 +149,12 @@ public abstract class Receiver implements DataTransferProtocol {
           (proto.hasCachingStrategy() ?
               getCachingStrategy(proto.getCachingStrategy()) :
             CachingStrategy.newDefaultStrategy()),
-            (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
-     } finally {
-      if (traceScope != null) traceScope.close();
-     }
+          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
+          (proto.hasPinning() ? proto.getPinning(): false),
+          (PBHelper.convertBooleanList(proto.getTargetPinningsList())));
+    } finally {
+     if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 844c270..eb30afb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -129,7 +129,9 @@ public class Sender implements DataTransferProtocol {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -148,7 +150,9 @@ public class Sender implements DataTransferProtocol {
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
       .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .setAllowLazyPersist(allowLazyPersist);
+      .setAllowLazyPersist(allowLazyPersist)
+      .setPinning(pinning)
+      .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 7c705cf..547823a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -2954,4 +2954,25 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
+  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
+    List<Boolean> pinnings = new ArrayList<Boolean>();
+    if (targetPinnings == null) {
+      pinnings.add(Boolean.FALSE);
+    } else {
+      for (; idx < targetPinnings.length; ++idx) {
+        pinnings.add(Boolean.valueOf(targetPinnings[idx]));
+      }
+    }
+    return pinnings;
+  }
+
+  public static boolean[] convertBooleanList(
+    List<Boolean> targetPinningsList) {
+    final boolean[] targetPinnings = new boolean[targetPinningsList.size()];
+    for (int i = 0; i < targetPinningsList.size(); i++) {
+      targetPinnings[i] = targetPinningsList.get(i);
+    }
+    return targetPinnings;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index cfd2442..3d6c66e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -132,6 +132,8 @@ class BlockReceiver implements Closeable {
   private long lastResponseTime = 0;
   private boolean isReplaceBlock = false;
   private DataOutputStream replyOut = null;
+  
+  private boolean pinning;
 
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
@@ -141,7 +143,8 @@ class BlockReceiver implements Closeable {
       final String clientname, final DatanodeInfo srcDataNode,
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -165,12 +168,14 @@ class BlockReceiver implements Closeable {
       this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
           || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
+      this.pinning = pinning;
       if (LOG.isDebugEnabled()) {
         LOG.debug(getClass().getSimpleName() + ": " + block
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
             + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
             + "\n  cachingStrategy = " + cachingStrategy
+            + "\n  pinning=" + pinning
             );
       }
 
@@ -1287,6 +1292,11 @@ class BlockReceiver implements Closeable {
           : 0;
       block.setNumBytes(replicaInfo.getNumBytes());
       datanode.data.finalizeBlock(block);
+      
+      if (pinning) {
+        datanode.data.setPinning(block);
+      }
+      
       datanode.closeBlock(
           block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
       if (ClientTraceLog.isInfoEnabled() && isClient) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b872bbb..cfd33cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2117,7 +2117,7 @@ public class DataNode extends ReconfigurableBase
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false);
+            false, false, null);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 3a2723f..bb5323a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -581,7 +581,9 @@ class DataXceiver extends Receiver implements Runnable {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -594,14 +596,14 @@ class DataXceiver extends Receiver implements Runnable {
       throw new IOException(stage + " does not support multiple targets "
           + Arrays.asList(targets));
     }
-
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname 
       		+ "\n  block  =" + block + ", newGs=" + latestGenerationStamp
       		+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
           + "\n  targets=" + Arrays.asList(targets)
           + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
-          );
+          + ", pinning=" + pinning);
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
@@ -643,7 +645,7 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist);
+            cachingStrategy, allowLazyPersist, pinning);
 
         storageUuid = blockReceiver.getStorageUuid();
       } else {
@@ -686,10 +688,19 @@ class DataXceiver extends Receiver implements Runnable {
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
           // Do not propagate allowLazyPersist to downstream DataNodes.
-          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+          if (targetPinnings != null && targetPinnings.length > 0) {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, targetPinnings[0], targetPinnings);
+          } else {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, false, targetPinnings);
+          }
 
           mirrorOut.flush();
 
@@ -949,7 +960,14 @@ class DataXceiver extends Receiver implements Runnable {
       }
 
     }
-
+    
+    if (datanode.data.getPinning(block)) {
+      String msg = "Not able to copy block " + block.getBlockId() + " " +
+          "to " + peer.getRemoteAddressString() + " because it's pinned ";
+      LOG.info(msg);
+      sendResponse(ERROR, msg);
+    }
+    
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because threads " +
@@ -1109,7 +1127,7 @@ class DataXceiver extends Receiver implements Runnable {
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
-            CachingStrategy.newDropBehind(), false);
+            CachingStrategy.newDropBehind(), false, false);
         
         // receive a block
         blockReceiver.receiveBlock(null, null, replyOut, null, 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 25bc7a5..cc7aec5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -516,4 +516,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean
{
      */
     public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
         StorageType targetStorageType) throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * It is a no-op when dfs.datanode.block-pinning.enabled is set to false.
+   */
+  public void setPinning(ExtendedBlock block) throws IOException;
+
+  /**
+   * Check whether the block was pinned
+   */
+  public boolean getPinning(ExtendedBlock block) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 2abc94d..cefb206 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -51,6 +51,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
@@ -242,6 +246,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
+  final LocalFileSystem localFS;
+
+  private boolean blockPinningEnabled;
+  
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -301,6 +309,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     lazyWriter = new Daemon(new LazyWriter(conf));
     lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
+    localFS = FileSystem.getLocal(conf);
+    blockPinningEnabled = conf.getBoolean(
+      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
+      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -2870,5 +2882,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       shouldRun = false;
     }
   }
+  
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return;
+    }
+
+    File f = getBlockFile(block);
+    Path p = new Path(f.getAbsolutePath());
+    
+    FsPermission oldPermission = localFS.getFileStatus(
+        new Path(f.getAbsolutePath())).getPermission();
+    //sticky bit is used for pinning purpose
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(p, permission);
+  }
+  
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return  false;
+    }
+    File f = getBlockFile(block);
+        
+    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
+    return fss.getPermission().getStickyBit();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 9512688..d72bb5e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -123,6 +123,9 @@ message OpWriteBlockProto {
    * to ignore this hint.
    */
   optional bool allowLazyPersist = 13 [default = false];
+  //whether to pin the block, so Balancer won't move it.
+  optional bool pinning = 14 [default = false];
+  repeated bool targetPinnings = 15;
 }
   
 message OpTransferBlockProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 8296414..bfaa33b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2235,4 +2235,10 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.datanode.block-pinning.enabled</name>
+    <value>false</value>
+    <description>Whether pin blocks on favored DataNode.</description>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3929f46..8e659c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -317,13 +317,21 @@ public class DFSTestUtil {
   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
                                 long fileLen, long blockSize, short replFactor, long seed)
       throws IOException {
-    createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
-            replFactor, seed, false);
+    createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor,
+      seed, false);
   }
 
   public static void createFile(FileSystem fs, Path fileName,
       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
       short replFactor, long seed, boolean flush) throws IOException {
+        createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize,
+          replFactor, seed, flush, null);
+  }
+
+  public static void createFile(FileSystem fs, Path fileName,
+      boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+      short replFactor, long seed, boolean flush,
+      InetSocketAddress[] favoredNodes) throws IOException {
   assert bufferLen > 0;
   if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " +
@@ -336,10 +344,19 @@ public class DFSTestUtil {
     createFlags.add(LAZY_PERSIST);
   }
   try {
-      out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
-        fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+    if (favoredNodes == null) {
+      out = fs.create(
+        fileName,
+        FsPermission.getFileDefault(),
+        createFlags,
+        fs.getConf().getInt(
+          CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
         replFactor, blockSize, null);
-
+    } else {
+      out = ((DistributedFileSystem) fs).create(fileName,
+        FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
+        null, favoredNodes);
+    }
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index fd4f1a5..0dcdc98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -534,6 +534,6 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 16dbdfd..a5346b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
@@ -33,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,12 +55,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@@ -310,6 +302,63 @@ public class TestBalancer {
   }
   
   /**
+   * Make sure that balancer can't move pinned blocks.
+   * If specified favoredNodes when create file, blocks will be pinned use 
+   * sticky bit.
+   * @throws Exception
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+    
+    long[] capacities =  new long[] { CAPACITY, CAPACITY };
+    String[] racks = { RACK0, RACK1 };
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+      .hosts(new String[]{"localhost", "localhost"})
+      .racks(racks).simulatedCapacities(capacities).build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      
+      // fill up the cluster to be 80% full
+      long totalCapacity = sum(capacities);
+      long totalUsedSpace = totalCapacity * 8 / 10;
+      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+      }
+
+      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+          (short) numOfDatanodes, 0, false, favoredNodes);
+      
+      // start up an empty node with the same capacity
+      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+          new long[] { CAPACITY });
+      
+      totalCapacity += CAPACITY;
+      
+      // run balancer and validate results
+      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+      // start rebalancing
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      
+    } finally {
+      cluster.shutdown();
+    }
+    
+  }
+  
+  /**
    * Wait until balanced: each datanode gives utilization within 
    * BALANCE_ALLOWED_VARIANCE of average
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index ad6466e..5c543a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -128,6 +128,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi>
{
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
+    private boolean pinned = false;
     BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
@@ -1275,5 +1276,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi>
{
     // TODO Auto-generated method stub
     return null;
   }
+  
+  @Override
+  public void setPinning(ExtendedBlock b) throws IOException {
+    blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+  }
+  
+  @Override
+  public boolean getPinning(ExtendedBlock b) throws IOException {
+    return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index f440bb6..fb219d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
     out.flush();
 
     // close the connection before sending the content of the block

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65a6cf47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 87a0f10..bc91625 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -406,4 +406,13 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl>
{
   public long getNumBlocksFailedToUncache() {
     return 0;
   }
+
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {    
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    return false;
+  }
 }


Mime
View raw message