hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r828120 - in /hadoop/hdfs/branches/branch-0.21: ./ .eclipse.templates/.launches/ src/contrib/block_forensics/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/ha...
Date Wed, 21 Oct 2009 18:01:55 GMT
Author: hairong
Date: Wed Oct 21 18:01:54 2009
New Revision: 828120

URL: http://svn.apache.org/viewvc?rev=828120&view=rev
Log:
Merge -c 828116 to move the change of HDFS-679 from trunk to branch 0.21

Modified:
    hadoop/hdfs/branches/branch-0.21/   (props changed)
    hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/   (props changed)
    hadoop/hdfs/branches/branch-0.21/CHANGES.txt
    hadoop/hdfs/branches/branch-0.21/build.xml   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/java/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
  (props changed)
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
  (props changed)
    hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/   (props
changed)
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
    hadoop/hdfs/branches/branch-0.21/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/branch-0.21/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-265:796829-820463
-/hadoop/hdfs/trunk:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk:818294-818298,824552,824944,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1 +1 @@
-/hadoop/hdfs/trunk/.eclipse.templates/.launches:824552,824944,826149
+/hadoop/hdfs/trunk/.eclipse.templates/.launches:824552,824944,826149,828116

Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Wed Oct 21 18:01:54 2009
@@ -402,6 +402,9 @@
 
     HDFS-668. TestFileAppend3#TC7 sometimes hangs. (hairong)
 
+    HDFS-679. Appending to a partial chunk incorrectly assumes the
+    first packet fills up the partial chunk. (hairong)
+
 Release 0.20.2 - Unreleased
 
   BUG FIXES

Propchange: hadoop/hdfs/branches/branch-0.21/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
 /hadoop/core/trunk/build.xml:779102
 /hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
-/hadoop/hdfs/trunk/build.xml:818294-818298,824552,824944,825229,826149
+/hadoop/hdfs/trunk/build.xml:818294-818298,824552,824944,825229,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1 +1 @@
-/hadoop/hdfs/trunk/src/contrib/block_forensics:824552,824944,826149
+/hadoop/hdfs/trunk/src/contrib/block_forensics:824552,824944,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:818294-818298,824552,824944,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
-/hadoop/hdfs/trunk/src/java:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk/src/java:818294-818298,824552,824944,826149,828116

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Oct
21 18:01:54 2009
@@ -3422,16 +3422,18 @@
         waitAndQueuePacket(currentPacket);
         currentPacket = null;
 
-        // If this was the first write after reopening a file, then the above
-        // write filled up any partial chunk. Tell the summer to generate full 
+        // If the reopened file did not end at chunk boundary and the above
+        // write filled up its partial chunk. Tell the summer to generate full 
         // crc chunks from now on.
-        if (appendChunk) {
+        if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
           appendChunk = false;
           resetChecksumChunk(bytesPerChecksum);
         }
-        int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
-        computePacketChunkSize(psize, bytesPerChecksum);
-        
+
+        if (!appendChunk) {
+          int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
+          computePacketChunkSize(psize, bytesPerChecksum);
+        }
         //
         // if encountering a block boundary, send an empty packet to 
         // indicate the end of block and reset bytesCurBlock.

Propchange: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,5 +1,5 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:713112
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:776175-785643,785929-786278
 /hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:817353-818319,818321-818553
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java:824552,824944,826149
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java:824552,824944,826149,828116
 /hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964,818294-818298

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Wed Oct 21 18:01:54 2009
@@ -124,26 +124,25 @@
               " while receiving block " + block + " from " + inAddr);
         }
       }
-      streams = replicaInfo.createStreams();
+      // read checksum meta information
+      this.checksum = DataChecksum.newDataChecksum(in);
+      this.bytesPerChecksum = checksum.getBytesPerChecksum();
+      this.checksumSize = checksum.getChecksumSize();
+      
+      boolean isCreate = stage == BlockConstructionStage.PIPELINE_SETUP_CREATE 
+      || clientName.length() == 0;
+      streams = replicaInfo.createStreams(isCreate,
+          this.bytesPerChecksum, this.checksumSize);
       if (streams != null) {
         this.out = streams.dataOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                                   streams.checksumOut, 
                                                   SMALL_BUFFER_SIZE));
         
-        // read checksum meta information
-        this.checksum = DataChecksum.newDataChecksum(in);
-        this.bytesPerChecksum = checksum.getBytesPerChecksum();
-        this.checksumSize = checksum.getChecksumSize();
-        
         // write data chunk header if creating a new replica
-        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE 
-            || clientName.length() == 0) {
+        if (isCreate) {
           BlockMetadataHeader.writeHeader(checksumOut, checksum);
-        } else {
-          datanode.data.setChannelPosition(block, streams, 0, 
-              BlockMetadataHeader.getHeaderSize());
-        }
+        } 
       }
     } catch (ReplicaAlreadyExistsException bae) {
       throw bae;
@@ -449,6 +448,7 @@
     }
     
     // update received bytes
