hadoop-hdfs-commits mailing list archives

From hair...@apache.org
Subject svn commit: r820497 [7/7] - in /hadoop/hdfs/trunk: ./ .eclipse.templates/.launches/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apach...
Date Wed, 30 Sep 2009 23:39:33 GMT
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Wed Sep 30 23:39:30 2009
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,17 +66,16 @@
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
     synchronized (fds) {
-      for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
-        Block b = entry.getKey();
-        File f = entry.getValue().getFile();
-        File mf = FSDataset.getMetaFile(f, b);
+      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+        File f = b.getBlockFile();
+        File mf = b.getMetaFile();
         // Truncate a block file that has a corresponding metadata file
         if (f.exists() && f.length() != 0 && mf.exists()) {
           FileOutputStream s = new FileOutputStream(f);
           FileChannel channel = s.getChannel();
           channel.truncate(0);
           LOG.info("Truncated block file " + f.getAbsolutePath());
-          return entry.getKey().getBlockId();
+          return b.getBlockId();
         }
       }
     }
@@ -87,14 +85,13 @@
   /** Delete a block file */
   private long deleteBlockFile() {
     synchronized(fds) {
-      for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
-        Block b = entry.getKey();
-        File f = entry.getValue().getFile();
-        File mf = FSDataset.getMetaFile(f, b);
+      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+        File f = b.getBlockFile();
+        File mf = b.getMetaFile();
         // Delete a block file that has corresponding metadata file
         if (f.exists() && mf.exists() && f.delete()) {
           LOG.info("Deleting block file " + f.getAbsolutePath());
-          return entry.getKey().getBlockId();
+          return b.getBlockId();
         }
       }
     }
@@ -104,16 +101,12 @@
   /** Delete block meta file */
   private long deleteMetaFile() {
     synchronized(fds) {
-      for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
-        Block b = entry.getKey();
-        String blkfile = entry.getValue().getFile().getAbsolutePath();
-        long genStamp = b.getGenerationStamp();
-        String metafile = FSDataset.getMetaFileName(blkfile, genStamp);
-        File file = new File(metafile);
+      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+        File file = b.getMetaFile();
         // Delete a metadata file
         if (file.exists() && file.delete()) {
           LOG.info("Deleting metadata file " + file.getAbsolutePath());
-          return entry.getKey().getBlockId();
+          return b.getBlockId();
         }
       }
     }
@@ -324,23 +317,21 @@
   }
 
   private void verifyAddition(long blockId, long genStamp, long size) {
-    Block memBlock = fds.getBlockKey(blockId);
-    assertNotNull(memBlock);
-    ReplicaInfo blockInfo;
+    final ReplicaInfo replicainfo;
     synchronized(fds) {
-      blockInfo = fds.volumeMap.get(memBlock);
+      replicainfo = fds.getReplica(blockId);
     }
-    assertNotNull(blockInfo);
+    assertNotNull(replicainfo);
 
     // Added block has the same file as the one created by the test
     File file = new File(getBlockFile(blockId));
-    assertEquals(file.getName(), blockInfo.getFile().getName());
+    assertEquals(file.getName(), replicainfo.getBlockFile().getName());
 
     // Generation stamp is same as that of created file
-    assertEquals(genStamp, memBlock.getGenerationStamp());
+    assertEquals(genStamp, replicainfo.getGenerationStamp());
 
     // File size matches
-    assertEquals(size, memBlock.getNumBytes());
+    assertEquals(size, replicainfo.getNumBytes());
   }
 
   private void verifyDeletion(long blockId) {
@@ -351,9 +342,9 @@
   }
 
   private void verifyGenStamp(long blockId, long genStamp) {
-    Block memBlock;
+    final Replica memBlock;
     synchronized(fds) {
-      memBlock = fds.getBlockKey(blockId);
+      memBlock = fds.getReplica(blockId);
     }
     assertNotNull(memBlock);
     assertEquals(genStamp, memBlock.getGenerationStamp());
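
For reference, a minimal sketch (not part of r820497) of the replica-centric iteration the hunks above switch to. It uses only the accessors shown in the diff (volumeMap.replicas(), getBlockFile(), getMetaFile(), getBlockId()) and is placed in the datanode package, since the test relies on package-private access to FSDataset.volumeMap:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.File;
    import java.io.IOException;

    class ReplicaIterationSketch {
      /** Return the id of some replica whose block and meta files both exist. */
      static long findCompleteReplica(FSDataset fds) throws IOException {
        synchronized (fds) {                      // same lock the test takes
          for (ReplicaInfo r : fds.volumeMap.replicas()) {
            File blockFile = r.getBlockFile();    // was: entry.getValue().getFile()
            File metaFile = r.getMetaFile();      // was: FSDataset.getMetaFile(f, b)
            if (blockFile.exists() && metaFile.exists()) {
              return r.getBlockId();              // no separate Block key needed
            }
          }
        }
        throw new IOException("no complete replica found");
      }
    }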

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Wed Sep 30 23:39:30 2009
@@ -32,8 +32,11 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessToken;
@@ -58,8 +61,8 @@
     FileSystem fs = cluster.getFileSystem();
     final int dnIndex = 0;
     String dataDir = cluster.getDataDirectory();
-    File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "tmp");
-    File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "tmp");
+    File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "current/rbw");
+    File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "current/rbw");
     try {
       // make the data directory of the first datanode to be readonly
       assertTrue(dir1.setReadOnly());
@@ -114,17 +117,12 @@
       DataOutputStream out = new DataOutputStream(
           s.getOutputStream());
 
-      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-      WRITE_BLOCK.write(out);
-      out.writeLong( block.getBlock().getBlockId());
-      out.writeLong( block.getBlock().getGenerationStamp() );
-      out.writeInt(1);
-      out.writeBoolean( false );       // recovery flag
-      Text.writeString( out, "" );
-      out.writeBoolean(false); // Not sending src node information
-      out.writeInt(0);
-      AccessToken.DUMMY_TOKEN.write(out);
-      
+      Sender.opWriteBlock(out, block.getBlock().getBlockId(), 
+          block.getBlock().getGenerationStamp(), 1, 
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, 
+          0L, 0L, 0L, "", null, new DatanodeInfo[0], 
+          AccessToken.DUMMY_TOKEN);
+
       // write check header
       out.writeByte( 1 );
       out.writeInt( 512 );
@@ -136,8 +134,8 @@
       
       // the temporary block & meta files should be deleted
       String dataDir = cluster.getDataDirectory();
-      File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "tmp");
-      File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "tmp");
+      File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "current/rbw");
+      File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "current/rbw");
       while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
         Thread.sleep(100);
       }
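
For reference, a minimal sketch (not part of r820497) of the Sender-based request that replaces the hand-written header fields above. The argument order is copied from the hunk; the comments naming the three long arguments are my reading of DataTransferProtocol at this revision, not something the diff states:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.security.AccessToken;

    class WriteBlockRequestSketch {
      static void sendWriteBlockOp(DataOutputStream out, LocatedBlock block)
          throws IOException {
        Sender.opWriteBlock(out,
            block.getBlock().getBlockId(),
            block.getBlock().getGenerationStamp(),
            1,                                            // replicas in the pipeline
            BlockConstructionStage.PIPELINE_SETUP_CREATE,
            0L, 0L, 0L,                                   // presumably new GS and min/max bytes received
            "",                                           // client name
            null,                                         // no source node
            new DatanodeInfo[0],                          // no downstream targets
            AccessToken.DUMMY_TOKEN);
      }
    }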

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Wed Sep 30 23:39:30 2009
@@ -21,33 +21,30 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * This tests InterDataNodeProtocol for block handling. 
  */
 public class TestInterDatanodeProtocol extends junit.framework.TestCase {
-  public static void checkMetaInfo(Block b, InterDatanodeProtocol idp,
-      DataBlockScanner scanner) throws IOException {
-    BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
+  public static void checkMetaInfo(Block b, DataNode dn) throws IOException {
+    Block metainfo = dn.data.getStoredBlock(b.getBlockId());
     assertEquals(b.getBlockId(), metainfo.getBlockId());
     assertEquals(b.getNumBytes(), metainfo.getNumBytes());
-    if (scanner != null) {
-      assertEquals(scanner.getLastScanTime(b),
-          metainfo.getLastScanTime());
-    }
   }
 
   public static LocatedBlock getLastLocatedBlock(
@@ -99,16 +96,155 @@
       //verify BlockMetaDataInfo
       Block b = locatedblock.getBlock();
       InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
-      checkMetaInfo(b, idp, datanode.blockScanner);
+      checkMetaInfo(b, datanode);
+      long recoveryId = b.getGenerationStamp() + 1;
+      idp.initReplicaRecovery(
+          new RecoveringBlock(b, locatedblock.getLocations(), recoveryId));
 
       //verify updateBlock
       Block newblock = new Block(
           b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
-      idp.updateBlock(b, newblock, false);
-      checkMetaInfo(newblock, idp, datanode.blockScanner);
+      idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes());
+      checkMetaInfo(newblock, datanode);
     }
     finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  private static ReplicaInfo createReplicaInfo(Block b) {
+    return new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(),
+        null, null);
+  }
+
+  private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo recoveryInfo) {
+    Assert.assertEquals(originalInfo.getBlockId(), recoveryInfo.getBlockId());
+    Assert.assertEquals(originalInfo.getGenerationStamp(), recoveryInfo.getGenerationStamp());
+    Assert.assertEquals(originalInfo.getBytesOnDisk(), recoveryInfo.getNumBytes());
+    Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState());
+  }
+
+  /** Test {@link FSDataset#initReplicaRecovery(ReplicasMap, Block, long)} */
+  @Test
+  public void testInitReplicaRecovery() throws IOException {
+    final long firstblockid = 10000L;
+    final long gs = 7777L;
+    final long length = 22L;
+    final ReplicasMap map = new ReplicasMap();
+    final Block[] blocks = new Block[5];
+    for(int i = 0; i < blocks.length; i++) {
+      blocks[i] = new Block(firstblockid + i, length, gs);
+      map.add(createReplicaInfo(blocks[i]));
+    }
+    
+    { 
+      //normal case
+      final Block b = blocks[0];
+      final ReplicaInfo originalInfo = map.get(b);
+
+      final long recoveryid = gs + 1;
+      final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid);
+      assertEquals(originalInfo, recoveryInfo);
+
+      final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(b);
+      Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId());
+      Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID());
+
+      //recover one more time 
+      final long recoveryid2 = gs + 2;
+      final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid2);
+      assertEquals(originalInfo, recoveryInfo2);
+
+      final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(b);
+      Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId());
+      Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID());
+      
+      //case RecoveryInProgressException
+      try {
+        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        Assert.fail();
+      }
+      catch(RecoveryInProgressException ripe) {
+        System.out.println("GOOD: getting " + ripe);
+      }
+    }
+
+    { //replica not found
+      final long recoveryid = gs + 1;
+      final Block b = new Block(firstblockid - 1, length, gs);
+      ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(map, b, recoveryid);
+      Assert.assertNull("Data-node should not have this replica.", r);
+    }
+    
+    { //case "THIS IS NOT SUPPOSED TO HAPPEN"
+      final long recoveryid = gs - 1;
+      final Block b = new Block(firstblockid + 1, length, gs);
+      try {
+        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        Assert.fail();
+      }
+      catch(IOException ioe) {
+        System.out.println("GOOD: getting " + ioe);
+      }
+    }
+
+  }
+
+  /** Test {@link FSDataset#updateReplicaUnderRecovery(ReplicaUnderRecovery, long, long)} */
+  @Test
+  public void testUpdateReplicaUnderRecovery() throws IOException {
+    final Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+
+      //get block info
+      final LocatedBlock locatedblock = getLastLocatedBlock(
+          dfs.getClient().getNamenode(), filestr);
+      final DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+      Assert.assertTrue(datanodeinfo.length > 0);
+
+      //get DataNode and FSDataset objects
+      final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
+      Assert.assertTrue(datanode != null);
+      Assert.assertTrue(datanode.data instanceof FSDataset);
+      final FSDataset fsdataset = (FSDataset)datanode.data;
+
+      //initReplicaRecovery
+      final Block b = locatedblock.getBlock();
+      final long recoveryid = b.getGenerationStamp() + 1;
+      final long newlength = b.getNumBytes() - 1;
+      FSDataset.initReplicaRecovery(fsdataset.volumeMap, b, recoveryid);
+
+      //check replica
+      final ReplicaInfo replica = fsdataset.getReplica(b.getBlockId());
+      Assert.assertTrue(replica instanceof ReplicaUnderRecovery);
+      final ReplicaUnderRecovery rur = (ReplicaUnderRecovery)replica;
+
+      //check meta data before update
+      FSDataset.checkReplicaFiles(rur);
+
+      //update
+      final ReplicaInfo finalized = 
+        (ReplicaInfo)fsdataset.updateReplicaUnderRecovery(
+            rur, recoveryid, newlength);
+
+      //check meta data after update
+      FSDataset.checkReplicaFiles(finalized);
+      Assert.assertEquals(b.getBlockId(), finalized.getBlockId());
+      Assert.assertEquals(recoveryid, finalized.getGenerationStamp());
+      Assert.assertEquals(newlength, finalized.getNumBytes());
+
+    } finally {
+      if (cluster != null) cluster.shutdown();
+    }
+  }
 }
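
For reference, a minimal sketch (not part of r820497) of the two-step recovery flow the rewritten test drives over InterDatanodeProtocol: initReplicaRecovery marks the replica as under recovery for a new recovery generation stamp, then updateReplicaUnderRecovery commits the agreed length under that stamp. The halved length mirrors the test above; the block and proxy are assumed to be obtained the same way the test obtains them:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;

    class ReplicaRecoverySketch {
      static void recoverToHalfLength(InterDatanodeProtocol idp, LocatedBlock locatedblock)
          throws IOException {
        Block b = locatedblock.getBlock();
        long recoveryId = b.getGenerationStamp() + 1;   // new recovery generation stamp

        // Step 1: put the replica into the under-recovery state on the datanode.
        idp.initReplicaRecovery(
            new RecoveringBlock(b, locatedblock.getLocations(), recoveryId));

        // Step 2: commit the agreed-upon length under that recovery id.
        idp.updateReplicaUnderRecovery(b, recoveryId, b.getNumBytes() / 2);
      }
    }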

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Wed Sep 30 23:39:30 2009
@@ -25,8 +25,10 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -62,14 +64,19 @@
     int bytesAdded = 0;
     for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
       Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written
-      OutputStream dataOut  = fsdataset.writeToBlock(b, false).dataOut;
-      assertEquals(0, fsdataset.getLength(b));
-      for (int j=1; j <= blockIdToLen(i); ++j) {
-        dataOut.write(j);
-        assertEquals(j, fsdataset.getLength(b)); // correct length even as we write
-        bytesAdded++;
+      ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
+      BlockWriteStreams out = bInfo.createStreams();
+      try {
+        OutputStream dataOut  = out.dataOut;
+        assertEquals(0, fsdataset.getLength(b));
+        for (int j=1; j <= blockIdToLen(i); ++j) {
+          dataOut.write(j);
+          assertEquals(j, bInfo.getBytesOnDisk()); // correct length even as we write
+          bytesAdded++;
+        }
+      } finally {
+        out.close();
       }
-      dataOut.close();
       b.setNumBytes(blockIdToLen(i));
       fsdataset.finalizeBlock(b);
       assertEquals(blockIdToLen(i), fsdataset.getLength(b));
@@ -139,24 +146,24 @@
 
 
   public void testGetBlockReport() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
-    Block[] blockReport = fsdataset.getBlockReport();
-    assertEquals(0, blockReport.length);
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
+    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
     blockReport = fsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
     }
   }
   public void testInjectionEmpty() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
-    Block[] blockReport = fsdataset.getBlockReport();
-    assertEquals(0, blockReport.length);
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
+    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
     blockReport = fsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
@@ -169,7 +176,7 @@
     SimulatedFSDataset sfsdataset = new SimulatedFSDataset(conf);
     sfsdataset.injectBlocks(blockReport);
     blockReport = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
@@ -180,13 +187,13 @@
   }
 
   public void testInjectionNonEmpty() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
     
-    Block[] blockReport = fsdataset.getBlockReport();
-    assertEquals(0, blockReport.length);
+    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
     blockReport = fsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
@@ -201,13 +208,13 @@
     // Add come blocks whose block ids do not conflict with
     // the ones we are going to inject.
     bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1);
-    Block[] blockReport2 = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    BlockListAsLongs blockReport2 = sfsdataset.getBlockReport();
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     blockReport2 = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     sfsdataset.injectBlocks(blockReport);
     blockReport = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS*2, blockReport.length);
+    assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
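
For reference, a minimal sketch (not part of r820497) of the write path the test now follows: create an RBW replica, write through the streams it hands back, finalize, and read the report back as a BlockListAsLongs. Only calls that appear in the hunks above are used; the sketch sits in the datanode package so the package-level types shown there resolve:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
    import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;

    class SimulatedWriteSketch {
      static void writeAndReport(SimulatedFSDataset fsdataset, long blockId, int len)
          throws IOException {
        Block b = new Block(blockId, 0, 0);              // expected length left at zero
        ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
        BlockWriteStreams out = bInfo.createStreams();
        try {
          OutputStream dataOut = out.dataOut;
          for (int j = 1; j <= len; j++) {
            dataOut.write(j);                            // bytes-on-disk grows as we write
          }
        } finally {
          out.close();
        }
        b.setNumBytes(len);
        fsdataset.finalizeBlock(b);

        BlockListAsLongs report = fsdataset.getBlockReport();
        for (Block rb : report) {                        // iterable view over the reported blocks
          System.out.println(rb.getBlockId() + " " + rb.getNumBytes());
        }
      }
    }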

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test FSDataset#append, writeToRbw, and writeToTemporary */
+public class TestWriteToReplica {
+  final private static Block[] blocks = new Block[] {
+    new Block(1, 1, 2001), new Block(2, 1, 2002), 
+    new Block(3, 1, 2003), new Block(4, 1, 2004),
+    new Block(5, 1, 2005), new Block(6, 1, 2006)
+  };
+  final private static int FINALIZED = 0;
+  final private static int TEMPORARY = 1;
+  final private static int RBW = 2;
+  final private static int RWR = 3;
+  final private static int RUR = 4;
+  final private static int NON_EXISTENT = 5;
+  
+  // test close
+  @Test
+  public void testClose() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test close
+      testClose(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // test append
+  @Test
+  public void testAppend() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test append
+      testAppend(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // test writeToRbw
+  @Test
+  public void testWriteToRbw() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test writeToRbw
+      testWriteToRbw(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  // test writeToTemporary
+  @Test
+  public void testWriteToTemporary() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test writeToTemporary
+      testWriteToTemporary(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  private void setup(FSDataset dataSet) throws IOException {
+    // setup replicas map
+    ReplicasMap replicasMap = dataSet.volumeMap;
+    FSVolume vol = dataSet.volumes.getNextVolume(0);
+    ReplicaInfo replicaInfo = new FinalizedReplica(
+        blocks[FINALIZED], vol, vol.getDir());
+    replicasMap.add(replicaInfo);
+    replicaInfo.getBlockFile().createNewFile();
+    replicaInfo.getMetaFile().createNewFile();
+    
+    replicasMap.add(new ReplicaInPipeline(
+        blocks[TEMPORARY].getBlockId(),
+        blocks[TEMPORARY].getGenerationStamp(), vol, 
+        vol.createTmpFile(blocks[TEMPORARY]).getParentFile()));
+    
+    replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol, 
+        vol.createRbwFile(blocks[RBW]).getParentFile(), null);
+    replicasMap.add(replicaInfo);
+    replicaInfo.getBlockFile().createNewFile();
+    replicaInfo.getMetaFile().createNewFile();
+    
+    replicasMap.add(new ReplicaWaitingToBeRecovered(blocks[RWR], vol, 
+        vol.createRbwFile(blocks[RWR]).getParentFile()));
+    replicasMap.add(new ReplicaUnderRecovery(
+        new FinalizedReplica(blocks[RUR], vol, vol.getDir()), 2007));    
+  }
+  
+  private void testAppend(FSDataset dataSet) throws IOException {
+    long newGS = blocks[FINALIZED].getGenerationStamp()+1;
+    dataSet.append(blocks[FINALIZED], newGS, 
+        blocks[FINALIZED].getNumBytes());  // successful
+    blocks[FINALIZED].setGenerationStamp(newGS);
+    
+    try {
+      dataSet.append(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
+          blocks[TEMPORARY].getNumBytes());
+      Assert.fail("Should not have appended to a temporary replica " 
+          + blocks[TEMPORARY]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
+          blocks[TEMPORARY], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[RBW], blocks[RBW].getGenerationStamp()+1,
+          blocks[RBW].getNumBytes());
+      Assert.fail("Should not have appended to an RBW replica" + blocks[RBW]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
+          blocks[RBW], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+          blocks[RBW].getNumBytes());
+      Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
+          blocks[RWR], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+          blocks[RUR].getNumBytes());
+      Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
+          blocks[RUR], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[NON_EXISTENT], 
+          blocks[NON_EXISTENT].getGenerationStamp(), 
+          blocks[NON_EXISTENT].getNumBytes());
+      Assert.fail("Should not have appended to a non-existent replica " + 
+          blocks[NON_EXISTENT]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA + 
+          blocks[NON_EXISTENT], e.getMessage());
+    }
+    
+    newGS = blocks[FINALIZED].getGenerationStamp()+1;
+    dataSet.recoverAppend(blocks[FINALIZED], newGS, 
+        blocks[FINALIZED].getNumBytes());  // successful
+    blocks[FINALIZED].setGenerationStamp(newGS);
+    
+    try {
+      dataSet.recoverAppend(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
+          blocks[TEMPORARY].getNumBytes());
+      Assert.fail("Should not have appended to a temporary replica " 
+          + blocks[TEMPORARY]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    newGS = blocks[RBW].getGenerationStamp()+1;
+    dataSet.recoverAppend(blocks[RBW], newGS, blocks[RBW].getNumBytes());
+    blocks[RBW].setGenerationStamp(newGS);
+
+    try {
+      dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+          blocks[RBW].getNumBytes());
+      Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+          blocks[RUR].getNumBytes());
+      Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverAppend(blocks[NON_EXISTENT], 
+          blocks[NON_EXISTENT].getGenerationStamp(), 
+          blocks[NON_EXISTENT].getNumBytes());
+      Assert.fail("Should not have appended to a non-existent replica " + 
+          blocks[NON_EXISTENT]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA));
+    }
+  }
+
+  private void testClose(FSDataset dataSet) throws IOException {
+    long newGS = blocks[FINALIZED].getGenerationStamp()+1;
+    dataSet.recoverClose(blocks[FINALIZED], newGS, 
+        blocks[FINALIZED].getNumBytes());  // successful
+    blocks[FINALIZED].setGenerationStamp(newGS);
+    
+    try {
+      dataSet.recoverClose(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
+          blocks[TEMPORARY].getNumBytes());
+      Assert.fail("Should not have recovered close a temporary replica " 
+          + blocks[TEMPORARY]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    newGS = blocks[RBW].getGenerationStamp()+1;
+    dataSet.recoverClose(blocks[RBW], newGS, blocks[RBW].getNumBytes());
+    blocks[RBW].setGenerationStamp(newGS);
+
+    try {
+      dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+          blocks[RBW].getNumBytes());
+      Assert.fail("Should not have recovered close an RWR replica" + blocks[RWR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+          blocks[RUR].getNumBytes());
+      Assert.fail("Should not have recovered close an RUR replica" + blocks[RUR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverClose(blocks[NON_EXISTENT], 
+          blocks[NON_EXISTENT].getGenerationStamp(), 
+          blocks[NON_EXISTENT].getNumBytes());
+      Assert.fail("Should not have recovered close a non-existent replica " + 
+          blocks[NON_EXISTENT]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA));
+    }
+  }
+  
+  private void testWriteToRbw(FSDataset dataSet) throws IOException {
+    try {
+      dataSet.recoverRbw(blocks[FINALIZED],
+          blocks[FINALIZED].getGenerationStamp()+1,
+          0L, blocks[FINALIZED].getNumBytes());
+      Assert.fail("Should not have recovered a finalized replica " +
+          blocks[FINALIZED]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_RBW_REPLICA));
+    }
+ 
+    try {
+      dataSet.createRbw(blocks[FINALIZED]);
+      Assert.fail("Should not have created a replica that's already " +
+      		"finalized " + blocks[FINALIZED]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+ 
+    try {
+      dataSet.recoverRbw(blocks[TEMPORARY], 
+          blocks[TEMPORARY].getGenerationStamp()+1, 
+          0L, blocks[TEMPORARY].getNumBytes());
+      Assert.fail("Should not have recovered a temporary replica " +
+          blocks[TEMPORARY]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_RBW_REPLICA));
+    }
+
+    try {
+      dataSet.createRbw(blocks[TEMPORARY]);
+      Assert.fail("Should not have created a replica that had created as " +
+      		"temporary " + blocks[TEMPORARY]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+        
+    dataSet.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp()+1, 
+        0L, blocks[RBW].getNumBytes());  // expect to be successful
+    
+    try {
+      dataSet.createRbw(blocks[RBW]);
+      Assert.fail("Should not have created a replica that had created as RBW " +
+          blocks[RBW]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.recoverRbw(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+          0L, blocks[RWR].getNumBytes());
+      Assert.fail("Should not have recovered a RWR replica " + blocks[RWR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_RBW_REPLICA));
+    }
+
+    try {
+      dataSet.createRbw(blocks[RWR]);
+      Assert.fail("Should not have created a replica that was waiting to be " +
+      		"recovered " + blocks[RWR]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.recoverRbw(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+          0L, blocks[RUR].getNumBytes());
+      Assert.fail("Should not have recovered a RUR replica " + blocks[RUR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_RBW_REPLICA));
+    }
+
+    try {
+      dataSet.createRbw(blocks[RUR]);
+      Assert.fail("Should not have created a replica that was under recovery " +
+          blocks[RUR]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.recoverRbw(blocks[NON_EXISTENT],
+          blocks[NON_EXISTENT].getGenerationStamp()+1,
+          0L, blocks[NON_EXISTENT].getNumBytes());
+      Assert.fail("Cannot recover a non-existent replica " +
+          blocks[NON_EXISTENT]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(
+          e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
+    }
+    
+    dataSet.createRbw(blocks[NON_EXISTENT]);
+  }
+  
+  private void testWriteToTemporary(FSDataset dataSet) throws IOException {
+    try {
+      dataSet.createTemporary(blocks[FINALIZED]);
+      Assert.fail("Should not have created a temporary replica that was " +
+      		"finalized " + blocks[FINALIZED]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+ 
+    try {
+      dataSet.createTemporary(blocks[TEMPORARY]);
+      Assert.fail("Should not have created a replica that had created as" +
+      		"temporary " + blocks[TEMPORARY]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.createTemporary(blocks[RBW]);
+      Assert.fail("Should not have created a replica that had created as RBW " +
+          blocks[RBW]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.createTemporary(blocks[RWR]);
+      Assert.fail("Should not have created a replica that was waiting to be " +
+      		"recovered " + blocks[RWR]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.createTemporary(blocks[RUR]);
+      Assert.fail("Should not have created a replica that was under recovery " +
+          blocks[RUR]);
+    } catch (ReplicaAlreadyExistsException e) {
+    }
+    
+    dataSet.createTemporary(blocks[NON_EXISTENT]);
+  }
+}
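
Taken together, the assertions in this new test pin down which FSDataset calls each starting replica state accepts. A compact summary derived only from the asserts above (ok = call returns, RNFE = ReplicaNotFoundException, RAEE = ReplicaAlreadyExistsException):

                    append  recoverAppend  recoverClose  recoverRbw  createRbw  createTemporary
      FINALIZED     ok      ok             ok            RNFE        RAEE       RAEE
      TEMPORARY     RNFE    RNFE           RNFE          RNFE        RAEE       RAEE
      RBW           RNFE    ok             ok            ok          RAEE       RAEE
      RWR           RNFE    RNFE           RNFE          RNFE        RAEE       RAEE
      RUR           RNFE    RNFE           RNFE          RNFE        RAEE       RAEE
      NON_EXISTENT  RNFE    RNFE           RNFE          RNFE        ok         ok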

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Wed Sep 30 23:39:30 2009
@@ -519,7 +519,8 @@
               .of(CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       for(boolean written = !closeUponCreate; !written; 
-        written = nameNode.complete(fileNames[daemonId][inputIdx], clientName));
+        written = nameNode.complete(fileNames[daemonId][inputIdx],
+                                    clientName, null));
       return end-start;
     }
 
@@ -685,8 +686,9 @@
     
     NamespaceInfo nsInfo;
     DatanodeRegistration dnRegistration;
-    Block[] blocks;
+    ArrayList<Block> blocks;
     int nrBlocks; // actual number of blocks
+    long[] blockReportList;
 
     /**
      * Get data-node in the form 
@@ -705,7 +707,7 @@
 
     TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
       dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
-      this.blocks = new Block[blockCapacity];
+      this.blocks = new ArrayList<Block>(blockCapacity);
       this.nrBlocks = 0;
     }
 
@@ -738,19 +740,24 @@
     }
 
     boolean addBlock(Block blk) {
-      if(nrBlocks == blocks.length) {
-        LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
+      if(nrBlocks == blocks.size()) {
+        LOG.debug("Cannot add block: datanode capacity = " + blocks.size());
         return false;
       }
-      blocks[nrBlocks] = blk;
+      blocks.set(nrBlocks, blk);
       nrBlocks++;
       return true;
     }
 
     void formBlockReport() {
       // fill remaining slots with blocks that do not exist
-      for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
-        blocks[idx] = new Block(blocks.length - idx, 0, 0);
+      for(int idx = blocks.size()-1; idx >= nrBlocks; idx--)
+        blocks.set(idx, new Block(blocks.size() - idx, 0, 0));
+      blockReportList = new BlockListAsLongs(blocks,null).getBlockListAsLongs();
+    }
+
+    long[] getBlockReportList() {
+      return blockReportList;
     }
 
     public int compareTo(String name) {
@@ -760,6 +767,7 @@
     /**
      * Send a heartbeat to the name-node and replicate blocks if requested.
      */
+    @SuppressWarnings("unused") // keep it for future blockReceived benchmark
     int replicateBlocks() throws IOException {
       // register datanode
       DatanodeCommand[] cmds = nameNode.sendHeartbeat(
@@ -889,8 +897,8 @@
         nameNode.create(fileName, FsPermission.getDefault(), clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), true, replication,
             BLOCK_SIZE);
-        addBlocks(fileName, clientName);
-        nameNode.complete(fileName, clientName);
+        Block lastBlock = addBlocks(fileName, clientName);
+        nameNode.complete(fileName, clientName, lastBlock);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -898,9 +906,12 @@
       }
     }
 
-    private void addBlocks(String fileName, String clientName) throws IOException {
+    private Block addBlocks(String fileName, String clientName)
+    throws IOException {
+      Block prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNode.addBlock(fileName, clientName);
+        LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock);
+        prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
           datanodes[dnIdx].addBlock(loc.getBlock());
@@ -910,6 +921,7 @@
               new String[] {""});
         }
       }
+      return prevBlock;
     }
 
     /**
@@ -923,8 +935,7 @@
       assert daemonId < numThreads : "Wrong daemonId.";
       TinyDatanode dn = datanodes[daemonId];
       long start = System.currentTimeMillis();
-      nameNode.blockReport(dn.dnRegistration,
-          BlockListAsLongs.convertToArrayLongs(dn.blocks));
+      nameNode.blockReport(dn.dnRegistration, dn.getBlockReportList());
       long end = System.currentTimeMillis();
       return end-start;
     }
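
For reference, a minimal sketch (not part of r820497) of how the benchmark now turns its per-datanode block list into the long[] wire form passed to NameNode.blockReport. The two-argument BlockListAsLongs constructor and getBlockListAsLongs() are taken from the hunks above; reading the second (null) argument as the under-construction replica list is my assumption:

    import java.util.ArrayList;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;

    class BlockReportSketch {
      static long[] toWireForm(ArrayList<Block> blocks) {
        // Second argument (assumed: replicas under construction) left null,
        // matching formBlockReport() above.
        return new BlockListAsLongs(blocks, null).getBlockListAsLongs();
      }
    }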

Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestFileCreation;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBlockUnderConstruction {
+  static final String BASE_DIR = "/test/TestBlockUnderConstruction";
+  static final int BLOCK_SIZE = 8192; // same as TestFileCreation.blocksize
+  static final int NUM_BLOCKS = 5;  // number of blocks to write
+
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem hdfs;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+    hdfs = (DistributedFileSystem)cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(hdfs != null) hdfs.close();
+    if(cluster != null) cluster.shutdown();
+  }
+
+  void writeFile(Path file, FSDataOutputStream stm, int size)
+  throws IOException {
+    long blocksBefore = stm.getPos() / BLOCK_SIZE;
+    
+    TestFileCreation.writeFile(stm, BLOCK_SIZE);
+    int blocksAfter = 0;
+    // wait until the block is allocated by DataStreamer
+    BlockLocation[] locatedBlocks;
+    while(blocksAfter <= blocksBefore) {
+      locatedBlocks = hdfs.getClient().getBlockLocations(
+          file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS);
+      blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length;
+    }
+  }
+
+  private void verifyFileBlocks(String file,
+                                boolean isFileOpen) throws IOException {
+    FSNamesystem ns = cluster.getNamesystem();
+    INodeFile inode = ns.dir.getFileINode(file);
+    assertTrue("File does not exist: " + inode.toString(), inode != null);
+    assertTrue("File " + inode.toString() +
+        " isUnderConstruction = " + inode.isUnderConstruction() +
+        " expected to be " + isFileOpen,
+        inode.isUnderConstruction() == isFileOpen);
+    BlockInfo[] blocks = inode.getBlocks();
+    assertTrue("File does not have blocks: " + inode.toString(),
+        blocks != null && blocks.length > 0);
+    
+    int idx = 0;
+    BlockInfo curBlock;
+    // all blocks but the last two should be regular blocks
+    for(; idx < blocks.length - 2; idx++) {
+      curBlock = blocks[idx];
+      assertTrue("Block is not complete: " + curBlock,
+          curBlock.isComplete());
+      assertTrue("Block is not in BlocksMap: " + curBlock,
+          ns.blockManager.getStoredBlock(curBlock) == curBlock);
+    }
+
+    // the penultimate block is either complete or
+    // committed if the file is not closed
+    if(idx > 0) {
+      curBlock = blocks[idx-1]; // penultimate block
+      assertTrue("Block " + curBlock +
+          " isUnderConstruction = " + inode.isUnderConstruction() +
+          " expected to be " + isFileOpen,
+          (isFileOpen && curBlock.isComplete()) ||
+          (!isFileOpen && !curBlock.isComplete() == 
+            (curBlock.getBlockUCState() ==
+              BlockUCState.COMMITTED)));
+      assertTrue("Block is not in BlocksMap: " + curBlock,
+          ns.blockManager.getStoredBlock(curBlock) == curBlock);
+    }
+
+    // the last block is under construction if the file is not closed
+    curBlock = blocks[idx]; // last block
+    assertEquals("Block " + curBlock +
+        " isComplete = " + curBlock.isComplete() +
+        " expected to be " + isFileOpen,
+        isFileOpen, !curBlock.isComplete());
+    assertTrue("Block is not in BlocksMap: " + curBlock,
+        ns.blockManager.getStoredBlock(curBlock) == curBlock);
+  }
+
+  @Test
+  public void testBlockCreation() throws IOException {
+    Path file1 = new Path(BASE_DIR, "file1.dat");
+    FSDataOutputStream out = TestFileCreation.createFile(hdfs, file1, 3);
+
+    for(int idx = 0; idx < NUM_BLOCKS; idx++) {
+      // write one block
+      writeFile(file1, out, BLOCK_SIZE);
+      // verify consistency
+      verifyFileBlocks(file1.toString(), true);
+    }
+
+    // close file
+    out.close();
+    // verify consistency
+    verifyFileBlocks(file1.toString(), false);
+  }
+
+  /**
+   * Test NameNode.getBlockLocations(..) on reading un-closed files.
+   */
+  @Test
+  public void testGetBlockLocations() throws IOException {
+    final NameNode namenode = cluster.getNameNode();
+    final Path p = new Path(BASE_DIR, "file2.dat");
+    final String src = p.toString();
+    final FSDataOutputStream out = TestFileCreation.createFile(hdfs, p, 3);
+
+    // write a half block
+    int len = BLOCK_SIZE >>> 1;
+    writeFile(p, out, len);
+
+    for(int i = 1; i < NUM_BLOCKS; ) {
+      // verify consistency
+      final LocatedBlocks lb = namenode.getBlockLocations(src, 0, len);
+      final List<LocatedBlock> blocks = lb.getLocatedBlocks();
+      assertEquals(i, blocks.size());
+      final Block b = blocks.get(blocks.size() - 1).getBlock();
+      assertTrue(b instanceof BlockInfoUnderConstruction);
+
+      if (++i < NUM_BLOCKS) {
+        // write one more block
+        writeFile(p, out, BLOCK_SIZE);
+        len += BLOCK_SIZE;
+      }
+    }
+    // close file
+    out.close();
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Wed Sep 30 23:39:30 2009
@@ -182,7 +182,8 @@
       File baseDir = new File(System.getProperty("test.build.data",
                                                  "build/test/data"),"dfs/data");
       for (int i=0; i<8; i++) {
-        File blockFile = new File(baseDir, "data" +(i+1)+ "/current/" + block);
+        File blockFile = new File(baseDir, "data" +(i+1) + 
+            MiniDFSCluster.FINALIZED_DIR_NAME + block);
         if(blockFile.exists()) {
           assertTrue(blockFile.delete());
         }
@@ -294,8 +295,8 @@
     File baseDir = new File(System.getProperty("test.build.data",
                                                "build/test/data"),"dfs/data");
     for (int i=0; i < 6; i++) {
-      File blockFile = new File(baseDir, "data" + (i+1) + "/current/" +
-                                block);
+      File blockFile = new File(baseDir, "data" + (i+1) + 
+          MiniDFSCluster.FINALIZED_DIR_NAME + block);
       if (blockFile.exists()) {
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
         FileChannel channel = raFile.getChannel();

Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java Wed Sep 30 23:39:30 2009
@@ -56,7 +56,8 @@
       DataNodeProperties dnProps = cluster.stopDataNode(0);
       // remove block scanner log to trigger block scanning
       File scanLog = new File(System.getProperty("test.build.data"),
-          "dfs/data/data1/current/dncp_block_verification.log.curr");
+          "dfs/data/data1" + MiniDFSCluster.FINALIZED_DIR_NAME + 
+          "dncp_block_verification.log.curr");
       //wait for one minute for deletion to succeed;
       for(int i=0; !scanLog.delete(); i++) {
         assertTrue("Could not delete log file in one minute", i < 60);

Propchange: hadoop/hdfs/trunk/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/datanode:713112
 /hadoop/core/trunk/src/webapps/datanode:776175-784663
+/hadoop/hdfs/branches/HDFS-265/src/webapps/datanode:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/webapps/datanode:820487

Propchange: hadoop/hdfs/trunk/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/hdfs:713112
 /hadoop/core/trunk/src/webapps/hdfs:776175-784663
+/hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs:820487

Propchange: hadoop/hdfs/trunk/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/secondary:713112
 /hadoop/core/trunk/src/webapps/secondary:776175-784663
+/hadoop/hdfs/branches/HDFS-265/src/webapps/secondary:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/webapps/secondary:820487


