hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [37/48] hadoop git commit: HDFS-9694. Make existing DFSClient#getFileChecksum() work for striped blocks. Contributed by Kai Zheng
Date Mon, 28 Mar 2016 17:36:05 GMT
HDFS-9694. Make existing DFSClient#getFileChecksum() work for striped blocks. Contributed by Kai Zheng


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a4ff777
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a4ff777
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a4ff777

Branch: refs/heads/HDFS-7240
Commit: 3a4ff7776e8fab6cc87932b9aa8fb48f7b69c720
Parents: a337ceb
Author: Uma Maheswara Rao G <uma.gangumalla@intel.com>
Authored: Sat Mar 26 19:58:09 2016 -0700
Committer: Uma Maheswara Rao G <uma.gangumalla@intel.com>
Committed: Sat Mar 26 19:58:09 2016 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   1 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  11 +-
 .../apache/hadoop/hdfs/FileChecksumHelper.java  | 187 ++++++++++--
 .../hadoop/hdfs/protocol/StripedBlockInfo.java  |  61 ++++
 .../datatransfer/DataTransferProtocol.java      |  16 +-
 .../hadoop/hdfs/protocol/datatransfer/Op.java   |   1 +
 .../hdfs/protocol/datatransfer/Sender.java      |  19 ++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  42 ++-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  12 +
 .../src/main/proto/datatransfer.proto           |   9 +-
 .../hdfs/protocol/datatransfer/Receiver.java    |  28 ++
 .../server/datanode/BlockChecksumHelper.java    | 284 +++++++++++++++----
 .../hdfs/server/datanode/DataXceiver.java       |  43 +++
 .../apache/hadoop/hdfs/TestFileChecksum.java    | 247 ++++++++++++++++
 14 files changed, 878 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 2c3329e..9d6ab9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -8,6 +8,7 @@
       <Class name="org.apache.hadoop.hdfs.protocol.LocatedBlock"/>
       <Class name="org.apache.hadoop.hdfs.protocol.BlockStoragePolicy"/>
       <Class name="org.apache.hadoop.hdfs.protocol.CorruptFileBlocks"/>