+    long firstByteInBlock = offsetInBlock;
     offsetInBlock += len;
     if (replicaInfo.getNumBytes() < offsetInBlock) {
       replicaInfo.setNumBytes(offsetInBlock);
@@ -479,8 +479,11 @@
                                                             checksumSize;
 
       if ( buf.remaining() != (checksumLen + len)) {
-        throw new IOException("Data remaining in packet does not match " +
-                              "sum of checksumLen and dataLen");
+        throw new IOException("Data remaining in packet does not match" +
+                              "sum of checksumLen and dataLen " +
+                              " size remaining: " + buf.remaining() +
+                              " data len: " + len +
+                              " checksum Len: " + checksumLen);
       }
       int checksumOff = buf.position();
       int dataOff = checksumOff + checksumLen;
@@ -500,11 +503,29 @@
       }
 
       try {
-        if (replicaInfo.getBytesOnDisk()<offsetInBlock) {
+        long onDiskLen = replicaInfo.getBytesOnDisk();
+        if (onDiskLen<offsetInBlock) {
           //finally write to the disk :
-          setBlockPosition(offsetInBlock-len);
           
-          out.write(pktBuf, dataOff, len);
+          if (onDiskLen % bytesPerChecksum != 0) { 
+            // prepare to overwrite last checksum
+            adjustCrcFilePosition();
+          }
+          
+          // If this is a partial chunk, then read in pre-existing checksum
+          if (firstByteInBlock % bytesPerChecksum != 0) {
+            LOG.info("Packet starts at " + firstByteInBlock +
+                     " for block " + block +
+                     " which is not a multiple of bytesPerChecksum " +
+                     bytesPerChecksum);
+            long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+                onDiskLen / bytesPerChecksum * checksumSize;
+            computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
+          }
+
+          int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
+          int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
+          out.write(pktBuf, startByteToDisk, numBytesToDisk);
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
@@ -516,7 +537,7 @@
                                     " len = " + len + 
                                     " bytesPerChecksum " + bytesPerChecksum);
             }
-            partialCrc.update(pktBuf, dataOff, len);
+            partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
             checksumOut.write(buf);
             LOG.debug("Writing out partial crc for data len " + len);
@@ -626,14 +647,10 @@
   }
 
   /**
-   * Sets the file pointer in the local block file to the specified value.
+   * Adjust the file pointer in the local meta file so that the last checksum
+   * will be overwritten.
    */
-  private void setBlockPosition(long offsetInBlock) throws IOException {
-    if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
-      return;                   // nothing to do 
-    }
-    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
-                            offsetInBlock / bytesPerChecksum * checksumSize;
+  private void adjustCrcFilePosition() throws IOException {
     if (out != null) {
      out.flush();
     }
@@ -641,23 +658,8 @@
       checksumOut.flush();
     }
 
-    // If this is a partial chunk, then read in pre-existing checksum
-    if (offsetInBlock % bytesPerChecksum != 0) {
-      LOG.info("setBlockPosition trying to set position to " +
-               offsetInBlock +
-               " for block " + block +
-               " which is not a multiple of bytesPerChecksum " +
-               bytesPerChecksum);
-      computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
-    }
-
-    LOG.info("Changing block file offset of block " + block + " from " + 
-        datanode.data.getChannelPosition(block, streams) +
-             " to " + offsetInBlock +
-             " meta file offset to " + offsetInChecksum);
-
-    // set the position of the block file
-    datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
+    // rollback the position of the meta file
+    datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
   }
 
   /**

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
Wed Oct 21 18:01:54 2009
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -1337,37 +1338,18 @@
   }
 
   /**
-   * Retrieves the offset in the block to which the
-   * the next write will write data to.
+   * Sets the offset in the meta file so that the
+   * last checksum will be overwritten.
    */
