hadoop-common-commits mailing list archives

From: dhr...@apache.org
Subject: svn commit: r612610 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/dfs/
Date: Wed, 16 Jan 2008 22:55:58 GMT
Author: dhruba
Date: Wed Jan 16 14:55:56 2008
New Revision: 612610

URL: http://svn.apache.org/viewvc?rev=612610&view=rev
Log:
HADOOP-2012. Periodic data verification on Datanodes.
(Raghu Angadi via dhruba)
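
The scanner added by this change is controlled through the "dfs.datanode.scan.period.hours" configuration key: a negative value disables periodic verification (the updated upgrade tests below rely on this), and the scanner is only created for FSDataset-backed storage. A minimal sketch of toggling it from a test or client Configuration; the positive-value semantics (period in hours) are an assumption based on the key name:

    import org.apache.hadoop.conf.Configuration;

    /** Hedged sketch: enabling or disabling the new periodic block verification. */
    public class ScanPeriodExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A negative period disables the DataBlockScanner, as the upgrade
        // tests in this commit do to keep "current" directories unchanged.
        conf.setInt("dfs.datanode.scan.period.hours", -1);
        // Assumed: a positive value sets the scan period in hours.
        // conf.setInt("dfs.datanode.scan.period.hours", 24);
        System.out.println(conf.getInt("dfs.datanode.scan.period.hours", 0));
      }
    }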


Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSRollback.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jan 16 14:55:56 2008
@@ -53,6 +53,9 @@
     "fs.trash.root" parameter is no longer used.  Full source paths
     are also no longer reproduced within the trash.
 
+    HADOOP-2012. Periodic data verification on Datanodes.
+    (Raghu Angadi via dhruba)
+
   NEW FEATURES
 
     HADOOP-1857.  Ability to run a script when a task fails to capture stack

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Jan 16 14:55:56 2008
@@ -838,6 +838,24 @@
     int readAll(byte[] buf, int offset, int len) throws IOException {
       return readFully(this, buf, offset, len);
     }
+    
+    /* When the reader reaches the end of a block and there are no checksum
+     * errors, we send OP_STATUS_CHECKSUM_OK to the datanode to indicate
+     * that the checksum was verified and no error occurred.
+     */
+    void checksumOk(Socket sock) {
+      try {
+        OutputStream out = sock.getOutputStream();
+        byte buf[] = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
+                       (OP_STATUS_CHECKSUM_OK) & 0xff };
+        out.write(buf);
+        out.flush();
+      } catch (IOException e) {
+        // it's ok not to be able to send this.
+        LOG.debug("Could not write to datanode " + sock.getInetAddress() +
+                  ": " + e.getMessage());
+      }
+    }
   }
     
   /****************************************************************
@@ -1135,6 +1153,9 @@
             
             if (result >= 0) {
               pos += result;
+              if ( pos > blockEnd ) {
+                blockReader.checksumOk(s);
+              }
             } else {
              // got an EOS from the reader though we expect more data on it.
               throw new IOException("Unexpected EOS from the reader");
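
The two bytes written by checksumOk() are the big-endian encoding of OP_STATUS_CHECKSUM_OK (5, per the FSConstants change below), which is what the datanode reads back with readShort() in the DataNode change that follows. A minimal sketch verifying that the hand-rolled encoding matches DataOutputStream.writeShort; class and variable names here are illustrative only:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    /** Hedged sketch: the two-byte status above is a big-endian short. */
    public class ChecksumOkWireFormat {
      static final int OP_STATUS_CHECKSUM_OK = 5;  // value from FSConstants below

      public static void main(String[] args) throws IOException {
        byte[] manual = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
                          (OP_STATUS_CHECKSUM_OK) & 0xff };
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeShort(OP_STATUS_CHECKSUM_OK);
        System.out.println(Arrays.equals(manual, bos.toByteArray()));  // true
      }
    }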

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Jan 16 14:55:56 2008
@@ -19,6 +19,7 @@
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.*;
@@ -115,6 +116,9 @@
   private Thread dataNodeThread = null;
   String machineName;
   int defaultBytesPerChecksum = 512;
+  
+  private DataBlockScanner blockScanner;
+  private Daemon blockScannerThread;
 
   // The following three fields are to support balancing
   final static short MAX_BALANCING_THREADS = 5;
@@ -130,7 +134,7 @@
     return System.currentTimeMillis();
   }
 
-  private static class DataNodeMetrics implements Updater {
+  static class DataNodeMetrics implements Updater {
     private final MetricsRecord metricsRecord;
     private int bytesWritten = 0;
     private int bytesRead = 0;
@@ -138,6 +142,8 @@
     private int blocksRead = 0;
     private int blocksReplicated = 0;
     private int blocksRemoved = 0;
+    private int blocksVerified = 0;
+    private int blockVerificationFailures = 0;
       
     DataNodeMetrics(Configuration conf) {
       String sessionId = conf.get("session.id"); 
@@ -162,6 +168,9 @@
         metricsRecord.incrMetric("blocks_written", blocksWritten);
         metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
         metricsRecord.incrMetric("blocks_removed", blocksRemoved);
+        metricsRecord.incrMetric("blocks_verified", blocksVerified);
+        metricsRecord.incrMetric("block_verification_failures", 
+                                                  blockVerificationFailures);        
               
         bytesWritten = 0;
         bytesRead = 0;
@@ -169,6 +178,8 @@
         blocksRead = 0;
         blocksReplicated = 0;
         blocksRemoved = 0;
+        blocksVerified = 0;
+        blockVerificationFailures = 0;
       }
       metricsRecord.update();
     }
@@ -196,6 +207,14 @@
     synchronized void removedBlocks(int nblocks) {
       blocksRemoved += nblocks;
     }
+    
+    synchronized void verifiedBlocks(int nblocks) {
+      blocksVerified += nblocks;
+    }
+    
+    synchronized void verificationFailures(int failures) {
+      blockVerificationFailures += failures;
+    }    
   }
     
   /**
@@ -310,6 +329,21 @@
     LOG.info("Balancing bandwidth is "+balanceBandwidth + " bytes/s");
     this.balancingThrottler = new Throttler(balanceBandwidth);
 
+    //initialize periodic block scanner
+    String reason = null;
+    if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
+      reason = "verification is turned off by configuration";
+    } else if ( !(data instanceof FSDataset) ) {
+      reason = "verification is supported only with FSDataset";
+    } 
+    if ( reason == null ) {
+      blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
+      blockScannerThread = new Daemon(blockScanner);
+    } else {
+      LOG.info("Periodic Block Verification is disabled because " +
+               reason + ".");
+    }
+    
     //create a servlet to serve full-file content
     String infoAddr = conf.get("dfs.datanode.http.bindAddress", "0.0.0.0:50075");
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
@@ -317,6 +351,9 @@
     int tmpInfoPort = infoSocAddr.getPort();
    this.infoServer = new StatusHttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0);
     this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
+    this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
+    this.infoServer.addServlet(null, "/blockScannerReport", 
+                               DataBlockScanner.Servlet.class);
     this.infoServer.start();
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
@@ -372,6 +409,10 @@
     return nameNodeAddr;
   }
     
+  DataNodeMetrics getMetrics() {
+    return myMetrics;
+  }
+  
   /**
    * Return the namenode's identifier
    */
@@ -473,6 +514,10 @@
     }
     if(upgradeManager != null)
       upgradeManager.shutdownUpgrade();
+    if (blockScannerThread != null) {
+      blockScanner.shutdown();
+      blockScannerThread.interrupt();
+    }
     if (storage != null) {
       try {
         this.storage.unlockAll();
@@ -684,6 +729,9 @@
       //
       Block toDelete[] = ((BlockCommand)cmd).getBlocks();
       try {
+        if (blockScanner != null) {
+          blockScanner.deleteBlocks(toDelete);
+        }
         data.invalidate(toDelete);
       } catch(IOException e) {
         checkDiskError();
@@ -910,7 +958,8 @@
       BlockSender blockSender = null;
       try {
         try {
-          blockSender = new BlockSender(block, startOffset, length, true, true);
+          blockSender = new BlockSender(block, startOffset, length, 
+                                        true, true, false);
         } catch(IOException e) {
           out.writeShort(OP_STATUS_ERROR);
           throw e;
@@ -919,6 +968,17 @@
         out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
         long read = blockSender.sendBlock(out, null); // send data
 
+        if (blockSender.isBlockReadFully()) {
+          // See if client verification succeeded. 
+          // This is an optional response from client.
+          try {
+            if (in.readShort() == OP_STATUS_CHECKSUM_OK  && 
+                blockScanner != null) {
+              blockScanner.verifiedByClient(block);
+            }
+          } catch (IOException ignored) {}
+        }
+        
         myMetrics.readBytes((int) read);
         myMetrics.readBlocks(1);
         LOG.info(dnRegistration + "Served block " + block + " to " + s.getInetAddress());
@@ -1013,6 +1073,10 @@
         // notify name node
         notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
 
+        if (blockScanner != null) {
+          blockScanner.addBlock(block);
+        }
+        
         String msg = "Received block " + block + " from " +
                      s.getInetAddress();
 
@@ -1111,7 +1175,7 @@
         balancingSem.acquireUninterruptibly();
         
         // check if the block exists or not
-        blockSender = new BlockSender(block, 0, -1, false, false);
+        blockSender = new BlockSender(block, 0, -1, false, false, false);
 
         // get the output stream to the target
         InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
@@ -1217,12 +1281,19 @@
     private long curReserve;     // remaining bytes can be sent in the period
     private long bytesAlreadyUsed;
 
-    /** Constructor */
+    /** Constructor 
+     * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+     */
     Throttler(long bandwidthPerSec) {
       this(500, bandwidthPerSec);  // by default throttling period is 500ms 
     }
 
-    /** Constructor */
+    /**
+     * Constructor
+     * @param period in milliseconds. Bandwidth is enforced over this
+     *        period.
+     * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+     */
     Throttler(long period, long bandwidthPerSec) {
       this.curPeriodStart = System.currentTimeMillis();
       this.period = period;
@@ -1230,6 +1301,26 @@
       this.periodExtension = period*3;
     }
 
+    /**
+     * @return current throttle bandwidth in bytes per second.
+     */
+    public synchronized long getBandwidth() {
+      return bytesPerPeriod*1000/period;
+    }
+    
+    /**
+     * Sets throttle bandwidth. This takes effect by the end of the
+     * current period at the latest.
+     * 
+     * @param bytesPerSecond 
+     */
+    public synchronized void setBandwidth(long bytesPerSecond) {
+      if ( bytesPerSecond <= 0 ) {
+        throw new IllegalArgumentException("" + bytesPerSecond);
+      }
+      bytesPerPeriod = bytesPerSecond*period/1000;
+    }
+    
     /** Given the numOfBytes sent/received since last time throttle was called,
      * make the current thread sleep if I/O rate is too fast
      * compared to the given bandwidth
@@ -1269,30 +1360,35 @@
     }
   }
 
-  private class BlockSender implements java.io.Closeable {
+  class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
    private DataInputStream blockIn; // data stream
     private DataInputStream checksumIn; // checksum datastream
     private DataChecksum checksum; // checksum stream
     private long offset; // starting position to read
     private long endOffset; // ending position
+    private long blockLength;
     private byte buf[]; // buffer to store data read from the block file & crc
     private int bytesPerChecksum; // chunk size
     private int checksumSize; // checksum size
     private boolean corruptChecksumOk; // if need to verify checksum
     private boolean chunkOffsetOK; // if need to send chunk offset
 
+    private boolean blockReadFully; //set when the whole block is read
+    private boolean verifyChecksum; //if true, checksum is verified while reading
     private Throttler throttler;
     private DataOutputStream out;
 
     BlockSender(Block block, long startOffset, long length,
-        boolean corruptChecksumOk, boolean chunkOffsetOK) throws IOException {
+                boolean corruptChecksumOk, boolean chunkOffsetOK,
+                boolean verifyChecksum) throws IOException {
 
       try {
         this.block = block;
         this.chunkOffsetOK = chunkOffsetOK;
         this.corruptChecksumOk = corruptChecksumOk;
-
+        this.verifyChecksum = verifyChecksum;
+        this.blockLength = data.getLength(block);
 
         if ( !corruptChecksumOk || data.metaFileExists(block) ) {
           checksumIn = new DataInputStream(
@@ -1317,10 +1413,10 @@
         checksumSize = checksum.getChecksumSize();
 
         if (length < 0) {
-          length = data.getLength(block);
+          length = blockLength;
         }
 
-        endOffset = data.getLength(block);
+        endOffset = blockLength;
         if (startOffset < 0 || startOffset > endOffset
             || (length + startOffset) > endOffset) {
           String msg = " Offset " + startOffset + " and length " + length
@@ -1397,10 +1493,18 @@
       if (checksumSize > 0 && checksumIn != null) {
         try {
           checksumIn.readFully(buf, len, checksumSize);
+          
+          if (verifyChecksum) {
+            checksum.reset();
+            checksum.update(buf, 0, len);
+            if (!checksum.compare(buf, len)) {
+              throw new ChecksumException("Checksum failed at " + offset, len);
+            }
+          }
         } catch (IOException e) {
-          LOG.warn(" Could not read checksum for data at offset " + offset
-              + " for block " + block + " got : "
-              + StringUtils.stringifyException(e));
+          LOG.warn(" Could not read or failed to verify checksum for data" +
+                   " at offset " + offset + " for block " + block + " got : "
+                   + StringUtils.stringifyException(e));
           IOUtils.closeStream(checksumIn);
           checksumIn = null;
           if (corruptChecksumOk) {
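
For readers new to the on-disk format: each bytesPerChecksum-sized chunk of block data is followed in the metadata stream by a checksumSize-byte checksum, and with verifyChecksum set the sender recomputes and compares it before shipping the chunk. A standalone illustration of that per-chunk check using java.util.zip.CRC32; the real code uses Hadoop's DataChecksum, and the buffer layout shown (data bytes followed by a 4-byte big-endian checksum) is an assumption for the sketch:

    import java.util.zip.CRC32;

    /** Hedged sketch of per-chunk verification in the style of BlockSender. */
    public class ChunkVerifySketch {
      /** buf holds len data bytes followed by a 4-byte big-endian CRC32. */
      static boolean chunkOk(byte[] buf, int len) {
        CRC32 crc = new CRC32();
        crc.update(buf, 0, len);                        // recompute over the data
        long stored = ((buf[len]     & 0xffL) << 24)    // stored checksum bytes
                    | ((buf[len + 1] & 0xffL) << 16)
                    | ((buf[len + 2] & 0xffL) << 8)
                    |  (buf[len + 3] & 0xffL);
        return crc.getValue() == stored;                // mismatch -> ChecksumException
      }
    }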
@@ -1437,6 +1541,7 @@
       this.out = out;
       this.throttler = throttler;
 
+      long initialOffset = offset;
       long totalRead = 0;
       try {
         checksum.writeHeader(out);
@@ -1456,8 +1561,14 @@
         close();
       }
 
+      blockReadFully = (initialOffset == 0 && offset >= blockLength);
+
       return totalRead;
     }
+    
+    boolean isBlockReadFully() {
+      return blockReadFully;
+    }
   }
 
  /* A class that receives a block and writes to its own disk, meanwhile
@@ -1684,7 +1795,7 @@
 
         out = new DataOutputStream(new BufferedOutputStream(
             sock.getOutputStream(), BUFFER_SIZE));
-        blockSender = new BlockSender(b, 0, -1, false, false);
+        blockSender = new BlockSender(b, 0, -1, false, false, false);
 
         //
         // Header info
@@ -1726,6 +1837,11 @@
    */
   public void run() {
     LOG.info(dnRegistration + "In DataNode.run, data = " + data);
+
+    // start block scanner
+    if (blockScannerThread != null) {
+      blockScannerThread.start();
+    }
 
     // start dataXceiveServer
     dataXceiveServer.start();
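
The new Throttler accessors are simple unit conversions between a per-second rate and the byte budget for one throttling period: setBandwidth stores bytesPerSecond * period / 1000 and getBandwidth reverses it. A minimal sketch of just that arithmetic, assuming the 500 ms default period; the sleep and period-extension logic of the real class is omitted:

    /** Hedged sketch of the Throttler rate bookkeeping added above. */
    public class ThrottleMath {
      public static void main(String[] args) {
        long period = 500;                                       // default period, ms
        long bandwidthPerSec = 1024 * 1024;                      // 1 MB/s
        long bytesPerPeriod = bandwidthPerSec * period / 1000;   // setBandwidth()
        long recovered = bytesPerPeriod * 1000 / period;         // getBandwidth()
        System.out.println(bytesPerPeriod + " bytes per period, "
                           + recovered + " bytes/s");            // 524288, 1048576
      }
    }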

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java Wed Jan 16 14:55:56 2008
@@ -19,6 +19,8 @@
 package org.apache.hadoop.dfs;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
@@ -31,6 +33,7 @@
 import org.apache.hadoop.dfs.FSConstants.NodeType;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.io.IOUtils;
 
 /** 
  * Data storage information file.
@@ -41,6 +44,7 @@
   // Constants
   final static String BLOCK_SUBDIR_PREFIX = "subdir";
   final static String BLOCK_FILE_PREFIX = "blk_";
+  final static String COPY_FILE_PREFIX = "dncp_";
   
   private String storageID;
 
@@ -424,7 +428,12 @@
   
   static void linkBlocks(File from, File to) throws IOException {
     if (!from.isDirectory()) {
-      HardLink.createHardLink(from, to);
+      if (from.getName().startsWith(COPY_FILE_PREFIX)) {
+        IOUtils.copyBytes(new FileInputStream(from), 
+                          new FileOutputStream(to), 16*1024, true);
+      } else {
+        HardLink.createHardLink(from, to);
+      }
       return;
     }
     // from is a directory
@@ -433,7 +442,8 @@
     String[] blockNames = from.list(new java.io.FilenameFilter() {
         public boolean accept(File dir, String name) {
           return name.startsWith(BLOCK_SUBDIR_PREFIX) 
-            || name.startsWith(BLOCK_FILE_PREFIX);
+            || name.startsWith(BLOCK_FILE_PREFIX)
+            || name.startsWith(COPY_FILE_PREFIX);
         }
       });
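
During an upgrade, block and metadata files are normally hard-linked from the previous directory tree into the new one; files whose names carry the new dncp_ prefix are copied rather than hard-linked, and the filename filter now includes them. A small sketch of that copy-versus-link decision using java.nio.file for brevity; the real code uses HardLink.createHardLink and IOUtils.copyBytes as shown above:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    /** Hedged sketch: copy "dncp_" files, hard-link everything else. */
    public class LinkOrCopySketch {
      static final String COPY_FILE_PREFIX = "dncp_";   // value from DataStorage

      static void linkOrCopy(Path from, Path to) throws IOException {
        if (from.getFileName().toString().startsWith(COPY_FILE_PREFIX)) {
          Files.copy(from, to);           // private copy, not shared via a link
        } else {
          Files.createLink(to, from);     // hard link (link, existing file)
        }
      }
    }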
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Wed Jan 16 14:55:56 2008
@@ -31,7 +31,7 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /*
-   * 10: blockReceived also sends hints for deletion
+   * 11 : reportBadBlocks() is added.
    * 11 Block reports as long[]
    */
   public static final long versionID = 11L;
@@ -141,4 +141,9 @@
    */
   public BlockCrcInfo blockCrcUpgradeGetBlockLocations(Block block)
                                                       throws IOException;  
+
+  /**
+   * same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[] blocks)}
+   */
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
 }
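
The added reportBadBlocks mirrors the existing client-side call and gives datanode-side code (such as the new block scanner) a way to tell the namenode about blocks that fail verification. A hypothetical sketch of a caller; obtaining the DatanodeProtocol proxy and a LocatedBlock for the failed block is assumed and not shown:

    // Hedged, hypothetical fragment; assumes it lives in org.apache.hadoop.dfs,
    // where DatanodeProtocol and LocatedBlock are visible.
    void reportFailedVerification(DatanodeProtocol namenode, LocatedBlock badBlock)
        throws IOException {
      // Tell the namenode that this replica failed periodic verification.
      namenode.reportBadBlocks(new LocatedBlock[] { badBlock });
    }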

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Jan 16 14:55:56 2008
@@ -93,6 +93,7 @@
   public static final int OP_STATUS_ERROR_CHECKSUM = 2;  
   public static final int OP_STATUS_ERROR_INVALID = 3;  
   public static final int OP_STATUS_ERROR_EXISTS = 4;  
+  public static final int OP_STATUS_CHECKSUM_OK = 5;  
 
   
   /** Version for data transfers between clients and datanodes

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Wed Jan 16 14:55:56 2008
@@ -317,6 +317,10 @@
       return usage.getMount();
     }
       
+    File getDir() {
+      return dataDir.dir;
+    }
+    
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
       try {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java Wed Jan 16 14:55:56 2008
@@ -147,4 +147,14 @@
       }
     }
   }
+  
+  /** /dev/null of OutputStreams.
+   */
+  public static class NullOutputStream extends OutputStream {
+    public void write(byte[] b, int off, int len) throws IOException {
+    }
+
+    public void write(int b) throws IOException {
+    }
+  }  
 }
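
NullOutputStream simply discards whatever is written to it, which makes it a convenient sink when data needs to be read (for example, so checksums are verified along the way) but not kept. A small usage sketch; the copyBytes signature is the one already used in the DataStorage change above:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.IOUtils;

    /** Hedged sketch: draining a stream into the new IOUtils.NullOutputStream. */
    public class DiscardSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(new byte[4096]);
        // Read everything and throw it away; close both streams when done.
        IOUtils.copyBytes(in, new IOUtils.NullOutputStream(), 16 * 1024, true);
      }
    }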

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java Wed Jan 16 14:55:56 2008
@@ -23,6 +23,7 @@
 import java.util.Random;
 import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -212,4 +213,11 @@
     fs.delete(root);
     files = null;
   }
+  
+  static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
+    DFSDataInputStream in = 
+      (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
+    in.readByte();
+    return in.getCurrentBlock();
+  }  
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java Wed Jan 16 14:55:56 2008
@@ -83,7 +83,14 @@
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      /* This test requires that the "current" directory not change after
+       * the upgrade (strictly, it would be ok for its contents to change,
+       * but for now we disable block verification so that they do not).
+       */
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
       String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
       

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSRollback.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSRollback.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSRollback.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSRollback.java Wed Jan 16 14:55:56 2008
@@ -117,7 +117,9 @@
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
       String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
       

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java Wed Jan 16 14:55:56 2008
@@ -165,7 +165,8 @@
    */
   public void testVersions() throws Exception {
     UpgradeUtilities.initialize();
-    Configuration conf = UpgradeUtilities.initializeStorageStateConf(1);
+    Configuration conf = UpgradeUtilities.initializeStorageStateConf(1, 
+                                                      new Configuration());
     StorageInfo[] versions = initializeVersions();
     UpgradeUtilities.createStorageDirs(
                                        NAME_NODE, conf.getStrings("dfs.name.dir"), "current");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java Wed Jan 16 14:55:56 2008
@@ -27,7 +27,6 @@
 import static org.apache.hadoop.dfs.FSConstants.NodeType.NAME_NODE;
 import static org.apache.hadoop.dfs.FSConstants.NodeType.DATA_NODE;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
-import org.apache.hadoop.fs.Path;
 
 /**
 * This test ensures the appropriate response (successful or failure) from
@@ -179,7 +178,9 @@
     UpgradeUtilities.initialize();
 
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       for (int i = 0; i < testCases.length; i++) {
         boolean[] testCase = testCases[i];
         boolean shouldRecover = testCase[4];

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java Wed Jan 16 14:55:56 2008
@@ -123,7 +123,9 @@
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
       String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
       

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Wed Jan 16 14:55:56 2008
@@ -114,13 +114,6 @@
     in.readFully(arr);
   }
   
-  Block getFirstBlock(FileSystem fs, Path path) throws IOException {
-    DFSDataInputStream in = 
-      (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
-    in.readByte();
-    return in.getCurrentBlock();
-  }
-  
   public void testDataTransferProtocol() throws IOException {
     Random random = new Random();
     int oneMil = 1024*1024;
@@ -143,7 +136,7 @@
     createFile(fileSys, file, fileLen);
 
     // get the first blockid for the file
-    Block firstBlock = getFirstBlock(fileSys, file);
+    Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
     long newBlockId = firstBlock.getBlockId() + 1;
 
     recvByteBuf.position(1);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java Wed Jan 16 14:55:56 2008
@@ -88,7 +88,8 @@
     UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Datanode());
     UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Namenode());
 
-    conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+    conf = UpgradeUtilities.initializeStorageStateConf(numDirs, 
+                                                       new Configuration());
     String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
     String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
     DFSAdmin dfsAdmin = new DFSAdmin();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java?rev=612610&r1=612609&r2=612610&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java Wed Jan 16 14:55:56 2008
@@ -139,7 +139,8 @@
    * Initialize dfs.name.dir and dfs.data.dir with the specified number of
    * directory entries. Also initialize dfs.blockreport.intervalMsec.
    */
-  public static Configuration initializeStorageStateConf(int numDirs) {
+  public static Configuration initializeStorageStateConf(int numDirs,
+                                                         Configuration conf) {
     StringBuffer nameNodeDirs =
       new StringBuffer(new File(TEST_ROOT_DIR, "name1").toString());
     StringBuffer dataNodeDirs =
@@ -148,7 +149,9 @@
       nameNodeDirs.append("," + new File(TEST_ROOT_DIR, "name"+i));
       dataNodeDirs.append("," + new File(TEST_ROOT_DIR, "data"+i));
     }
-    Configuration conf = new Configuration();
+    if (conf == null) {
+      conf = new Configuration();
+    }
     conf.set("dfs.name.dir", nameNodeDirs.toString());
     conf.set("dfs.data.dir", dataNodeDirs.toString());
     conf.setInt("dfs.blockreport.intervalMsec", 10000);