+      <Class name="org.apache.hadoop.hdfs.protocol.StripedBlockInfo"/>
       <Class name="org.apache.hadoop.hdfs.protocol.DirectoryListing"/>
       <Class name="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier"/>
       <Class name="org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey"/>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3506d3a..88bd219 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1704,7 +1704,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   /**
    * Get the checksum of the whole file or a range of the file. Note that the
-   * range always starts from the beginning of the file.
+   * range always starts from the beginning of the file. The file can be
+   * either replicated or striped. The checksum can be used to compare two
+   * replicated files or two striped files, but not two files with
+   * different block layouts.
    * @param src The file path
    * @param length the length of the range, i.e., the range is [0, length]
    * @return The checksum
@@ -1717,7 +1720,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
     LocatedBlocks blockLocations = getBlockLocations(src, length);
 
-    FileChecksumHelper.FileChecksumComputer maker =
+    FileChecksumHelper.FileChecksumComputer maker;
+    ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy();
+    maker = ecPolicy != null ?
+        new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src,
+            length, blockLocations, namenode, this, ecPolicy) :
         new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
             blockLocations, namenode, this);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index d15db9f..dfd9393 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -22,10 +22,13 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
@@ -75,7 +78,7 @@ final class FileChecksumHelper {
     private int bytesPerCRC = -1;
     private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
     private long crcPerBlock = 0;
-    private boolean refetchBlocks = false;
+    private boolean isRefetchBlocks = false;
     private int lastRetriedIndex = -1;
 
     /**
@@ -127,8 +130,11 @@ final class FileChecksumHelper {
       return blockLocations;
     }
 
-    void setBlockLocations(LocatedBlocks blockLocations) {
-      this.blockLocations = blockLocations;
+    void refetchBlocks() throws IOException {
+      this.blockLocations = getClient().getBlockLocations(getSrc(),
+          getLength());
+      this.locatedBlocks = getBlockLocations().getLocatedBlocks();
+      this.isRefetchBlocks = false;
     }
 
     int getTimeout() {
@@ -143,10 +149,6 @@ final class FileChecksumHelper {
       return locatedBlocks;
     }
 
-    void setLocatedBlocks(List<LocatedBlock> locatedBlocks) {
-      this.locatedBlocks = locatedBlocks;
-    }
-
     long getRemaining() {
       return remaining;
     }
@@ -180,11 +182,11 @@ final class FileChecksumHelper {
     }
 
     boolean isRefetchBlocks() {
-      return refetchBlocks;
+      return isRefetchBlocks;
     }
 
     void setRefetchBlocks(boolean refetchBlocks) {
-      this.refetchBlocks = refetchBlocks;
+      this.isRefetchBlocks = refetchBlocks;
     }
 
     int getLastRetriedIndex() {
@@ -278,10 +280,7 @@ final class FileChecksumHelper {
            blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
            blockIdx++) {
         if (isRefetchBlocks()) {  // refetch to get fresh tokens
-          setBlockLocations(getClient().getBlockLocations(getSrc(),
-              getLength()));
-          setLocatedBlocks(getBlockLocations().getLocatedBlocks());
-          setRefetchBlocks(false);
+          refetchBlocks();
         }
 
         LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
@@ -380,15 +379,13 @@ final class FileChecksumHelper {
         }
 
         //read md5
-        final MD5Hash md5 = new MD5Hash(
-            checksumData.getMd5().toByteArray());
+        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
         md5.write(getMd5out());
 
         // read crc-type
         final DataChecksum.Type ct;
         if (checksumData.hasCrcType()) {
-          ct = PBHelperClient.convert(checksumData
-              .getCrcType());
+          ct = PBHelperClient.convert(checksumData.getCrcType());
         } else {
           LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
               "inferring checksum by reading first byte");
@@ -413,4 +410,160 @@ final class FileChecksumHelper {
       }
     }
   }
+
+  /**
+   * Checksum computer for striped files, working one block group at a time.
+   */
+  static class StripedFileNonStripedChecksumComputer
+      extends FileChecksumComputer {
+    private final ErasureCodingPolicy ecPolicy;
+    private int bgIdx;
+
+    StripedFileNonStripedChecksumComputer(String src, long length,
+                                          LocatedBlocks blockLocations,
+                                          ClientProtocol namenode,
+                                          DFSClient client,
+                                          ErasureCodingPolicy ecPolicy)
+        throws IOException {
+      super(src, length, blockLocations, namenode, client);
+
+      this.ecPolicy = ecPolicy;
+    }
+
+    @Override
+    void checksumBlocks() throws IOException {
+      int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout();
+      setTimeout(tmpTimeout);
+
+      for (bgIdx = 0;
+           bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) {
+        if (isRefetchBlocks()) {  // refetch to get fresh tokens
+          refetchBlocks();
+        }
+
+        LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx);
+        LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
+
+        if (!checksumBlockGroup(blockGroup)) {
+          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+        }
+      }
+    }
+
+
+    private boolean checksumBlockGroup(
+        LocatedStripedBlock blockGroup) throws IOException {
+      ExtendedBlock block = blockGroup.getBlock();
+      if (getRemaining() < block.getNumBytes()) {
+        block.setNumBytes(getRemaining());
+      }
+      setRemaining(getRemaining() - block.getNumBytes());
+
+      StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
+          blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy);
+      DatanodeInfo[] datanodes = blockGroup.getLocations();
+
+      //try each datanode in the block group.
+      boolean done = false;
+      for (int j = 0; !done && j < datanodes.length; j++) {
+        try {
+          tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
+          done = true;
+        } catch (InvalidBlockTokenException ibte) {
+          if (bgIdx > getLastRetriedIndex()) {
+            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                    + "for file {} for block {} from datanode {}. Will retry "
+                    + "the block once.",
+                getSrc(), block, datanodes[j]);
+            setLastRetriedIndex(bgIdx);
+            done = true; // actually it's not done; but we'll retry
+            bgIdx--; // repeat at bgIdx-th block
+            setRefetchBlocks(true);
+          }
+        } catch (IOException ie) {
+          LOG.warn("src={}" + ", datanodes[{}]={}",
+              getSrc(), j, datanodes[j], ie);
+        }
+      }
+
+      return done;
+    }
+
+    /**
+     * Request the checksum of a block group from one datanode and fold the
+     * reply into the file checksum state; throws IOException on failure.
+     */
+    private void tryDatanode(LocatedStripedBlock blockGroup,
+                             StripedBlockInfo stripedBlockInfo,
+                             DatanodeInfo datanode) throws IOException {
+
+      try (IOStreamPair pair = getClient().connectToDN(datanode,
+          getTimeout(), blockGroup.getBlockToken())) {
+
+        LOG.debug("write to {}: {}, blockGroup={}",
+            datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);
+
+        // get block MD5
+        createSender(pair).blockGroupChecksum(stripedBlockInfo,
+            blockGroup.getBlockToken());
+
+        BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
+            PBHelperClient.vintPrefixed(pair.in));
+
+        String logInfo = "for blockGroup " + blockGroup +
+            " from datanode " + datanode;
+        DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+        OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
+
+        //read byte-per-checksum
+        final int bpc = checksumData.getBytesPerCrc();
+        if (bgIdx == 0) { //first block
+          setBytesPerCRC(bpc);
+        } else {
+          if (bpc != getBytesPerCRC()) {
+            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+                + " but bytesPerCRC=" + getBytesPerCRC());
+          }
+        }
+
+        //read crc-per-block
+        final long cpb = checksumData.getCrcPerBlock();
+        if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
+          setCrcPerBlock(cpb);
+        }
+
+        //read md5
+        final MD5Hash md5 = new MD5Hash(
+            checksumData.getMd5().toByteArray());
+        md5.write(getMd5out());
+
+        // read crc-type
+        final DataChecksum.Type ct;
+        if (checksumData.hasCrcType()) {
+          ct = PBHelperClient.convert(checksumData.getCrcType());
+        } else {
+          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+              "inferring checksum by reading first byte");
+          ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
+        }
+
+        if (bgIdx == 0) {
+          setCrcType(ct);
+        } else if (getCrcType() != DataChecksum.Type.MIXED &&
+            getCrcType() != ct) {
+          // if crc types are mixed in a file
+          setCrcType(DataChecksum.Type.MIXED);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          if (bgIdx == 0) {
+            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+                + ", crcPerBlock=" + getCrcPerBlock());
+          }
+          LOG.debug("got reply from " + datanode + ": md5=" + md5);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
new file mode 100644
index 0000000..74e8081
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Striped block info that can be sent elsewhere to perform block-group-level
+ * operations, such as computing a checksum.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class StripedBlockInfo {
+  private final ExtendedBlock block;
+  private final DatanodeInfo[] datanodes;
+  private final Token<BlockTokenIdentifier>[] blockTokens;
+  private final ErasureCodingPolicy ecPolicy;
+
+  public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes,
+                          Token<BlockTokenIdentifier>[] blockTokens,
+                          ErasureCodingPolicy ecPolicy) {
+    this.block = block;
+    this.datanodes = datanodes;
+    this.blockTokens = blockTokens;
+    this.ecPolicy = ecPolicy;
+  }
+
+  public ExtendedBlock getBlock() {
+    return block;
+  }
+
+  public DatanodeInfo[] getDatanodes() {
+    return datanodes;
+  }
+
+  public Token<BlockTokenIdentifier>[] getBlockTokens() {
+    return blockTokens;
+  }
+
+  public ErasureCodingPolicy getErasureCodingPolicy() {
+    return ecPolicy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 4aa545b..ad3f2ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@@ -197,6 +198,17 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @throws IOException
    */
-  void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException;
+  void blockChecksum(ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+
+  /**
+   * Get striped block group checksum (MD5 of CRC32).
+   *
+   * @param stripedBlockInfo a striped block info.
+   * @param blockToken security token for accessing the block.
+   * @throws IOException
+   */
+  void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
+          Token<BlockTokenIdentifier> blockToken) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
index 511574c..94250e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -38,6 +38,7 @@ public enum Op {
   REQUEST_SHORT_CIRCUIT_FDS((byte)87),
   RELEASE_SHORT_CIRCUIT_FDS((byte)88),
   REQUEST_SHORT_CIRCUIT_SHM((byte)89),
+  BLOCK_GROUP_CHECKSUM((byte)90),
   CUSTOM((byte)127);
 
   /** The code for this operation. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 6545681..585ed99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -28,11 +28,13 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -261,4 +263,21 @@ public class Sender implements DataTransferProtocol {
 
     send(out, Op.BLOCK_CHECKSUM, proto);
   }
+
+  @Override
+  public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
+         Token<BlockTokenIdentifier> blockToken) throws IOException {
+    OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildBaseHeader(
+            stripedBlockInfo.getBlock(), blockToken))
+        .setDatanodes(PBHelperClient.convertToProto(
+            stripedBlockInfo.getDatanodes()))
+        .addAllBlockTokens(PBHelperClient.convert(
+            stripedBlockInfo.getBlockTokens()))
+        .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
+            stripedBlockInfo.getErasureCodingPolicy()))
+        .build();
+
+    send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 38e875c..4759373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -553,10 +553,8 @@ public class PBHelperClient {
           proto.getCorrupt(),
           cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
       List<TokenProto> tokenProtos = proto.getBlockTokensList();
-      Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
-      for (int i = 0; i < indices.length; i++) {
-        blockTokens[i] = convert(tokenProtos.get(i));
-      }
+      Token<BlockTokenIdentifier>[] blockTokens =
+          convertTokens(tokenProtos);
       ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
     }
     lb.setBlockToken(convert(proto.getBlockToken()));
@@ -564,6 +562,18 @@ public class PBHelperClient {
     return lb;
   }
 
+  static public Token<BlockTokenIdentifier>[] convertTokens(
+      List<TokenProto> tokenProtos) {
+
+    @SuppressWarnings("unchecked")
+    Token<BlockTokenIdentifier>[] blockTokens = new Token[tokenProtos.size()];
+    for (int i = 0; i < blockTokens.length; i++) {
+      blockTokens[i] = convert(tokenProtos.get(i));
+    }
+
+    return blockTokens;
+  }
+
   static public DatanodeInfo convert(DatanodeInfoProto di) {
     if (di == null) return null;
     return new DatanodeInfo(
@@ -815,9 +825,7 @@ public class PBHelperClient {
       byte[] indices = sb.getBlockIndices();
       builder.setBlockIndices(PBHelperClient.getByteString(indices));
       Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
-      for (int i = 0; i < indices.length; i++) {
-        builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
-      }
+      builder.addAllBlockTokens(convert(blockTokens));
     }
 
     return builder.setB(PBHelperClient.convert(b.getBlock()))
@@ -825,6 +833,16 @@ public class PBHelperClient {
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
   }
 
+  public static List<TokenProto> convert(
+      Token<BlockTokenIdentifier>[] blockTokens) {
+    List<TokenProto> results = new ArrayList<>(blockTokens.length);
+    for (Token<BlockTokenIdentifier> bt : blockTokens) {
+      results.add(convert(bt));
+    }
+
+    return results;
+  }
+
   public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
     List<StorageTypeProto> cList = proto.getCreationPolicy()
         .getStorageTypesList();
@@ -2500,4 +2518,14 @@ public class PBHelperClient {
         .setId(policy.getId());
     return builder.build();
   }
+
+  public static HdfsProtos.DatanodeInfosProto convertToProto(
+      DatanodeInfo[] datanodeInfos) {
+    HdfsProtos.DatanodeInfosProto.Builder builder =
+        HdfsProtos.DatanodeInfosProto.newBuilder();
+    for (DatanodeInfo datanodeInfo : datanodeInfos) {
+      builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
+    }
+    return builder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 43772e2..0819376 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -76,6 +76,18 @@ public class StripedBlockUtil {
   public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
 
   /**
+   * Parses a striped block group into individual blocks.
+   * @param bg The striped block group
+   * @param ecPolicy The erasure coding policy
+   * @return An array of the blocks in the group
+   */
+  public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+                                               ErasureCodingPolicy ecPolicy) {
+    return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
+        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+  }
+
+  /**
    * This method parses a striped block group into individual blocks.
    *
    * @param bg The striped block group

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index a091d41..522ee06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -74,7 +74,6 @@ message OpReadBlockProto {
   optional CachingStrategyProto cachingStrategy = 5;
 }
 
-
 message ChecksumProto {
   required ChecksumTypeProto type = 1;
   required uint32 bytesPerChecksum = 2;
@@ -149,6 +148,14 @@ message OpBlockChecksumProto {
   required BaseHeaderProto header = 1;
 }
 
+message OpBlockGroupChecksumProto {
+  required BaseHeaderProto header = 1;
+  required DatanodeInfosProto datanodes = 2;
+  // each internal block has a block token
+  repeated hadoop.common.TokenProto blockTokens = 3;
+  required ErasureCodingPolicyProto ecPolicy = 4;
+}
+
 /**
  * An ID uniquely identifying a shared memory segment.
  */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index e040157..b2f26f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -26,11 +26,13 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -111,6 +113,9 @@ public abstract class Receiver implements DataTransferProtocol {
     case BLOCK_CHECKSUM:
       opBlockChecksum(in);
       break;
+    case BLOCK_GROUP_CHECKSUM:
+      opStripedBlockChecksum(in);
+      break;
     case TRANSFER_BLOCK:
       opTransferBlock(in);
       break;
@@ -290,4 +295,27 @@ public abstract class Receiver implements DataTransferProtocol {
       if (traceScope != null) traceScope.close();
     }
   }
+
+  /** Receive OP_BLOCK_GROUP_CHECKSUM. */
+  private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
+    OpBlockGroupChecksumProto proto =
+        OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      // Build the request inside the try so the trace scope is closed even
+      // if converting one of the proto fields fails.
+      StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(
+          PBHelperClient.convert(proto.getHeader().getBlock()),
+          PBHelperClient.convert(proto.getDatanodes()),
+          PBHelperClient.convertTokens(proto.getBlockTokensList()),
+          PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy()));
+      blockGroupChecksum(stripedBlockInfo,
+          PBHelperClient.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) {
+        traceScope.close();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index 9a5552d..1f1a25c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -19,16 +19,30 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.MessageDigest;
@@ -41,13 +55,87 @@ final class BlockChecksumHelper {
 
   static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
 
-  private BlockChecksumHelper() {}
+  /** Not meant to be instantiated. */
+  private BlockChecksumHelper() {
+    // intentionally empty
+  }
 
   /**
    * The abstract base block checksum computer.
    */
-  static abstract class BlockChecksumComputer {
+  static abstract class AbstractBlockChecksumComputer {
     private final DataNode datanode;
+
+    // Checksum result and parameters; they keep these initial values until a
+    // subclass records real ones through the setters below.
+    private byte[] outBytes;
+    private int bytesPerCRC = -1;
+    private DataChecksum.Type crcType = null;
+    private long crcPerBlock = -1;
+    private int checksumSize = -1;
+
+    AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
+      this.datanode = datanode;
+    }
+
+    /**
+     * Perform the checksum computation. Implementations record the result
+     * with {@link #setOutBytes(byte[])} and the CRC setters.
+     *
+     * @throws IOException if the checksum cannot be computed
+     */
+    abstract void compute() throws IOException;
+
+    /**
+     * Create a {@link Sender} for the output stream of {@code pair}. The
+     * stream is reused when it already is a {@link DataOutputStream} and
+     * wrapped otherwise, instead of failing with a ClassCastException.
+     */
+    Sender createSender(IOStreamPair pair) {
+      final DataOutputStream out = pair.out instanceof DataOutputStream
+          ? (DataOutputStream) pair.out
+          : new DataOutputStream(pair.out);
+      return new Sender(out);
+    }
+
+    DataNode getDatanode() {
+      return datanode;
+    }
+
+    /**
+     * Open the data of {@code block} at {@code seekOffset} through the
+     * DataNode's {@code data} dataset.
+     */
+    InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
+        throws IOException {
+      return datanode.data.getBlockInputStream(block, seekOffset);
+    }
+
+    void setOutBytes(byte[] bytes) {
+      this.outBytes = bytes;
+    }
+
+    /** @return the computed checksum bytes, or null if not set yet. */
+    byte[] getOutBytes() {
+      return outBytes;
+    }
+
+    int getBytesPerCRC() {
+      return bytesPerCRC;
+    }
+
+    // Setters are package-private like the getters; they are used by the
+    // subclasses in this file.
+    void setBytesPerCRC(int bytesPerCRC) {
+      this.bytesPerCRC = bytesPerCRC;
+    }
+
+    void setCrcType(DataChecksum.Type crcType) {
+      this.crcType = crcType;
+    }
+
+    void setCrcPerBlock(long crcPerBlock) {
+      this.crcPerBlock = crcPerBlock;
+    }
+
+    void setChecksumSize(int checksumSize) {
+      this.checksumSize = checksumSize;
+    }
+
+    DataChecksum.Type getCrcType() {
+      return crcType;
+    }
+
+    long getCrcPerBlock() {
+      return crcPerBlock;
+    }
+
+    int getChecksumSize() {
+      return checksumSize;
+    }
+  }
+
+  /**
+   * The abstract base computer for the checksum of a single block stored on
+   * this DataNode.
+   */
+  static abstract class BlockChecksumComputer
+      extends AbstractBlockChecksumComputer {
     private final ExtendedBlock block;
     // client side now can specify a range of the block for checksum
     private final long requestLength;
@@ -56,17 +144,12 @@ final class BlockChecksumHelper {
     private final long visibleLength;
     private final boolean partialBlk;
 
-    private byte[] outBytes;
-    private int bytesPerCRC = -1;
-    private DataChecksum.Type crcType = null;
-    private long crcPerBlock = -1;
-    private int checksumSize = -1;
     private BlockMetadataHeader header;
     private DataChecksum checksum;
 
     BlockChecksumComputer(DataNode datanode,
                           ExtendedBlock block) throws IOException {
-      this.datanode = datanode;
+      super(datanode);
       this.block = block;
       this.requestLength = block.getNumBytes();
       Preconditions.checkArgument(requestLength >= 0);
@@ -81,98 +164,80 @@ final class BlockChecksumHelper {
           new BufferedInputStream(metadataIn, ioFileBufferSize));
     }
 
-    protected DataNode getDatanode() {
-      return datanode;
-    }
-
-    protected ExtendedBlock getBlock() {
+    /** @return the block whose checksum is computed. */
+    ExtendedBlock getBlock() {
       return block;
     }
 
-    protected long getRequestLength() {
+    /** @return the number of bytes to checksum, taken from the block size. */
+    long getRequestLength() {
       return requestLength;
     }
 
-    protected LengthInputStream getMetadataIn() {
+    /**
+     * @return the block metadata input stream; its length is used to derive
+     *     the number of CRCs in {@link #readHeader()}.
+     */
+    LengthInputStream getMetadataIn() {
       return metadataIn;
     }
 
-    protected DataInputStream getChecksumIn() {
+    /** @return the stream the metadata header is read from. */
+    DataInputStream getChecksumIn() {
       return checksumIn;
     }
 
-    protected long getVisibleLength() {
+    /**
+     * @return the visibleLength recorded at construction (presumably the
+     *     replica's visible length -- TODO confirm).
+     */
+    long getVisibleLength() {
       return visibleLength;
     }
 
-    protected boolean isPartialBlk() {
+    /**
+     * @return the partialBlk flag set at construction (presumably true when
+     *     only part of the block is requested -- TODO confirm).
+     */
+    boolean isPartialBlk() {
       return partialBlk;
     }
 
-    protected void setOutBytes(byte[] bytes) {
-      this.outBytes = bytes;
-    }
-
-    protected byte[] getOutBytes() {
-      return outBytes;
-    }
-
-    protected int getBytesPerCRC() {
-      return bytesPerCRC;
-    }
-
-    protected DataChecksum.Type getCrcType() {
-      return crcType;
-    }
-
-    protected long getCrcPerBlock() {
-      return crcPerBlock;
-    }
-
-    protected int getChecksumSize() {
-      return checksumSize;
-    }
-
-    protected BlockMetadataHeader getHeader() {
+    /** @return the metadata header read by {@link #readHeader()}. */
+    BlockMetadataHeader getHeader() {
       return header;
     }
 
-    protected DataChecksum getChecksum() {
+    /** @return the DataChecksum taken from the header in readHeader(). */
+    DataChecksum getChecksum() {
       return checksum;
     }
 
     /**
      * Perform the block checksum computing.
+     *
      * @throws IOException
      */
     abstract void compute() throws IOException;
 
     /**
      * Read block metadata header.
+     * It also records the checksum size, bytes per CRC, number of CRCs in the
+     * metadata file and the CRC type through the inherited setters.
+     *
      * @throws IOException
      */
-    protected void readHeader() throws IOException {
+    void readHeader() throws IOException {
       //read metadata file
       header = BlockMetadataHeader.readHeader(checksumIn);
       checksum = header.getChecksum();
-      checksumSize = checksum.getChecksumSize();
-      bytesPerCRC = checksum.getBytesPerChecksum();
-      crcPerBlock = checksumSize <= 0 ? 0 :
-          (metadataIn.getLength() -
-              BlockMetadataHeader.getHeaderSize()) / checksumSize;
-      crcType = checksum.getChecksumType();
+      final int sizeOfChecksum = checksum.getChecksumSize();
+      setChecksumSize(sizeOfChecksum);
+      setBytesPerCRC(checksum.getBytesPerChecksum());
+      // The metadata file holds the header followed by fixed-size CRC values.
+      final long numCrcs;
+      if (sizeOfChecksum <= 0) {
+        numCrcs = 0;
+      } else {
+        final long crcBytes =
+            metadataIn.getLength() - BlockMetadataHeader.getHeaderSize();
+        numCrcs = crcBytes / sizeOfChecksum;
+      }
+      setCrcPerBlock(numCrcs);
+      setCrcType(checksum.getChecksumType());
     }
 
     /**
      * Calculate partial block checksum.
+     *
      * @return
      * @throws IOException
      */
-    protected byte[] crcPartialBlock() throws IOException {
-      int partialLength = (int) (requestLength % bytesPerCRC);
+    byte[] crcPartialBlock() throws IOException {
+      int partialLength = (int) (requestLength % getBytesPerCRC());
       if (partialLength > 0) {
         byte[] buf = new byte[partialLength];
-        final InputStream blockIn = datanode.data.getBlockInputStream(block,
+        final InputStream blockIn = getBlockInputStream(block,
             requestLength - partialLength);
         try {
           // Get the CRC of the partialLength.
@@ -181,7 +246,7 @@ final class BlockChecksumHelper {
           IOUtils.closeStream(blockIn);
         }
         checksum.update(buf, 0, partialLength);
-        byte[] partialCrc = new byte[checksumSize];
+        byte[] partialCrc = new byte[getChecksumSize()];
         checksum.writeValue(partialCrc, 0, true);
         return partialCrc;
       }
@@ -229,7 +294,7 @@ final class BlockChecksumHelper {
     }
 
     private MD5Hash checksumPartialBlock() throws IOException {
-      byte[] buffer = new byte[4*1024];
+      byte[] buffer = new byte[4 * 1024];
       MessageDigest digester = MD5Hash.getDigester();
 
       long remaining = (getRequestLength() / getBytesPerCRC())
@@ -251,4 +316,115 @@ final class BlockChecksumHelper {
       return new MD5Hash(digester.digest());
     }
   }
-}
+
+  /**
+   * Computes the checksum of a striped block group without striping logic.
+   * It asks the DataNode of each internal data block for that block's
+   * checksum and digests the concatenation of the returned MD5s.
+   */
+  static class BlockGroupNonStripedChecksumComputer
+      extends AbstractBlockChecksumComputer {
+
+    /**
+     * Timeout handed to {@link DataNode#connectToDN} for each internal block
+     * (presumably milliseconds -- TODO confirm).
+     */
+    private static final int CONNECT_TO_DN_TIMEOUT = 3000;
+
+    private final ExtendedBlock blockGroup;
+    private final ErasureCodingPolicy ecPolicy;
+    private final DatanodeInfo[] datanodes;
+    private final Token<BlockTokenIdentifier>[] blockTokens;
+
+    // Accumulates the serialized MD5 of every internal data block, in order.
+    private final DataOutputBuffer md5writer = new DataOutputBuffer();
+
+    BlockGroupNonStripedChecksumComputer(DataNode datanode,
+                                         StripedBlockInfo stripedBlockInfo)
+        throws IOException {
+      super(datanode);
+      this.blockGroup = stripedBlockInfo.getBlock();
+      this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
+      this.datanodes = stripedBlockInfo.getDatanodes();
+      this.blockTokens = stripedBlockInfo.getBlockTokens();
+    }
+
+    /**
+     * Checksum each internal data block of the group in index order and set
+     * the MD5 of their concatenated MD5s as the output bytes.
+     *
+     * @throws IOException if fewer locations/tokens than data blocks were
+     *     supplied, an internal block checksum cannot be obtained, or the
+     *     bytes-per-checksum of the internal blocks disagree
+     */
+    @Override
+    void compute() throws IOException {
+      final int numDataUnits = ecPolicy.getNumDataUnits();
+      // NOTE(review): datanodes[i] and blockTokens[i] are assumed to belong to
+      // the internal block with index i; failures are not handled (HDFS-9833).
+      if (datanodes.length < numDataUnits
+          || blockTokens.length < numDataUnits) {
+        throw new IOException("Expected locations and tokens for "
+            + numDataUnits + " data blocks of " + blockGroup + " but got "
+            + datanodes.length + " locations and " + blockTokens.length
+            + " tokens");
+      }
+
+      for (int idx = 0; idx < numDataUnits; idx++) {
+        ExtendedBlock block =
+            StripedBlockUtil.constructInternalBlock(blockGroup,
+                ecPolicy.getCellSize(), numDataUnits, idx);
+        checksumBlock(block, idx, blockTokens[idx], datanodes[idx]);
+      }
+
+      // Digest only the bytes actually written: getData() returns the whole
+      // backing array, which may be longer than getLength().
+      MD5Hash md5out = MD5Hash.digest(md5writer.getData(), 0,
+          md5writer.getLength());
+      setOutBytes(md5out.getDigest());
+    }
+
+    /**
+     * Ask {@code targetDatanode} for the checksum of one internal block,
+     * append its MD5 to the group digest input, and record (first block) or
+     * verify (later blocks) the bytes-per-CRC, CRC-per-block and CRC type.
+     */
+    private void checksumBlock(ExtendedBlock block, int blockIdx,
+                               Token<BlockTokenIdentifier> blockToken,
+                               DatanodeInfo targetDatanode) throws IOException {
+      try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode,
+          CONNECT_TO_DN_TIMEOUT, block, blockToken)) {
+
+        LOG.debug("write to {}: {}, block={}",
+            targetDatanode, Op.BLOCK_CHECKSUM, block);
+
+        // get block MD5
+        createSender(pair).blockChecksum(block, blockToken);
+
+        final DataTransferProtos.BlockOpResponseProto reply =
+            DataTransferProtos.BlockOpResponseProto.parseFrom(
+                PBHelperClient.vintPrefixed(pair.in));
+
+        String logInfo = "for block " + block
+            + " from datanode " + targetDatanode;
+        DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+        DataTransferProtos.OpBlockChecksumResponseProto checksumData =
+            reply.getChecksumResponse();
+
+        final boolean firstBlock = blockIdx == 0;
+
+        //read byte-per-checksum
+        final int bpc = checksumData.getBytesPerCrc();
+        if (firstBlock) {
+          setBytesPerCRC(bpc);
+        } else if (bpc != getBytesPerCRC()) {
+          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+              + " but bytesPerCRC=" + getBytesPerCRC());
+        }
+
+        //read crc-per-block; only the first block's value is kept
+        final long cpb = checksumData.getCrcPerBlock();
+        if (firstBlock) {
+          setCrcPerBlock(cpb);
+        }
+
+        //read md5
+        final MD5Hash md5 = new MD5Hash(
+            checksumData.getMd5().toByteArray());
+        md5.write(md5writer);
+
+        // read crc-type
+        final DataChecksum.Type ct;
+        if (checksumData.hasCrcType()) {
+          ct = PBHelperClient.convert(checksumData.getCrcType());
+        } else {
+          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+              "assuming the default checksum type");
+          ct = DataChecksum.Type.DEFAULT;
+        }
+
+        if (firstBlock) {
+          setCrcType(ct);
+        } else if (getCrcType() != DataChecksum.Type.MIXED &&
+            getCrcType() != ct) {
+          // if crc types are mixed in a file
+          setCrcType(DataChecksum.Type.MIXED);
+        }
+
+        if (firstBlock) {
+          LOG.debug("set bytesPerCRC={}, crcPerBlock={}",
+              getBytesPerCRC(), getCrcPerBlock());
+        }
+        LOG.debug("got reply from {}: md5={}", targetDatanode, md5);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 1d4a79a..63bf5ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -46,7 +47,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
 import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
@@ -924,6 +927,46 @@ class DataXceiver extends Receiver implements Runnable {
   }
 
   @Override
+  public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
+                                 final Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+    updateCurrentThreadName("Getting checksum for block group " +
+        stripedBlockInfo.getBlock());
+    final DataOutputStream out = new DataOutputStream(getOutputStream());
+    checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken,
+        Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
+
+    // Combines the checksums of the internal data blocks of the group.
+    AbstractBlockChecksumComputer maker =
+        new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
+
+    try {
+      maker.compute();
+
+      //write reply
+      BlockOpResponseProto.newBuilder()
+          .setStatus(SUCCESS)
+          .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
+              .setBytesPerCrc(maker.getBytesPerCRC())
+              .setCrcPerBlock(maker.getCrcPerBlock())
+              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+          .build()
+          .writeDelimitedTo(out);
+      out.flush();
+    } catch (IOException ioe) {
+      LOG.info("blockGroupChecksum " + stripedBlockInfo.getBlock() +
+          " received exception " + ioe);
+      incrDatanodeNetworkErrors();
+      throw ioe;
+    } finally {
+      IOUtils.closeStream(out);
+    }
+
+    //update metrics (shared with the per-block checksum op)
+    datanode.metrics.addBlockChecksumOp(elapsed());
+  }
+
+  @Override
   public void copyBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a4ff777/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
new file mode 100644
index 0000000..7cee344
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * This test serves as a prototype to demo the idea proposed so far. It
+ * creates three files using the same data: two in striped layout and one in
+ * replica mode, and compares their checksums over various ranges. For
+ * simplicity, it assumes 6 data blocks in both layouts and the same block
+ * size.
+ */
+public class TestFileChecksum {
+  public static final Log LOG = LogFactory.getLog(TestFileChecksum.class);
+
+  private final int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
+  private final int parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private Configuration conf;
+  private DFSClient client;
+
+  private final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+  private final int stripesPerBlock = 6;
+  private final int blockSize = cellSize * stripesPerBlock;
+  private final int numBlockGroups = 10;
+  private final int stripSize = cellSize * dataBlocks;
+  private final int blockGroupSize = stripesPerBlock * stripSize;
+  private final int fileSize = numBlockGroups * blockGroupSize;
+
+  private final String ecDir = "/striped";
+  private final String stripedFile1 = ecDir + "/stripedFileChecksum1";
+  private final String stripedFile2 = ecDir + "/stripedFileChecksum2";
+  private final String replicatedFile = "/replicatedFileChecksum";
+
+  @Before
+  public void setup() throws IOException {
+    int numDNs = dataBlocks + parityBlocks + 2;
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+
+    Path ecPath = new Path(ecDir);
+    fs.mkdir(ecPath, FsPermission.getDirDefault());
+    client.setErasureCodingPolicy(ecDir, null);
+
+    prepareTestFiles();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testStripedFileChecksum1() throws Exception {
+    int length = 0;
+    testStripedFileChecksum(length, length + 10);
+  }
+
+  @Test
+  public void testStripedFileChecksum2() throws Exception {
+    int length = stripSize - 1;
+    testStripedFileChecksum(length, length - 10);
+  }
+
+  @Test
+  public void testStripedFileChecksum3() throws Exception {
+    int length = stripSize;
+    testStripedFileChecksum(length, length - 10);
+  }
+
+  @Test
+  public void testStripedFileChecksum4() throws Exception {
+    int length = stripSize + cellSize * 2;
+    testStripedFileChecksum(length, length - 10);
+  }
+
+  @Test
+  public void testStripedFileChecksum5() throws Exception {
+    int length = blockGroupSize;
+    testStripedFileChecksum(length, length - 10);
+  }
+
+  @Test
+  public void testStripedFileChecksum6() throws Exception {
+    int length = blockGroupSize + blockSize;
+    testStripedFileChecksum(length, length - 10);
+  }
+
+  @Test
+  public void testStripedFileChecksum7() throws Exception {
+    int length = -1; // whole file
+    testStripedFileChecksum(length, fileSize);
+  }
+
+  /**
+   * Checksum both striped files over {@code range1}, and the second striped
+   * file over {@code range2}. The first two must match; the third must
+   * differ when {@code range1} is non-negative and differs from
+   * {@code range2}.
+   */
+  void testStripedFileChecksum(int range1, int range2) throws Exception {
+    FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
+        range1, false);
+    FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2,
+        range1, false);
+    FileChecksum stripedFileChecksum3 = getFileChecksum(stripedFile2,
+        range2, false);
+
+    LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
+    LOG.info("stripedFileChecksum2:" + stripedFileChecksum2);
+    LOG.info("stripedFileChecksum3:" + stripedFileChecksum3);
+
+    Assert.assertEquals(stripedFileChecksum1, stripedFileChecksum2);
+    if (range1 >= 0 && range1 != range2) {
+      Assert.assertFalse("Checksums of different ranges should differ",
+          stripedFileChecksum1.equals(stripedFileChecksum3));
+    }
+  }
+
+  @Test
+  public void testStripedAndReplicatedFileChecksum() throws Exception {
+    FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
+        10, false);
+    FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
+        10, false);
+
+    Assert.assertFalse("Striped and replicated checksums should differ",
+        stripedFileChecksum1.equals(replicatedFileChecksum));
+  }
+
+  /*
+  // TODO: allow datanode failure, HDFS-9833
+  @Test
+  public void testStripedAndReplicatedWithFailure() throws Exception {
+    FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
+        10, true);
+    FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
+        10, true);
+
+    Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
+  }*/
+
+  /**
+   * Get the checksum of {@code filePath}.
+   *
+   * @param range number of leading bytes to checksum; negative means the
+   *     whole file
+   * @param killDn whether to shut down the datanode of the first block
+   *     during the call and restart it afterwards
+   */
+  private FileChecksum getFileChecksum(String filePath, int range,
+                                       boolean killDn) throws Exception {
+    int dnIdxToDie = -1;
+    if (killDn) {
+      dnIdxToDie = getDataNodeToKill(filePath);
+      DataNode dnToDie = cluster.getDataNodes().get(dnIdxToDie);
+      shutdownDataNode(dnToDie);
+    }
+
+    Path testPath = new Path(filePath);
+    FileChecksum fc;
+
+    if (range >= 0) {
+      fc = fs.getFileChecksum(testPath, range);
+    } else {
+      fc = fs.getFileChecksum(testPath);
+    }
+
+    if (dnIdxToDie != -1) {
+      cluster.restartDataNode(dnIdxToDie, true);
+    }
+
+    return fc;
+  }
+
+  /** Write the same generated data to every test file. */
+  void prepareTestFiles() throws IOException {
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
+
+    String[] filePaths = new String[] {
+        stripedFile1, stripedFile2, replicatedFile
+    };
+
+    for (String filePath : filePaths) {
+      Path testPath = new Path(filePath);
+      DFSTestUtil.writeFile(fs, testPath, fileData);
+    }
+  }
+
+  void shutdownDataNode(DataNode dataNode) throws IOException {
+    /*
+     * Kill the datanode which contains one replica
+     * We need to make sure it dead in namenode: clear its update time and
+     * trigger NN to check heartbeat.
+     */
+    dataNode.shutdown();
+    cluster.setDataNodeDead(dataNode.getDatanodeId());
+  }
+
+  /**
+   * Determine the datanode that hosts the first block of the file. For
+   * simplicity this just returns the first location of the first block, or
+   * -1 if no cluster datanode matches it.
+   */
+  int getDataNodeToKill(String filePath) throws IOException {
+    LocatedBlocks locatedBlocks = client.getLocatedBlocks(filePath, 0);
+
+    LocatedBlock locatedBlock = locatedBlocks.get(0);
+    DatanodeInfo[] datanodes = locatedBlock.getLocations();
+    DatanodeInfo chosenDn = datanodes[0];
+
+    int idx = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getInfoPort() == chosenDn.getInfoPort()) {
+        return idx;
+      }
+      idx++;
+    }
+
+    return -1;
+  }
+}


Mime
View raw message