-  public long getChannelPosition(Block b, BlockWriteStreams streams) 
-                                 throws IOException {
-    FileOutputStream file = (FileOutputStream) streams.dataOut;
-    return file.getChannel().position();
-  }
-
-  /**
-   * Sets the offset in the block to which the
-   * the next write will write data to.
-   */
-  public void setChannelPosition(Block b, BlockWriteStreams streams, 
-                                 long dataOffset, long ckOffset) 
-                                 throws IOException {
-    long size = 0;
-    synchronized (this) {
-      size = getReplicaInfo(b).getBlockFile().length();
-    }
-    if (size < dataOffset) {
-      String msg = "Trying to change block file offset of block " + b +
-                     " to " + dataOffset +
-                     " but actual size of file is " +
-                     size;
-      throw new IOException(msg);
-    }
-    FileOutputStream file = (FileOutputStream) streams.dataOut;
-    file.getChannel().position(dataOffset);
-    file = (FileOutputStream) streams.checksumOut;
-    file.getChannel().position(ckOffset);
+  public void adjustCrcChannelPosition(Block b, BlockWriteStreams streams, 
+      int checksumSize) throws IOException {
+    FileOutputStream file = (FileOutputStream) streams.checksumOut;
+    FileChannel channel = file.getChannel();
+    long oldPos = channel.position();
+    long newPos = oldPos - checksumSize;
+    DataNode.LOG.info("Changing meta file offset of block " + b + " from " + 
+        oldPos + " to " + newPos);
+    channel.position(newPos);
   }
 
   synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
Wed Oct 21 18:01:54 2009
@@ -306,27 +306,15 @@
   public void shutdown();
 
   /**
-   * Returns the current offset in the data stream.
-   * @param b
-   * @param stream The stream to the data file and checksum file
-   * @return the position of the file pointer in the data stream
-   * @throws IOException
-   */
-  public long getChannelPosition(Block b, BlockWriteStreams stream) throws IOException;
-
-  /**
-   * Sets the file pointer of the data stream and checksum stream to
-   * the specified values.
-   * @param b
+   * Sets the file pointer of the checksum stream so that the last checksum
+   * will be overwritten
+   * @param b block
    * @param stream The stream for the data file and checksum file
-   * @param dataOffset The position to which the file pointre for the data stream
-   *        should be set
-   * @param ckOffset The position to which the file pointre for the checksum stream
-   *        should be set
+   * @param checksumSize number of bytes each checksum has
    * @throws IOException
    */
-  public void setChannelPosition(Block b, BlockWriteStreams stream, long dataOffset,
-                                 long ckOffset) throws IOException;
+  public void adjustCrcChannelPosition(Block b, BlockWriteStreams stream, 
+      int checksumSize) throws IOException;
 
   /**
    * checks how many valid storage volumes are there in the DataNode

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
Wed Oct 21 18:01:54 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 
 /** 
  * This class defines a replica in a pipeline, which
@@ -160,7 +161,8 @@
   }
   
   @Override // ReplicaInPipelineInterface
-  public BlockWriteStreams createStreams() throws IOException {
+  public BlockWriteStreams createStreams(boolean isCreate, 
+      int bytesPerChunk, int checksumSize) throws IOException {
     File blockFile = getBlockFile();
     File metaFile = getMetaFile();
     if (DataNode.LOG.isDebugEnabled()) {
@@ -169,6 +171,17 @@
       DataNode.LOG.debug("writeTo metafile is " + metaFile +
                          " of size " + metaFile.length());
     }
+    long blockDiskSize = 0L;
+    long crcDiskSize = 0L;
+    if (!isCreate) { // check on disk file
+      blockDiskSize = bytesOnDisk;
+      crcDiskSize = BlockMetadataHeader.getHeaderSize() +
+      (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
+      if (blockDiskSize>0 && 
+          (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
+        throw new IOException("Corrupted block: " + this);
+      }
+    }
     FileOutputStream blockOut = null;
     FileOutputStream crcOut = null;
     try {
@@ -176,6 +189,10 @@
           new RandomAccessFile( blockFile, "rw" ).getFD() );
       crcOut = new FileOutputStream(
           new RandomAccessFile( metaFile, "rw" ).getFD() );
+      if (!isCreate) {
+        blockOut.getChannel().position(blockDiskSize);
+        crcOut.getChannel().position(crcDiskSize);
+      }
       return new BlockWriteStreams(blockOut, crcOut);
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
Wed Oct 21 18:01:54 2009
@@ -53,8 +53,12 @@
    * Create output streams for writing to this replica, 
    * one for block file and one for CRC file
    * 
+   * @param isCreate if it is for creation
+   * @param bytePerChunk number of bytes per CRC chunk
+   * @param checksumSize number of bytes per checksum
    * @return output streams for writing
    * @throws IOException if any error occurs
    */
-  public BlockWriteStreams createStreams() throws IOException;
+  public BlockWriteStreams createStreams(boolean isCreate,
+      int bytesPerChunk, int checksumSize) throws IOException;
 }

Propchange: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -3,4 +3,4 @@
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:818294-818298,824552,824944,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1 +1 @@
-/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol:824552,824944,826149
+/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol:824552,824944,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
 /hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
-/hadoop/hdfs/trunk/src/test/hdfs:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk/src/test/hdfs:818294-818298,824552,824944,826149,828116

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
Wed Oct 21 18:01:54 2009
@@ -124,6 +124,8 @@
       out.close();
     }
 
