hadoop-common-commits mailing list archives

From szets...@apache.org
Subject svn commit: r695755 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/io/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apach...
Date Tue, 16 Sep 2008 07:44:21 GMT
Author: szetszwo
Date: Tue Sep 16 00:44:18 2008
New Revision: 695755

URL: http://svn.apache.org/viewvc?rev=695755&view=rev
Log:
HADOOP-3981. Implement distributed file checksum and change DistCp to use it.

Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/MD5MD5CRC32FileChecksum.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/LengthFileChecksum.java
    hadoop/core/trunk/src/core/org/apache/hadoop/io/MD5Hash.java
    hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatistics.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatisticsMBean.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
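
At a high level, the new HDFS file checksum is MD5-of-MD5-of-CRC32: each block's per-chunk CRC32 checksums are digested into a per-block MD5 on the datanode, and the client digests the concatenated block MD5s into the file checksum. The following is a minimal, self-contained sketch of that composition; it recomputes CRCs from in-memory data rather than reading real block metadata files, and the chunk size and CRC byte order are illustrative.

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.List;
    import java.util.zip.CRC32;

    public class Md5Md5Crc32Sketch {
      /** MD5 of the per-block MD5s of the per-chunk CRC32s (chunk = bytesPerCRC bytes). */
      static byte[] fileChecksum(List<byte[]> blocks, int bytesPerCRC)
          throws NoSuchAlgorithmException {
        MessageDigest fileMD5 = MessageDigest.getInstance("MD5");
        for (byte[] block : blocks) {
          MessageDigest blockMD5 = MessageDigest.getInstance("MD5");
          for (int off = 0; off < block.length; off += bytesPerCRC) {
            int len = Math.min(bytesPerCRC, block.length - off);
            CRC32 crc = new CRC32();
            crc.update(block, off, len);
            int c = (int) crc.getValue();
            // feed the 4 CRC bytes to the block digest (big-endian here for illustration)
            blockMD5.update(new byte[] {
                (byte) (c >>> 24), (byte) (c >>> 16), (byte) (c >>> 8), (byte) c});
          }
          fileMD5.update(blockMD5.digest());  // concatenate block MD5s into the file digest
        }
        return fileMD5.digest();
      }
    }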

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 16 00:44:18 2008
@@ -156,6 +156,10 @@
     HADOOP-3992. Add a synthetic load generation facility to the test
     directory. (hairong via szetszwo)
 
+    HADOOP-3981. Implement a distributed file checksum algorithm in HDFS
+    and change DistCp to use file checksum for comparing src and dst files
+    (szetszwo)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java Tue Sep 16 00:44:18 2008
@@ -1289,10 +1289,12 @@
    * Get the checksum of a file.
    *
    * @param f The file path
-   * @return The checksum 
+   * @return The file checksum.  The default return value is null,
+   *  which indicates that no checksum algorithm is implemented
+   *  in the corresponding FileSystem.
    */
   public FileChecksum getFileChecksum(Path f) throws IOException {
-    return new LengthFileChecksum(getFileStatus(f).getLen());
+    return null;
   }
 
   /**
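
With this change the base class no longer fabricates a length-based checksum, so callers must treat null as "no checksum algorithm available" (DistributedFileSystem overrides the method further down in this commit). A minimal caller-side sketch, taking the path from the command line:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FileChecksumNullCheck {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(conf);

        FileChecksum cs = fs.getFileChecksum(path);
        if (cs == null) {
          // default FileSystem implementation: checksum not supported
          System.out.println("no checksum algorithm for " + path);
        } else {
          System.out.println(cs.getAlgorithmName() + ": " + cs);
        }
      }
    }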

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/LengthFileChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/LengthFileChecksum.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/LengthFileChecksum.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/LengthFileChecksum.java Tue Sep 16 00:44:18 2008
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/** An abstract class representing file checksums for files. */
-public class LengthFileChecksum extends FileChecksum {
-  public static final String ALGORITHM_NAME = "FILE-LENGTH";
-  
-  private long value;
-
-  /** Constructor */
-  public LengthFileChecksum() {}
-
-  public LengthFileChecksum(long value) {
-    this.value = value;
-  }
-
-  /** {@inheritDoc} */ 
-  public String getAlgorithmName() {return ALGORITHM_NAME;}
-
-  /** {@inheritDoc} */ 
-  public int getLength() {return Long.SIZE/Byte.SIZE;}
-
-  /** {@inheritDoc} */ 
-  public byte[] getBytes() {
-    final byte[] b = new byte[getLength()];
-    for(int i = 0; i < b.length; i++) {
-      b[i] = (byte)(value >>> (i*Byte.SIZE));
-    }
-    return b;
-  }
-
-  /** {@inheritDoc} */ 
-  public void readFields(DataInput in) throws IOException {
-    value = in.readLong();
-  }
-
-  /** {@inheritDoc} */ 
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(value);    
-  }
-
-  /** {@inheritDoc} */ 
-  public String toString() {
-    return getClass().getSimpleName() + ": " + value;
-  }
-}
\ No newline at end of file

Added: hadoop/core/trunk/src/core/org/apache/hadoop/fs/MD5MD5CRC32FileChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/MD5MD5CRC32FileChecksum.java?rev=695755&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/MD5MD5CRC32FileChecksum.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/MD5MD5CRC32FileChecksum.java Tue Sep 16 00:44:18 2008
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.WritableUtils;
+
+/** MD5 of MD5 of CRC32. */
+public class MD5MD5CRC32FileChecksum extends FileChecksum {
+  public static final int LENGTH = MD5Hash.MD5_LEN
+      + (Integer.SIZE + Long.SIZE)/Byte.SIZE;
+
+  private int bytesPerCRC;
+  private long crcPerBlock;
+  private MD5Hash md5;
+
+  /** Same as this(0, 0, null) */
+  public MD5MD5CRC32FileChecksum() {
+    this(0, 0, null);
+  }
+
+  /** Create a MD5FileChecksum */
+  public MD5MD5CRC32FileChecksum(int bytesPerCRC, long crcPerBlock, MD5Hash md5) {
+    this.bytesPerCRC = bytesPerCRC;
+    this.crcPerBlock = crcPerBlock;
+    this.md5 = md5;
+  }
+  
+  /** {@inheritDoc} */ 
+  public String getAlgorithmName() {
+    return "MD5-of-" + crcPerBlock + "MD5-of-" + bytesPerCRC + "CRC32";
+  }
+
+  /** {@inheritDoc} */ 
+  public int getLength() {return LENGTH;}
+
+  /** {@inheritDoc} */ 
+  public byte[] getBytes() {
+    return WritableUtils.toByteArray(this);
+  }
+
+  /** {@inheritDoc} */ 
+  public void readFields(DataInput in) throws IOException {
+    bytesPerCRC = in.readInt();
+    crcPerBlock = in.readLong();
+    md5 = MD5Hash.read(in);
+  }
+
+  /** {@inheritDoc} */ 
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(bytesPerCRC);
+    out.writeLong(crcPerBlock);
+    md5.write(out);    
+  }
+
+  /** {@inheritDoc} */ 
+  public String toString() {
+    return getAlgorithmName() + ":" + md5;
+  }
+}
\ No newline at end of file
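
The checksum serializes to a fixed layout: a 4-byte bytesPerCRC, an 8-byte crcPerBlock and the 16-byte MD5, which is what the LENGTH constant (28) encodes. A small round-trip sketch using the Writable methods above; the field values are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
    import org.apache.hadoop.io.MD5Hash;

    public class ChecksumRoundTrip {
      public static void main(String[] args) throws IOException {
        MD5Hash md5 = MD5Hash.digest("hello".getBytes());    // illustrative digest
        MD5MD5CRC32FileChecksum cs =
            new MD5MD5CRC32FileChecksum(512, 128, md5);       // bytesPerCRC, crcPerBlock, md5

        byte[] bytes = cs.getBytes();                         // serialized via WritableUtils.toByteArray(this)
        MD5MD5CRC32FileChecksum copy = new MD5MD5CRC32FileChecksum();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));

        System.out.println(cs + " round-trips equal: " + cs.equals(copy));
      }
    }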

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/MD5Hash.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/MD5Hash.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/MD5Hash.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/MD5Hash.java Tue Sep 16 00:44:18 2008
@@ -21,13 +21,15 @@
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.security.*;
 
 /** A Writable for MD5 hash values.
  */
-public class MD5Hash implements WritableComparable {
+public class MD5Hash implements WritableComparable<MD5Hash> {
   public static final int MD5_LEN = 16;
+
   private static ThreadLocal<MessageDigest> DIGESTER_FACTORY = new ThreadLocal<MessageDigest>() {
     protected MessageDigest initialValue() {
       try {
@@ -87,6 +89,18 @@
     return digest(data, 0, data.length);
   }
 
+  /** Construct a hash value for the content from the InputStream. */
+  public static MD5Hash digest(InputStream in) throws IOException {
+    final byte[] buffer = new byte[4*1024]; 
+
+    final MessageDigest digester = DIGESTER_FACTORY.get();
+    for(int n; (n = in.read(buffer)) != -1; ) {
+      digester.update(buffer, 0, n);
+    }
+
+    return new MD5Hash(digester.digest());
+  }
+
   /** Construct a hash value for a byte array. */
   public static MD5Hash digest(byte[] data, int start, int len) {
     byte[] digest;
@@ -143,8 +157,7 @@
 
 
   /** Compares this object with the specified object for order.*/
-  public int compareTo(Object o) {
-    MD5Hash that = (MD5Hash)o;
+  public int compareTo(MD5Hash that) {
     return WritableComparator.compareBytes(this.digest, 0, MD5_LEN,
                                            that.digest, 0, MD5_LEN);
   }
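
The new digest(InputStream) overload streams data through the thread-local digester in 4 KB reads; the datanode change below uses it to digest the checksum section of a block metadata file without loading it into memory. A tiny sketch showing it agrees with the existing byte[] overload:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.MD5Hash;

    public class StreamDigestSketch {
      public static void main(String[] args) throws IOException {
        byte[] data = new byte[10 * 1024];                                   // illustrative payload (all zeros)
        MD5Hash streamed = MD5Hash.digest(new ByteArrayInputStream(data));   // new overload
        MD5Hash direct = MD5Hash.digest(data);                               // existing overload
        System.out.println("digests match: " + streamed.equals(direct));
      }
    }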

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java Tue Sep 16 00:44:18 2008
@@ -428,4 +428,18 @@
                             "due to end of input.");
     }
   }
+
+  /** Convert writables to a byte array */
+  public static byte[] toByteArray(Writable... writables) {
+    final DataOutputBuffer out = new DataOutputBuffer();
+    try {
+      for(Writable w : writables) {
+        w.write(out);
+      }
+      out.close();
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to convert writables to a byte array",e);
+    }
+    return out.getData();
+  }
 }
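
toByteArray concatenates the serialized forms of the given Writables into a single byte array (MD5MD5CRC32FileChecksum.getBytes() above uses it with a single argument). A small sketch serializing two Writables and reading them back in order; the values are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class ToByteArraySketch {
      public static void main(String[] args) throws IOException {
        byte[] bytes = WritableUtils.toByteArray(new IntWritable(42), new Text("abc"));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        IntWritable i = new IntWritable();
        Text t = new Text();
        i.readFields(in);   // fields come back in the order they were written
        t.readFields(in);
        System.out.println(i + " " + t);
      }
    }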

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 16 00:44:18 2008
@@ -580,6 +580,109 @@
   }
 
   /**
+   * Get the checksum of a file.
+   * @param src The file path
+   * @return The checksum 
+   */
+  MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+    checkOpen();
+    
+    //get all block locations
+    final List<LocatedBlock> locatedblocks
+        = callGetBlockLocations(src, 0, Long.MAX_VALUE).getLocatedBlocks();
+    final DataOutputBuffer md5out = new DataOutputBuffer();
+    int bytesPerCRC = 0;
+    long crcPerBlock = 0;
+
+    //get block checksum for each block
+    for(int i = 0; i < locatedblocks.size(); i++) {
+      LocatedBlock lb = locatedblocks.get(i);
+      final Block block = lb.getBlock();
+      final DatanodeInfo[] datanodes = lb.getLocations();
+      
+      //try each datanode location of the block
+      final int timeout = 3000 * datanodes.length + socketTimeout;
+      boolean done = false;
+      for(int j = 0; !done && j < datanodes.length; j++) {
+        //connect to a datanode
+        final Socket sock = socketFactory.createSocket();
+        sock.connect(NetUtils.createSocketAddr(datanodes[j].getName()), timeout);
+        sock.setSoTimeout(timeout);
+
+        DataOutputStream out = new DataOutputStream(
+            new BufferedOutputStream(NetUtils.getOutputStream(sock), 
+                                     DataNode.SMALL_BUFFER_SIZE));
+        DataInputStream in = new DataInputStream(NetUtils.getInputStream(sock));
+
+        // get block MD5
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("write to " + datanodes[j].getName() + ": "
+                + OP_BLOCK_CHECKSUM + ", block=" + block);
+          }
+          out.writeShort(DATA_TRANSFER_VERSION);
+          out.write(OP_BLOCK_CHECKSUM);
+          out.writeLong(block.getBlockId());
+          out.writeLong(block.getGenerationStamp());
+          out.flush();
+         
+          final short reply = in.readShort();
+          if (reply != OP_STATUS_SUCCESS) {
+            throw new IOException("Bad response " + reply + " for block "
+                + block + " from datanode " + datanodes[j].getName());
+          }
+
+          //read byte-per-checksum
+          final int bpc = in.readInt(); 
+          if (i == 0) { //first block
+            bytesPerCRC = bpc;
+          }
+          else if (bpc != bytesPerCRC) {
+            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+                + " but bytesPerCRC=" + bytesPerCRC);
+          }
+          
+          //read crc-per-block
+          final long cpb = in.readLong();
+          if (locatedblocks.size() > 1 && i == 0) {
+            crcPerBlock = cpb;
+          }
+
+          //read md5
+          final MD5Hash md5 = MD5Hash.read(in);
+          md5.write(md5out);
+          
+          done = true;
+
+          if (LOG.isDebugEnabled()) {
+            if (i == 0) {
+              LOG.debug("set bytesPerCRC=" + bytesPerCRC
+                  + ", crcPerBlock=" + crcPerBlock);
+            }
+            LOG.debug("got reply from " + datanodes[j].getName()
+                + ": md5=" + md5);
+          }
+        } catch (IOException ie) {
+          LOG.warn("src=" + src + ", datanodes[" + j + "].getName()="
+              + datanodes[j].getName(), ie);
+        } finally {
+          IOUtils.closeStream(in);
+          IOUtils.closeStream(out);
+          IOUtils.closeSocket(sock);        
+        }
+      }
+
+      if (!done) {
+        throw new IOException("Fail to get block MD5 for " + block);
+      }
+    }
+
+    //compute file MD5
+    final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); 
+    return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
+  }
+
+  /**
    * Set permissions to a file or directory.
    * @param src path name.
    * @param permission
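
For reference, the per-block request the client writes above is a 2-byte DATA_TRANSFER_VERSION, the 1-byte OP_BLOCK_CHECKSUM opcode, then the 8-byte block id and 8-byte generation stamp; the reply it reads back is a 2-byte status, a 4-byte bytesPerCRC, an 8-byte crcPerBlock and the 16-byte block MD5. A standalone sketch of just the request framing, using the constants from FSConstants:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.FSConstants;

    public class BlockChecksumRequestSketch {
      /** Frame an OP_BLOCK_CHECKSUM request the way DFSClient.getFileChecksum does. */
      static byte[] request(long blockId, long generationStamp) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeShort(FSConstants.DATA_TRANSFER_VERSION);  // 2-byte protocol version
        out.write(FSConstants.OP_BLOCK_CHECKSUM);           // 1-byte opcode
        out.writeLong(blockId);                              // 8-byte block id
        out.writeLong(generationStamp);                      // 8-byte generation stamp
        out.flush();
        return buf.toByteArray();                            // 19 bytes in total
      }
    }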

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Sep 16 00:44:18 2008
@@ -406,6 +406,11 @@
     }
   }
 
+  /** {@inheritDoc} */
+  public MD5MD5CRC32FileChecksum getFileChecksum(Path f) throws IOException {
+    return dfs.getFileChecksum(getPathName(f));
+  }
+
   /** {@inheritDoc }*/
   public void setPermission(Path p, FsPermission permission
       ) throws IOException {

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java Tue Sep 16 00:44:18 2008
@@ -87,6 +87,7 @@
   public static final byte OP_READ_METADATA = (byte) 82;
   public static final byte OP_REPLACE_BLOCK = (byte) 83;
   public static final byte OP_COPY_BLOCK = (byte) 84;
+  public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
   
   public static final int OP_STATUS_SUCCESS = 0;  
   public static final int OP_STATUS_ERROR = 1;  

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Sep 16 00:44:18 2008
@@ -33,8 +33,10 @@
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
@@ -114,6 +116,10 @@
         copyBlock(in);
         datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
         break;
+      case OP_BLOCK_CHECKSUM: //get the checksum of a block
+        getBlockChecksum(in);
+        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
+        break;
       default:
         throw new IOException("Unknown opcode " + op + " in data stream");
       }
@@ -414,6 +420,49 @@
   }
   
   /**
+   * Get block checksum (MD5 of CRC32).
+   * @param in
+   */
+  void getBlockChecksum(DataInputStream in) throws IOException {
+    final Block block = new Block(in.readLong(), 0 , in.readLong());
+
+    DataOutputStream out = null;
+    final MetaDataInputStream metadataIn = datanode.data.getMetaDataInputStream(block);
+    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
+        metadataIn, BUFFER_SIZE));
+
+    try {
+      //read metadata file
+      final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+      final DataChecksum checksum = header.getChecksum(); 
+      final int bytesPerCRC = checksum.getBytesPerChecksum();
+      final long crcPerBlock = (metadataIn.getLength()
+          - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize();
+      
+      //compute block checksum
+      final MD5Hash md5 = MD5Hash.digest(checksumIn);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
+            + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
+      }
+
+      //write reply
+      out = new DataOutputStream(
+          NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+      out.writeShort(OP_STATUS_SUCCESS);
+      out.writeInt(bytesPerCRC);
+      out.writeLong(crcPerBlock);
+      md5.write(out);
+      out.flush();
+    } finally {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(checksumIn);
+      IOUtils.closeStream(metadataIn);
+    }
+  }
+
+  /**
    * Read a block from the disk and then sends it to a destination.
    * 
    * @param in The stream to read from
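
crcPerBlock above is derived from the size of the block metadata file alone: subtract the header, then divide by the per-checksum size (4 bytes for CRC32). A worked sketch with illustrative numbers (a full 64 MB block and io.bytes.per.checksum = 512; the 7-byte header size is an assumption, see BlockMetadataHeader.getHeaderSize() for the real value):

    public class CrcPerBlockSketch {
      public static void main(String[] args) {
        long blockLen = 64L * 1024 * 1024;      // illustrative 64 MB block
        int bytesPerCRC = 512;                  // io.bytes.per.checksum
        int checksumSize = 4;                   // a CRC32 value is 4 bytes
        int headerSize = 7;                     // assumption: version short + checksum type byte + bytesPerChecksum int

        long chunks = (blockLen + bytesPerCRC - 1) / bytesPerCRC;  // 131072 checksummed chunks
        long metaLen = headerSize + chunks * checksumSize;         // metadata file length
        long crcPerBlock = (metaLen - headerSize) / checksumSize;  // what getBlockChecksum computes

        System.out.println("crcPerBlock = " + crcPerBlock);        // prints 131072
      }
    }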

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Tue Sep 16 00:44:18 2008
@@ -76,6 +76,8 @@
                 new MetricsTimeVaryingRate("writeBlockOp");
   public MetricsTimeVaryingRate readMetadataOp = 
                 new MetricsTimeVaryingRate("readMetadataOp");
+  public MetricsTimeVaryingRate blockChecksumOp = 
+                new MetricsTimeVaryingRate("blockChecksumOp");
   public MetricsTimeVaryingRate copyBlockOp = 
                 new MetricsTimeVaryingRate("copyBlockOp");
   public MetricsTimeVaryingRate replaceBlockOp = 
@@ -130,6 +132,7 @@
       readBlockOp.pushMetric(metricsRecord);
       writeBlockOp.pushMetric(metricsRecord);
       readMetadataOp.pushMetric(metricsRecord);
+      blockChecksumOp.pushMetric(metricsRecord);
       copyBlockOp.pushMetric(metricsRecord);
       replaceBlockOp.pushMetric(metricsRecord);
       heartbeats.pushMetric(metricsRecord);
@@ -141,6 +144,7 @@
     readBlockOp.resetMinMax();
     writeBlockOp.resetMinMax();
     readMetadataOp.resetMinMax();
+    blockChecksumOp.resetMinMax();
     copyBlockOp.resetMinMax();
     replaceBlockOp.resetMinMax();
     heartbeats.resetMinMax();

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatistics.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatistics.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatistics.java Tue Sep 16 00:44:18 2008
@@ -197,6 +197,34 @@
   /**
    * @inheritDoc
    */
+  public long getBlockChecksumOpAverageTime() {
+    return myMetrics.blockChecksumOp.getPreviousIntervalAverageTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getBlockChecksumOpMaxTime() {
+    return myMetrics.blockChecksumOp.getMaxTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getBlockChecksumOpMinTime() {
+    return myMetrics.blockChecksumOp.getMinTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getBlockChecksumOpNum() {
+    return myMetrics.blockChecksumOp.getPreviousIntervalNumOps();
+  }
+
+  /**
+   * @inheritDoc
+   */
   public long getReplaceBlockOpAverageTime() {
     return myMetrics.replaceBlockOp.getPreviousIntervalAverageTime();
   }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatisticsMBean.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatisticsMBean.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatisticsMBean.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeStatisticsMBean.java Tue Sep 16 00:44:18 2008
@@ -185,6 +185,29 @@
    */
   long getReadMetadataOpMaxTime();
   
+  /**
+   * Number of BlockChecksum Operation in last interval
+   * @return number of operations
+   */
+  int getBlockChecksumOpNum(); 
+
+  /**
+   * Average time for BlockChecksum Operation in last interval
+   * @return time in msec
+   */
+  long getBlockChecksumOpAverageTime();
+  
+  /**
+   *   The Minimum BlockChecksum Operation Time since reset was called
+   * @return time in msec
+   */
+  long getBlockChecksumOpMinTime();
+  
+  /**
+   *   The Maximum BlockChecksum Operation Time since reset was called
+   * @return time in msec
+   */
+  long getBlockChecksumOpMaxTime();
   
   /**
    * Number of CopyBlock Operation in last interval

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Tue Sep 16 00:44:18 2008
@@ -58,7 +58,6 @@
 
 public class TestFileSystem extends TestCase {
   private static final Log LOG = FileSystem.LOG;
-  private static final Random RAN = new Random();
 
   private static Configuration conf = new Configuration();
   private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
@@ -619,38 +618,4 @@
     assertTrue(map.containsKey(lowercaseCachekey2));    
 
   }
-  
-  public void testFileChecksum() throws IOException {
-    final long seed = RAN.nextLong();
-    System.out.println("seed=" + seed);
-    RAN.setSeed(seed);
-
-    final Configuration conf = new Configuration();
-    final String dir = ROOT + "/fileChecksum";
-    final LocalFileSystem fs = FileSystem.getLocal(conf);
-    final Path foo = new Path(dir, "foo");
-
-    //generate random data
-    final byte[] data = new byte[RAN.nextInt(3*1024) + 10*1024];
-    RAN.nextBytes(data);
-
-    //write data to a file
-    final FSDataOutputStream out = fs.create(foo);
-    out.write(data);
-    out.close();
-    
-    //compute checksum
-    final FileChecksum cs1 = fs.getFileChecksum(foo);
-    System.out.println("cs1=" + cs1);
-    
-    //rename the file and verify again
-    final Path bar = new Path(dir, "bar");
-    fs.rename(foo, bar);
-
-    { //verify checksum
-      final FileChecksum cs2 = fs.getFileChecksum(bar);
-      assertEquals(cs1.hashCode(), cs2.hashCode());
-      assertEquals(cs1, cs2);
-    }
-  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Tue Sep 16 00:44:18 2008
@@ -18,15 +18,20 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
 import java.net.URI;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class TestDistributedFileSystem extends junit.framework.TestCase {
+  private static final Random RAN = new Random();
+
   public void testFileSystemCloseAll() throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 0, true, null);
@@ -110,4 +115,55 @@
       if (cluster != null) {cluster.shutdown();}
     }
   }
+  
+  public void testFileChecksum() throws IOException {
+    final long seed = RAN.nextLong();
+    System.out.println("seed=" + seed);
+    RAN.setSeed(seed);
+
+    final Configuration conf = new Configuration();
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    final FileSystem fs = cluster.getFileSystem();
+
+    final String dir = "/fileChecksum";
+    final int block_size = 1024;
+    final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
+    conf.setInt("io.bytes.per.checksum", 512);
+
+    //try different number of blocks
+    for(int n = 0; n < 5; n++) {
+      //generate random data
+      final byte[] data = new byte[RAN.nextInt(block_size/2-1)+n*block_size+1];
+      RAN.nextBytes(data);
+      System.out.println("data.length=" + data.length);
+  
+      //write data to a file
+      final Path foo = new Path(dir, "foo" + n);
+      {
+        final FSDataOutputStream out = fs.create(foo, false, buffer_size,
+            (short)2, block_size);
+        out.write(data);
+        out.close();
+      }
+      
+      //compute checksum
+      final FileChecksum foocs = fs.getFileChecksum(foo);
+      System.out.println("foocs=" + foocs);
+      
+      //write another file
+      final Path bar = new Path(dir, "bar" + n);
+      {
+        final FSDataOutputStream out = fs.create(bar, false, buffer_size,
+            (short)2, block_size);
+        out.write(data);
+        out.close();
+      }
+  
+      { //verify checksum
+        final FileChecksum barcs = fs.getFileChecksum(bar);
+        assertEquals(foocs.hashCode(), barcs.hashCode());
+        assertEquals(foocs, barcs);
+      }
+    }
+  }
 }

Modified: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=695755&r1=695754&r2=695755&view=diff
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Tue Sep 16 00:44:18 2008
@@ -336,9 +336,10 @@
      * be meaningful in this context.
      * @throws IOException 
      */
-    private boolean needsUpdate(FileSystem srcfs, Path srcpath,
+    private boolean needsUpdate(FileStatus srcstatus,
         FileSystem dstfs, Path dstpath) throws IOException {
-      return update && !sameFile(srcfs, srcpath, dstfs, dstpath);
+      return update && !sameFile(srcstatus.getPath().getFileSystem(job),
+          srcstatus, dstfs, dstpath);
     }
     
     private FSDataOutputStream create(Path f, Reporter reporter,
@@ -387,10 +388,8 @@
         return;
       }
 
-      final Path srcpath = srcstat.getPath();
-      final FileSystem srcfs = srcpath.getFileSystem(job);
       if (destFileSys.exists(absdst) && !overwrite
-          && !needsUpdate(srcfs, srcpath, destFileSys, absdst)) {
+          && !needsUpdate(srcstat, destFileSys, absdst)) {
         outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
         ++skipcount;
         reporter.incrCounter(Counter.SKIP, 1);
@@ -1050,8 +1049,7 @@
             }
             else {
               //skip file if the src and the dst files are the same.
-              final Path absdst = new Path(args.dst, dst);
-              skipfile = update && sameFile(srcfs,child.getPath(),dstfs,absdst);
+              skipfile = update && sameFile(srcfs, child, dstfs, new Path(args.dst, dst));
               
               if (!skipfile) {
                 ++fileCount;
@@ -1134,15 +1132,38 @@
   }
 
   /**
-   * Check whether the src and the dst are the same.
-   * Two files are considered as the same if they have the same size.
+   * Check whether the contents of src and dst are the same.
+   * 
+   * Return false if dstpath does not exist
+   * 
+   * If the files have different sizes, return false.
+   * 
+   * If the files have the same sizes, the file checksums will be compared.
+   * 
+   * When file checksum is not supported in any of file systems,
+   * two files are considered as the same if they have the same size.
    */
-  static private boolean sameFile(FileSystem srcfs, Path srcpath,
+  static private boolean sameFile(FileSystem srcfs, FileStatus srcstatus,
       FileSystem dstfs, Path dstpath) throws IOException {
+    FileStatus dststatus;
     try {
-      final FileChecksum srccs = srcfs.getFileChecksum(srcpath);
-      final FileChecksum dstcs = dstfs.getFileChecksum(dstpath);
-      return srccs != null && srccs.equals(dstcs);
+      dststatus = dstfs.getFileStatus(dstpath);
+    } catch(FileNotFoundException fnfe) {
+      return false;
+    }
+
+    //same length?
+    if (srcstatus.getLen() != dststatus.getLen()) {
+      return false;
+    }
+
+    //compare checksums
+    try {
+      final FileChecksum srccs = srcfs.getFileChecksum(srcstatus.getPath());
+      final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath());
+      //return true if checksum is not supported
+      //(i.e. some of the checksums is null)
+      return srccs == null || dstcs == null || srccs.equals(dstcs);
     } catch(FileNotFoundException fnfe) {
       return false;
     }
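
Taken together, the new comparison is: a missing destination or a length mismatch means "different"; otherwise compare file checksums, and if either file system returns null (no checksum support) fall back to size-only equality, which was the pre-patch behaviour. A consolidated sketch of that decision order, equivalent to the sameFile above:

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SameFileSketch {
      static boolean sameFile(FileSystem srcfs, FileStatus srcstatus,
          FileSystem dstfs, Path dstpath) throws IOException {
        final FileStatus dststatus;
        try {
          dststatus = dstfs.getFileStatus(dstpath);
        } catch (FileNotFoundException fnfe) {
          return false;                          // destination does not exist
        }
        if (srcstatus.getLen() != dststatus.getLen()) {
          return false;                          // different sizes, definitely different
        }
        try {
          FileChecksum srccs = srcfs.getFileChecksum(srcstatus.getPath());
          FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath());
          // null means no checksum algorithm: same size is the best check available
          return srccs == null || dstcs == null || srccs.equals(dstcs);
        } catch (FileNotFoundException fnfe) {
          return false;                          // file disappeared while comparing
        }
      }
    }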


