hadoop-common-commits mailing list archives

From: dhr...@apache.org
Subject: svn commit: r662513 [2/2] - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date: Mon, 02 Jun 2008 18:43:02 GMT
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Mon Jun  2 11:43:01 2008
@@ -17,25 +17,31 @@
  */
 package org.apache.hadoop.dfs;
 
-import junit.framework.TestCase;
 import java.io.*;
-import java.net.*;
+import java.net.InetSocketAddress;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
-import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
 
 
 /**
  * This class tests that a file need not be closed before its
  * data can be read by another client.
  */
-public class TestFileCreation extends TestCase {
+public class TestFileCreation extends junit.framework.TestCase {
+  static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";
+
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int numBlocks = 2;
@@ -46,11 +52,10 @@
   // entire file is written, the first two blocks definitely get flushed to
   // the datanodes.
 
-  //
   // creates a file but does not close it
-  //
-  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
     throws IOException {
+    System.out.println("createFile: Created " + name + " with " + repl + " replica.");
     FSDataOutputStream stm = fileSys.create(name, true,
                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
                                             (short)repl, (long)blockSize);
@@ -151,7 +156,6 @@
       //
       Path path = new Path("/");
       System.out.println("Path : \"" + path.toString() + "\"");
-      System.out.println(fs.isDirectory(path));
       System.out.println(fs.getFileStatus(path).isDir()); 
       assertTrue("/ should be a directory", 
                  fs.getFileStatus(path).isDir() == true);
@@ -160,8 +164,6 @@
       //
       Path file1 = new Path("filestatus.dat");
       FSDataOutputStream stm = createFile(fs, file1, 1);
-      System.out.println("Created file filestatus.dat with one "
-                         + " replicas.");
 
       // verify that file exists in FS namespace
       assertTrue(file1 + " should be a file", 
@@ -230,6 +232,7 @@
       writeFile(stm1);
       writeFile(stm3);
       stm1.close();
+      stm2.close();
       stm3.close();
 
       // set delete on exit flag on files.
@@ -257,12 +260,8 @@
       System.out.println("DeleteOnExit successful.");
 
     } finally {
-      if (fs != null) {
-        fs.close();
-      }
-      if (localfs != null) {
-        localfs.close();
-      }
+      IOUtils.closeStream(fs);
+      IOUtils.closeStream(localfs);
       cluster.shutdown();
     }
   }
@@ -291,9 +290,6 @@
       //
       Path file1 = new Path("/filestatus.dat");
       FSDataOutputStream stm = createFile(fs, file1, 1);
-      System.out.println("testFileCreationError1: "
-                         + "Created file filestatus.dat with one "
-                         + " replicas.");
 
       // verify that file exists in FS namespace
       assertTrue(file1 + " should be a file", 
@@ -305,7 +301,8 @@
 
       // wait for the datanode to be declared dead
       while (true) {
-        DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+        DatanodeInfo[] info = client.datanodeReport(
+            FSConstants.DatanodeReportType.LIVE);
         if (info.length == 0) {
           break;
         }
@@ -358,36 +355,35 @@
     }
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fs = cluster.getFileSystem();
-    cluster.waitActive();
-    InetSocketAddress addr = new InetSocketAddress("localhost",
-                                                   cluster.getNameNodePort());
-    DFSClient client = new DFSClient(addr, conf);
-
+    DistributedFileSystem dfs = null;
     try {
+      cluster.waitActive();
+      dfs = (DistributedFileSystem)cluster.getFileSystem();
+      DFSClient client = dfs.dfs;
 
       // create a new file.
       //
       Path file1 = new Path("/filestatus.dat");
-      createFile(fs, file1, 1);
+      createFile(dfs, file1, 1);
       System.out.println("testFileCreationError2: "
-                         + "Created file filestatus.dat with one "
-                         + " replicas.");
+                         + "Created file filestatus.dat with one replicas.");
 
       LocatedBlocks locations = client.namenode.getBlockLocations(
                                   file1.toString(), 0, Long.MAX_VALUE);
-      System.out.println("The file has " + locations.locatedBlockCount() +
-                         " blocks.");
+      System.out.println("testFileCreationError2: "
+          + "The file has " + locations.locatedBlockCount() + " blocks.");
 
       // add another block to the file
       LocatedBlock location = client.namenode.addBlock(file1.toString(), 
-                                                       null);
-      System.out.println("Added block " + location.getBlock());
+          client.clientName);
+      System.out.println("testFileCreationError2: "
+          + "Added block " + location.getBlock());
 
       locations = client.namenode.getBlockLocations(file1.toString(), 
                                                     0, Long.MAX_VALUE);
-      System.out.println("The file now has " + locations.locatedBlockCount() +
-                         " blocks.");
+      int count = locations.locatedBlockCount();
+      System.out.println("testFileCreationError2: "
+          + "The file now has " + count + " blocks.");
       
       // set the soft and hard limit to be 1 second so that the
       // namenode triggers lease recovery
@@ -399,20 +395,16 @@
       } catch (InterruptedException e) {
       }
 
-      // verify that the last block was cleaned up.
+      // verify that the last block was synchronized.
       locations = client.namenode.getBlockLocations(file1.toString(), 
                                                     0, Long.MAX_VALUE);
-      System.out.println("locations = " + locations.locatedBlockCount());
-      assertTrue("Error blocks were not cleaned up",
-                 locations.locatedBlockCount() == 0);
+      System.out.println("testFileCreationError2: "
+          + "locations = " + locations.locatedBlockCount());
+      assertEquals(0, locations.locatedBlockCount());
       System.out.println("testFileCreationError2 successful");
     } finally {
-      try {
-        fs.close();
-      } catch (Exception e) {
-      }
+      IOUtils.closeStream(dfs);
       cluster.shutdown();
-      client.close();
     }
   }
 
@@ -421,7 +413,7 @@
    * This test is currently not triggered because more HDFS work is 
    * is needed to handle persistent leases.
    */
-  public void testFileCreationNamenodeRestart() throws IOException {
+  public void xxxtestFileCreationNamenodeRestart() throws IOException {
     Configuration conf = new Configuration();
     final int MAX_IDLE_TIME = 2000; // 2s
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
@@ -430,18 +422,16 @@
     if (simulatedStorage) {
       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
     }
+
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fs = cluster.getFileSystem();
-    cluster.waitActive();
-    int nnport = cluster.getNameNodePort();
-    InetSocketAddress addr = new InetSocketAddress("localhost", nnport);
-
-    DFSClient client = null;
+    FileSystem fs = null;
     try {
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      final int nnport = cluster.getNameNodePort();
 
       // create a new file.
-      //
       Path file1 = new Path("/filestatus.dat");
       FSDataOutputStream stm = createFile(fs, file1, 1);
       System.out.println("testFileCreationNamenodeRestart: "
@@ -510,6 +500,7 @@
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
                                    null, null, null);
       cluster.waitActive();
+      fs = cluster.getFileSystem();
 
       // instruct the dfsclient to use a new filename when it requests
       // new blocks for files that were renamed.
@@ -534,7 +525,7 @@
       stm4.close();
 
       // verify that new block is associated with this file
-      client = new DFSClient(addr, conf);
+      DFSClient client = ((DistributedFileSystem)fs).dfs;
       LocatedBlocks locations = client.namenode.getBlockLocations(
                                   file1.toString(), 0, Long.MAX_VALUE);
       System.out.println("locations = " + locations.locatedBlockCount());
@@ -548,9 +539,8 @@
       assertTrue("Error blocks were not cleaned up for file " + file2,
                  locations.locatedBlockCount() == 1);
     } finally {
-      fs.close();
+      IOUtils.closeStream(fs);
       cluster.shutdown();
-      if (client != null)  client.close();
     }
   }
 
@@ -609,4 +599,97 @@
     simulatedStorage = false;
   }
 
+  /**
+   * Test creating two files at the same time. 
+   */
+  public void testConcurrentFileCreation() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      
+      Path[] p = {new Path("/foo"), new Path("/bar")};
+      
+      //write 2 files at the same time
+      FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
+      int i = 0;
+      for(; i < 100; i++) {
+        out[0].write(i);
+        out[1].write(i);
+      }
+      out[0].close();
+      for(; i < 200; i++) {out[1].write(i);}
+      out[1].close();
+
+      //verify
+      FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};  
+      for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
+      for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
+  /**
+   * Create a file, write something, fsync but not close.
+   * Then change lease period and wait for lease recovery.
+   * Finally, read the block directly from each Datanode and verify the content.
+   */
+  public void testLeaseExpireHardLimit() throws Exception {
+    System.out.println("testLeaseExpireHardLimit start");
+    final long leasePeriod = 1000;
+    final int DATANODE_NUM = 3;
+
+    Configuration conf = new Configuration();
+    conf.setInt("heartbeat.recheck.interval", 1000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+
+    // create cluster
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem dfs = null;
+    try {
+      cluster.waitActive();
+      dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      final String f = DIR + "foo";
+      final Path fpath = new Path(f);
+      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
+      out.write("something".getBytes());
+      ((DFSClient.DFSOutputStream)out.getWrappedStream()).fsync();
+
+      // set the soft and hard limit to be 1 second so that the
+      // namenode triggers lease recovery
+      cluster.setLeasePeriod(leasePeriod, leasePeriod);
+      // wait for the lease to expire
+      try {Thread.sleep(5 * leasePeriod);} catch (InterruptedException e) {}
+
+      LocatedBlocks locations = dfs.dfs.namenode.getBlockLocations(
+          f, 0, Long.MAX_VALUE);
+      assertEquals(1, locations.locatedBlockCount());
+      LocatedBlock locatedblock = locations.getLocatedBlocks().get(0);
+      int successcount = 0;
+      for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) {
+        DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort);
+        FSDataset dataset = (FSDataset)datanode.data;
+        Block b = dataset.getStoredBlock(locatedblock.getBlock().blkid);
+        File blockfile = dataset.findBlockFile(b);
+        System.out.println("blockfile=" + blockfile);
+        if (blockfile != null) {
+          BufferedReader in = new BufferedReader(new FileReader(blockfile));
+          assertEquals("something", in.readLine());
+          in.close();
+          successcount++;
+        }
+      }
+      System.out.println("successcount=" + successcount);
+      assertTrue(successcount > 0); 
+    } finally {
+      IOUtils.closeStream(dfs);
+      cluster.shutdown();
+    }
+
+    System.out.println("testLeaseExpireHardLimit successful");
+  }
 }
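
The finally blocks above now hand cleanup to IOUtils.closeStream instead of null-checking and try/catching each close() call. A minimal sketch of that pattern, assuming org.apache.hadoop.io.IOUtils.closeStream ignores a null argument and swallows any IOException raised by close():

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.IOUtils;

    public class CloseStreamSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = null;
        try {
          fs = FileSystem.get(new Configuration());
          // ... exercise the file system under test ...
        } finally {
          // closeStream is assumed null-safe and non-throwing, so the
          // cleanup path needs no null check or nested try/catch.
          IOUtils.closeStream(fs);
        }
      }
    }

This keeps teardown from masking the real test failure when close() itself throws.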

Added: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreationNamenodeRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreationNamenodeRestart.java?rev=662513&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreationNamenodeRestart.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreationNamenodeRestart.java Mon Jun  2 11:43:01 2008
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+public class TestFileCreationNamenodeRestart extends junit.framework.TestCase {
+  public void testFileCreationNamenodeRestart() throws Exception {
+    new TestFileCreation().xxxtestFileCreationNamenodeRestart();
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java Mon Jun  2 11:43:01 2008
@@ -19,8 +19,7 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
@@ -140,6 +139,27 @@
     assertTrue(getException);
   }
  
+  public void testGenerationStampWildCard() {
+    Map<Block, Long> map = new HashMap<Block, Long>();
+    final Random RAN = new Random();
+    final long seed = RAN.nextLong();
+    System.out.println("seed=" +  seed);
+    RAN.setSeed(seed);
+
+    long[] blkids = new long[10]; 
+    for(int i = 0; i < blkids.length; i++) {
+      blkids[i] = 1000L + RAN.nextInt(100000);
+      map.put(new Block(blkids[i], 0, blkids[i]), blkids[i]);
+    }
+    System.out.println("map=" + map.toString().replace(",", "\n  "));
+    
+    for(int i = 0; i < blkids.length; i++) {
+      Block b = new Block(blkids[i], 0, GenerationStamp.WILDCARD_STAMP);
+      Long v = map.get(b);
+      System.out.println(b + " => " + v);
+      assertEquals(blkids[i], v.longValue());
+    }
+  }
 
   /**
    * @param args
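
The map lookups in testGenerationStampWildCard only pass if a Block keyed with GenerationStamp.WILDCARD_STAMP hashes and compares equal to the stored Block that carries a concrete stamp. A simplified, hypothetical sketch of that property (BlockKey is an illustrative stand-in, not the actual org.apache.hadoop.dfs.Block):

    import java.util.HashMap;
    import java.util.Map;

    class BlockKey {
      static final long WILDCARD_STAMP = 1;  // illustrative value only
      final long blkid;
      final long stamp;

      BlockKey(long blkid, long stamp) {
        this.blkid = blkid;
        this.stamp = stamp;
      }

      // hashCode/equals are keyed on the block id alone, so a wildcard-stamp
      // key can find an entry stored under a concrete generation stamp.
      @Override public int hashCode() {
        return Long.valueOf(blkid).hashCode();
      }
      @Override public boolean equals(Object o) {
        return o instanceof BlockKey && ((BlockKey)o).blkid == blkid;
      }

      public static void main(String[] args) {
        Map<BlockKey, Long> map = new HashMap<BlockKey, Long>();
        map.put(new BlockKey(1234L, 42L), 1234L);
        // the wildcard-stamp key still locates the stored entry
        System.out.println(map.get(new BlockKey(1234L, WILDCARD_STAMP)));  // 1234
      }
    }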

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java Mon Jun  2 11:43:01 2008
@@ -21,13 +21,40 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 
 /**
  * This tests InterDataNodeProtocol for block handling. 
  */
 public class TestInterDatanodeProtocol extends junit.framework.TestCase {
-  public void testGetBlockMetaDataInfo() throws IOException {
+  static void checkMetaInfo(Block b, InterDatanodeProtocol idp,
+      DataBlockScanner scanner) throws IOException {
+    BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
+    assertEquals(b.getBlockId(), metainfo.getBlockId());
+    assertEquals(b.getNumBytes(), metainfo.getNumBytes());
+    if (scanner != null) {
+      assertEquals(scanner.getLastScanTime(b),
+          metainfo.getLastScanTime());
+    }
+  }
+
+  static LocatedBlock getLastLocatedBlock(ClientProtocol namenode, String src
+      ) throws IOException {
+    //get block info for the last block
+    LocatedBlocks locations = namenode.getBlockLocations(src, 0, Long.MAX_VALUE);
+    List<LocatedBlock> blocks = locations.getLocatedBlocks();
+    DataNode.LOG.info("blocks.size()=" + blocks.size());
+    assertTrue(blocks.size() > 0);
+
+    return blocks.get(blocks.size() - 1);
+  }
+
+  /**
+   * The following test first creates a file.
+   * It verifies the block information from a datanode.
+   * Then, it updates the block with new information and verifies again. 
+   */
+  public void testBlockMetaDataInfo() throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = null;
 
@@ -37,18 +64,13 @@
 
       //create a file
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-      String filepath = "/foo";
-      DFSTestUtil.createFile(dfs, new Path(filepath), 1024L, (short)3, 0L);
-      assertTrue(dfs.dfs.exists(filepath));
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+      assertTrue(dfs.dfs.exists(filestr));
 
       //get block info
-      ClientProtocol namenode = dfs.dfs.namenode;
-      LocatedBlocks locations = namenode.getBlockLocations(
-          filepath, 0, Long.MAX_VALUE);
-      List<LocatedBlock> blocks = locations.getLocatedBlocks();
-      assertTrue(blocks.size() > 0);
-
-      LocatedBlock locatedblock = blocks.get(0);
+      LocatedBlock locatedblock = getLastLocatedBlock(dfs.dfs.namenode, filestr);
       DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
       assertTrue(datanodeinfo.length > 0);
 
@@ -64,15 +86,13 @@
       //verify BlockMetaDataInfo
       Block b = locatedblock.getBlock();
       InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
-      BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
-      assertEquals(b.getBlockId(), metainfo.getBlockId());
-      assertEquals(b.getNumBytes(), metainfo.getNumBytes());
-      assertEquals(datanode.blockScanner.getLastScanTime(b),
-          metainfo.getLastScanTime());
+      checkMetaInfo(b, idp, datanode.blockScanner);
 
-      //TODO: verify GenerationStamp
-      InterDatanodeProtocol.LOG.info("idp.updateGenerationStamp="
-          + idp.updateGenerationStamp(b, new GenerationStamp(456789L)));
+      //verify updateBlock
+      Block newblock = new Block(
+          b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
+      idp.updateBlock(b, newblock);
+      checkMetaInfo(newblock, idp, datanode.blockScanner);
     }
     finally {
       if (cluster != null) {cluster.shutdown();}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java?rev=662513&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java Mon Jun  2 11:43:01 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class TestLeaseRecovery extends junit.framework.TestCase {
+  static final int BLOCK_SIZE = 1024;
+  static final short REPLICATION_NUM = (short)3;
+  static final Random RANDOM = new Random();
+
+  static void checkMetaInfo(Block b, InterDatanodeProtocol idp
+      ) throws IOException {
+    TestInterDatanodeProtocol.checkMetaInfo(b, idp, null);
+  }
+  
+  static int min(Integer... x) {
+    int m = x[0];
+    for(int i = 1; i < x.length; i++) {
+      if (x[i] < m) {
+        m = x[i];
+      }
+    }
+    return m;
+  }
+
+  /**
+   * The following test first creates a file with a few blocks.
+   * It randomly truncates the replica of the last block stored in each datanode.
+   * Finally, it triggers block synchronization to synchronize all stored block.
+   */
+  public void testBlockSynchronization() throws Exception {
+    final int ORG_FILE_SIZE = 3000; 
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 5, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, ORG_FILE_SIZE, REPLICATION_NUM, 0L);
+      assertTrue(dfs.dfs.exists(filestr));
+
+      //get block info for the last block
+      LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock(
+          dfs.dfs.namenode, filestr);
+      DatanodeInfo[] datanodeinfos = locatedblock.getLocations();
+      assertEquals(REPLICATION_NUM, datanodeinfos.length);
+
+      //connect to data nodes
+      InterDatanodeProtocol[] idps = new InterDatanodeProtocol[REPLICATION_NUM];
+      DataNode[] datanodes = new DataNode[REPLICATION_NUM];
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        idps[i] = DataNode.createInterDataNodeProtocolProxy(datanodeinfos[i], conf);
+        datanodes[i] = cluster.getDataNode(datanodeinfos[i].getIpcPort());
+        assertTrue(datanodes[i] != null);
+      }
+      
+      //verify BlockMetaDataInfo
+      Block lastblock = locatedblock.getBlock();
+      DataNode.LOG.info("newblocks=" + lastblock);
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        checkMetaInfo(lastblock, idps[i]);
+      }
+
+      //setup random block sizes 
+      int lastblocksize = ORG_FILE_SIZE % BLOCK_SIZE;
+      Integer[] newblocksizes = new Integer[REPLICATION_NUM];
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        newblocksizes[i] = RANDOM.nextInt(lastblocksize);
+      }
+      DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes)); 
+
+      //update blocks with random block sizes
+      Block[] newblocks = new Block[REPLICATION_NUM];
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
+            lastblock.getGenerationStamp());
+        idps[i].updateBlock(lastblock, newblocks[i]);
+        checkMetaInfo(newblocks[i], idps[i]);
+      }
+
+      DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
+      cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+
+      //block synchronization
+      final int primarydatanodeindex = RANDOM.nextInt(datanodes.length);
+      DataNode.LOG.info("primarydatanodeindex  =" + primarydatanodeindex);
+      DataNode primary = datanodes[primarydatanodeindex];
+      DataNode.LOG.info("primary.dnRegistration=" + primary.dnRegistration);
+      primary.recoverBlocks(new Block[]{lastblock}, new DatanodeInfo[][]{datanodeinfos}).join();
+
+      BlockMetaDataInfo[] updatedmetainfo = new BlockMetaDataInfo[REPLICATION_NUM];
+      int minsize = min(newblocksizes);
+      long currentGS = cluster.getNameNode().namesystem.getGenerationStamp();
+      lastblock.generationStamp = currentGS;
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
+        assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
+        assertEquals(minsize, updatedmetainfo[i].getNumBytes());
+        assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
+      }
+    }
+    finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+}
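
The assertions at the end of testBlockSynchronization state the recovery invariant being tested: after the primary datanode's recoverBlocks call returns, every replica of the last block should report the same length, namely the smallest of the randomly truncated sizes, stamped with the namenode's current generation stamp. For example, if newblocksizes came out as {300, 120, 250}, each updatedmetainfo[i] is expected to report 120 bytes. A quick standalone check of the varargs helper used to compute that minimum (the sizes are hypothetical):

    public class MinSketch {
      // mirrors the min(Integer...) helper in TestLeaseRecovery
      static int min(Integer... x) {
        int m = x[0];
        for (int i = 1; i < x.length; i++) {
          if (x[i] < m) {
            m = x[i];
          }
        }
        return m;
      }

      public static void main(String[] args) {
        System.out.println(min(300, 120, 250));  // prints 120
      }
    }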