+    AppendTestUtil.check(fs, p, len1);
+
     //   Reopen file to append quarter block of data. Close file.
     final int len2 = (int)BLOCK_SIZE/4; 
     {
@@ -300,4 +302,59 @@
     //c. Reopen file and read 25687+5877 bytes of data from file. Close file.
     AppendTestUtil.check(fs, p, len1 + len2);
   }
+  
+  /** Append to a partial CRC chunk and 
+   * the first write does not fill up the partial CRC trunk
+   * *
+   * @throws IOException
+   */
+  public void testAppendToPartialChunk() throws IOException {
+    final Path p = new Path("/partialChunk/foo");
+    final int fileLen = 513;
+    System.out.println("p=" + p);
+    
+    byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+    // create a new file.
+    FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, 1);
+
+    // create 1 byte file
+    stm.write(fileContents, 0, 1);
+    stm.close();
+    System.out.println("Wrote 1 byte and closed the file " + p);
+
+    // append to file
+    stm = fs.append(p);
+    // Append to a partial CRC trunk
+    stm.write(fileContents, 1, 1);
+    stm.sync();
+    // The partial CRC trunk is not full yet and close the file
+    stm.close();
+    System.out.println("Append 1 byte and closed the file " + p);
+
+    // write the remainder of the file
+    stm = fs.append(p);
+
+    // ensure getPos is set to reflect existing size of the file
+    assertEquals(2, stm.getPos());
+
+    // append to a partial CRC trunk
+    stm.write(fileContents, 2, 1);
+    // The partial chunk is not full yet, force to send a packet to DN
+    stm.sync();
+    System.out.println("Append and flush 1 byte");
+    // The partial chunk is not full yet, force to send another packet to DN
+    stm.write(fileContents, 3, 2);
+    stm.sync();
+    System.out.println("Append and flush 2 byte");
+
+    // fill up the partial chunk and close the file
+    stm.write(fileContents, 5, fileLen-5);
+    stm.close();
+    System.out.println("Flush 508 byte and closed the file " + p);
+
+    // verify that entire file is good
+    AppendTestUtil.checkFullFile(fs, p, fileLen,
+        fileContents, "Failed to append to a partial chunk");
+  }
 }

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Wed Oct 21 18:01:54 2009
@@ -31,8 +31,8 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -197,7 +197,8 @@
     }
 
     @Override
-    synchronized public BlockWriteStreams createStreams() throws IOException {
+    synchronized public BlockWriteStreams createStreams(boolean isCreate, 
+        int bytesPerChunk, int checksumSize) throws IOException {
       if (finalized) {
         throw new IOException("Trying to write to a finalized replica "
             + theBlock);
@@ -615,24 +616,11 @@
     // nothing to check for simulated data set
   }
 
-  public synchronized long getChannelPosition(Block b, 
-                                              BlockWriteStreams stream)
-                                              throws IOException {
-    BInfo binfo = blockMap.get(b);
-    if (binfo == null) {
-      throw new IOException("No such Block " + b );
-    }
-    return binfo.getNumBytes();
-  }
-
-  public synchronized void setChannelPosition(Block b, BlockWriteStreams stream, 
-                                              long dataOffset, long ckOffset)
+  @Override
+  public synchronized void adjustCrcChannelPosition(Block b,
+                                              BlockWriteStreams stream, 
+                                              int checksumSize)
                                               throws IOException {
-    BInfo binfo = blockMap.get(b);
-    if (binfo == null) {
-      throw new IOException("No such Block " + b );
-    }
-    binfo.setBytesOnDisk(dataOffset);
   }
 
   /** 

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=828120&r1=828119&r2=828120&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
(original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
Wed Oct 21 18:01:54 2009
@@ -66,7 +66,7 @@
     for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
       Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use
the sizeof actual data written
       ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
-      BlockWriteStreams out = bInfo.createStreams();
+      BlockWriteStreams out = bInfo.createStreams(true, 512, 4);
       try {
         OutputStream dataOut  = out.dataOut;
         assertEquals(0, fsdataset.getLength(b));

Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/datanode:713112
 /hadoop/core/trunk/src/webapps/datanode:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/datanode:796829-820463
-/hadoop/hdfs/trunk/src/webapps/datanode:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk/src/webapps/datanode:818294-818298,824552,824944,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/hdfs:713112
 /hadoop/core/trunk/src/webapps/hdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs:796829-820463
-/hadoop/hdfs/trunk/src/webapps/hdfs:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk/src/webapps/hdfs:818294-818298,824552,824944,826149,828116

Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 21 18:01:54 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/secondary:713112
 /hadoop/core/trunk/src/webapps/secondary:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/secondary:796829-820463
-/hadoop/hdfs/trunk/src/webapps/secondary:818294-818298,824552,824944,826149
+/hadoop/hdfs/trunk/src/webapps/secondary:818294-818298,824552,824944,826149,828116



Mime
View raw message