hadoop-hdfs-commits mailing list archives

From ji...@apache.org
Subject svn commit: r1596931 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src: main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/hdfs/ main/java/org/apache/hadoop/hdfs/server/datanode/ main/java/org/apache/hadoop/hdfs/server/datanode/web...
Date Thu, 22 May 2014 18:17:12 GMT
Author: jing9
Date: Thu May 22 18:17:11 2014
New Revision: 1596931

URL: http://svn.apache.org/r1596931
Log:
MAPREDUCE-5899. Support incremental data copy in DistCp. Contributed by Jing Zhao.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java

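As context for the diffs below: the new length-limited checksum lets a copy tool test whether an existing target file is a byte-for-byte prefix of the source, in which case only the tail needs to be copied. A minimal sketch of that idea, not part of this commit (DistCp's actual use of the API lives in the MAPREDUCE-5899 tools change; the helper class and method names below are hypothetical):

import java.io.IOException;

import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class IncrementalCopyCheck {
  /**
   * Returns true if dst is a byte-for-byte prefix of src, i.e. the copy can
   * be completed by appending only the source bytes past dst's length.
   * Assumes both clusters use the same block size and checksum settings,
   * since MD5MD5CRC32 checksums are only comparable under matching layouts.
   */
  static boolean canAppendInstead(DistributedFileSystem srcFs, Path src,
      DistributedFileSystem dstFs, Path dst) throws IOException {
    final long dstLen = dstFs.getFileStatus(dst).getLen();
    final long srcLen = srcFs.getFileStatus(src).getLen();
    if (dstLen > srcLen) {
      return false; // target is longer than source: full re-copy needed
    }
    // Checksum only the first dstLen bytes of the source (the range always
    // starts at offset 0) and compare with the target's whole-file checksum.
    final FileChecksum srcPrefix = srcFs.getFileChecksum(src, dstLen);
    final FileChecksum dstWhole = dstFs.getFileChecksum(dst);
    return srcPrefix != null && srcPrefix.equals(dstWhole);
  }
}
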
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
Thu May 22 18:17:11 2014
@@ -116,7 +116,7 @@ public class Hdfs extends AbstractFileSy
   @Override
   public FileChecksum getFileChecksum(Path f) 
       throws IOException, UnresolvedLinkException {
-    return dfs.getFileChecksum(getUriPath(f));
+    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
Thu May 22 18:17:11 2014
@@ -1801,15 +1801,19 @@ public class DFSClient implements java.i
    }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file.
    * @param src The file path
+   * @param length The length of the range
    * @return The checksum 
    * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+      throws IOException {
     checkOpen();
-    return getFileChecksum(src, clientName, namenode, socketFactory,
-        dfsClientConf.socketTimeout, getDataEncryptionKey(),
+    Preconditions.checkArgument(length >= 0);
+    return getFileChecksum(src, length, clientName, namenode,
+        socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
         dfsClientConf.connectToDnViaHostname);
   }
   
@@ -1850,8 +1854,9 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file.
    * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length)
    * @param clientName the name of the client requesting the checksum.
    * @param namenode the RPC proxy for the namenode
    * @param socketFactory to create sockets to connect to DNs
@@ -1861,12 +1866,13 @@ public class DFSClient implements java.i
    * @return The checksum 
    */
   private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      String clientName,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      long length, String clientName, ClientProtocol namenode,
+      SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
       throws IOException {
-    //get all block locations
-    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+    //get block locations for the file range
+    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+        length);
     if (null == blockLocations) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
@@ -1878,10 +1884,11 @@ public class DFSClient implements java.i
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
 
-    //get block checksum for each block
-    for(int i = 0; i < locatedblocks.size(); i++) {
+    // get block checksum for each block
+    long remaining = length;
+    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
       if (refetchBlocks) {  // refetch to get fresh tokens
-        blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+        blockLocations = callGetBlockLocations(namenode, src, 0, length);
         if (null == blockLocations) {
           throw new FileNotFoundException("File does not exist: " + src);
         }
@@ -1890,6 +1897,10 @@ public class DFSClient implements java.i
       }
       LocatedBlock lb = locatedblocks.get(i);
       final ExtendedBlock block = lb.getBlock();
+      if (remaining < block.getNumBytes()) {
+        block.setNumBytes(remaining);
+      }
+      remaining -= block.getNumBytes();
       final DatanodeInfo[] datanodes = lb.getLocations();
       
       //try each datanode location of the block

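The loop above caps the final block in the range via block.setNumBytes(remaining), so the datanode checksums only the bytes inside [0, length). A standalone sketch of that arithmetic, with plain longs standing in for LocatedBlock/ExtendedBlock:

public class RangeBlocks {
  /** Returns the per-block byte counts covering the range [0, length). */
  static long[] blockSpans(long[] blockSizes, long length) {
    final long[] spans = new long[blockSizes.length];
    long remaining = length;
    for (int i = 0; i < blockSizes.length && remaining > 0; i++) {
      // The last block in range is truncated to the remaining bytes,
      // mirroring block.setNumBytes(remaining) in the loop above.
      spans[i] = Math.min(blockSizes[i], remaining);
      remaining -= spans[i];
    }
    return spans;
  }

  public static void main(String[] args) {
    // Four 1024-byte blocks, checksum range of 2500 bytes:
    // prints 1024, 1024, 452, 0 (the fourth block is never visited).
    for (long s : blockSpans(new long[] {1024, 1024, 1024, 1024}, 2500)) {
      System.out.println(s);
    }
  }
}
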
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
Thu May 22 18:17:11 2014
@@ -68,14 +68,12 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -85,7 +83,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
@@ -1142,7 +1139,7 @@ public class DistributedFileSystem exten
       @Override
       public FileChecksum doCall(final Path p)
           throws IOException, UnresolvedLinkException {
-        return dfs.getFileChecksum(getPathName(p));
+        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
       }
 
       @Override
@@ -1154,6 +1151,32 @@ public class DistributedFileSystem exten
   }
 
   @Override
+  public FileChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
+    statistics.incrementReadOps(1);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileChecksum>() {
+      @Override
+      public FileChecksum doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getFileChecksum(getPathName(p), length);
+      }
+
+      @Override
+      public FileChecksum next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+        } else {
+          throw new UnsupportedFileSystemException(
+              "getFileChecksum(Path, long) is not supported by "
+                  + fs.getClass().getSimpleName()); 
+        }
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
   public void setPermission(Path p, final FsPermission permission
       ) throws IOException {
     statistics.incrementWriteOps(1);

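A short usage sketch for the overload added above (the path is hypothetical): passing Long.MAX_VALUE reproduces the whole-file behavior of getFileChecksum(Path), while any smaller length checksums only that prefix. Non-HDFS file systems reached through link resolution hit the UnsupportedFileSystemException branch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class PrefixChecksumDemo {
  public static void main(String[] args) throws Exception {
    final FileSystem fs = FileSystem.get(new Configuration());
    if (!(fs instanceof DistributedFileSystem)) {
      // Only HDFS implements the length-limited overload added above.
      throw new IllegalStateException("expected an HDFS file system");
    }
    final DistributedFileSystem dfs = (DistributedFileSystem) fs;
    final Path f = new Path("/demo/file"); // hypothetical path
    // Checksum of the first 4096 bytes only; the range always starts at 0.
    final FileChecksum prefix = dfs.getFileChecksum(f, 4096L);
    // Long.MAX_VALUE covers the whole file, matching getFileChecksum(Path).
    final FileChecksum whole = dfs.getFileChecksum(f, Long.MAX_VALUE);
    System.out.println("prefix=" + prefix + ", whole=" + whole);
  }
}
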
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Thu May 22 18:17:11 2014
@@ -42,6 +42,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
@@ -802,7 +804,44 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(out);
     }
   }
-  
+
+  private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+      long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+      throws IOException {
+    final int bytesPerCRC = checksum.getBytesPerChecksum();
+    final int csize = checksum.getChecksumSize();
+    final byte[] buffer = new byte[4*1024];
+    MessageDigest digester = MD5Hash.getDigester();
+
+    long remaining = requestLength / bytesPerCRC * csize;
+    for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+      toDigest = checksumIn.read(buffer, 0,
+          (int) Math.min(remaining, buffer.length));
+      if (toDigest < 0) {
+        break;
+      }
+      digester.update(buffer, 0, toDigest);
+    }
+    
+    int partialLength = (int) (requestLength % bytesPerCRC);
+    if (partialLength > 0) {
+      byte[] buf = new byte[partialLength];
+      final InputStream blockIn = datanode.data.getBlockInputStream(block,
+          requestLength - partialLength);
+      try {
+        // Get the CRC of the partialLength.
+        IOUtils.readFully(blockIn, buf, 0, partialLength);
+      } finally {
+        IOUtils.closeStream(blockIn);
+      }
+      checksum.update(buf, 0, partialLength);
+      byte[] partialCrc = new byte[csize];
+      checksum.writeValue(partialCrc, 0, true);
+      digester.update(partialCrc);
+    }
+    return new MD5Hash(digester.digest());
+  }
+
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +849,32 @@ class DataXceiver extends Receiver imple
         getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
-    updateCurrentThreadName("Reading metadata for block " + block);
-    final LengthInputStream metadataIn = 
-      datanode.data.getMetaDataInputStream(block);
-    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
-        metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+    // client side now can specify a range of the block for checksum
+    long requestLength = block.getNumBytes();
+    Preconditions.checkArgument(requestLength >= 0);
+    long visibleLength = datanode.data.getReplicaVisibleLength(block);
+    boolean partialBlk = requestLength < visibleLength;
 
+    updateCurrentThreadName("Reading metadata for block " + block);
+    final LengthInputStream metadataIn = datanode.data
+        .getMetaDataInputStream(block);
+    
+    final DataInputStream checksumIn = new DataInputStream(
+        new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
-      final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      final DataChecksum checksum = header.getChecksum(); 
+      final BlockMetadataHeader header = BlockMetadataHeader
+          .readHeader(checksumIn);
+      final DataChecksum checksum = header.getChecksum();
+      final int csize = checksum.getChecksumSize();
       final int bytesPerCRC = checksum.getBytesPerChecksum();
-      final long crcPerBlock = checksum.getChecksumSize() > 0 
-              ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
-              : 0;
-      
-      //compute block checksum
-      final MD5Hash md5 = MD5Hash.digest(checksumIn);
+      final long crcPerBlock = csize <= 0 ? 0 : 
+        (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
 
+      final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? 
+          calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+            : MD5Hash.digest(checksumIn);
       if (LOG.isDebugEnabled()) {
         LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
             + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
@@ -841,8 +887,7 @@ class DataXceiver extends Receiver imple
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelper.convert(checksum.getChecksumType()))
-          )
+          .setCrcType(PBHelper.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();

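Stored block checksums cover whole bytesPerCRC chunks, so when the requested range ends mid-chunk, calcPartialBlockChecksum above digests the stored CRCs for the full chunks and then recomputes the CRC of the trailing partial chunk from the block data itself. A self-contained sketch of that trailing-chunk step, using java.util.zip.CRC32 in place of Hadoop's DataChecksum (the big-endian byte layout is an assumption, meant to mirror DataChecksum#writeValue for CRC32):

import java.util.zip.CRC32;

public class PartialChunkCrc {
  /**
   * CRC of the partial chunk at the end of the range [0, requestLength),
   * or an empty array if the range ends exactly on a chunk boundary.
   */
  static byte[] trailingPartialCrc(byte[] blockData, int requestLength,
      int bytesPerCRC) {
    final int partialLength = requestLength % bytesPerCRC;
    if (partialLength == 0) {
      return new byte[0]; // no partial chunk to recompute
    }
    // The datanode reads these bytes from the block file itself, via
    // getBlockInputStream(block, requestLength - partialLength).
    final CRC32 crc = new CRC32();
    crc.update(blockData, requestLength - partialLength, partialLength);
    final long v = crc.getValue();
    // Serialize the 4-byte CRC big-endian (assumed to match Hadoop's layout).
    return new byte[] {
        (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v};
  }
}
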
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
Thu May 22 18:17:11 2014
@@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods {
       MD5MD5CRC32FileChecksum checksum = null;
       DFSClient dfsclient = newDfsClient(nnId, conf);
       try {
-        checksum = dfsclient.getFileChecksum(fullpath);
+        checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE);
         dfsclient.close();
         dfsclient = null;
       } finally {

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java?rev=1596931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
(added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
Thu May 22 18:17:11 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGetFileChecksum {
+  private static final int BLOCKSIZE = 1024;
+  private static final short REPLICATION = 3;
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
+        .build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public void testGetFileChecksum(final Path foo, final int appendLength)
+      throws Exception {
+    final int appendRounds = 16;
+    FileChecksum[] fc = new FileChecksum[appendRounds + 1];
+    DFSTestUtil.createFile(dfs, foo, appendLength, REPLICATION, 0L);
+    fc[0] = dfs.getFileChecksum(foo);
+    for (int i = 0; i < appendRounds; i++) {
+      DFSTestUtil.appendFile(dfs, foo, appendLength);
+      fc[i + 1] = dfs.getFileChecksum(foo);
+    }
+
+    for (int i = 0; i < appendRounds + 1; i++) {
+      FileChecksum checksum = dfs.getFileChecksum(foo, appendLength * (i+1));
+      Assert.assertTrue(checksum.equals(fc[i]));
+    }
+  }
+
+  @Test
+  public void testGetFileChecksum() throws Exception {
+    testGetFileChecksum(new Path("/foo"), BLOCKSIZE / 4);
+    testGetFileChecksum(new Path("/bar"), BLOCKSIZE / 4 - 1);
+  }
+}


