hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1158072 [3/7] - in /hadoop/common/branches/HDFS-1623/hdfs: ./ ivy/ src/c++/libhdfs/ src/contrib/ src/contrib/fuse-dfs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/ser...
Date: Tue, 16 Aug 2011 00:37:25 GMT
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Aug 16 00:37:15 2011
@@ -169,7 +169,7 @@ import org.mortbay.util.ajax.JSON;
  **********************************************************/
 @InterfaceAudience.Private
 public class DataNode extends Configured 
-    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,
+    implements InterDatanodeProtocol, ClientDatanodeProtocol,
     DataNodeMXBean {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
   
@@ -348,7 +348,7 @@ public class DataNode extends Configured
   ThreadGroup threadGroup = null;
   long blockReportInterval;
   boolean resetBlockReportTime = true;
-  long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
+  long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
   long heartBeatInterval;
   private boolean heartbeatsDisabledForTests = false;
   private DataStorage storage = null;
@@ -440,21 +440,23 @@ public class DataNode extends Configured
                                           HdfsConstants.WRITE_TIMEOUT);
     /* Based on results on different platforms, we might need set the default 
      * to false on some of them. */
-    this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", 
-                                             true);
+    this.transferToAllowed = conf.getBoolean(
+        DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
+        DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
     this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                                        DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-
-    this.blockReportInterval =
-      conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL);
-    this.initialBlockReportDelay = conf.getLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
-                                            BLOCKREPORT_INITIAL_DELAY)* 1000L; 
+    this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.initialBlockReportDelay = conf.getLong(
+        DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
+        DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
     if (this.initialBlockReportDelay >= blockReportInterval) {
       this.initialBlockReportDelay = 0;
       LOG.info("dfs.blockreport.initialDelay is greater than " +
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
-    this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL) * 1000L;
+    this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
+        DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
 
     // do we need to sync block file contents to disk when blockfile is closed?
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
@@ -617,7 +619,7 @@ public class DataNode extends Configured
     } else {
       ss = secureResources.getStreamingSocket();
     }
-    ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
+    ss.setReceiveBufferSize(FSConstants.DEFAULT_DATA_SOCKET_SIZE); 
     // adjust machine name with the actual port
     int tmpPort = ss.getLocalPort();
     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
@@ -1967,8 +1969,8 @@ public class DataNode extends Configured
         long writeTimeout = socketWriteTimeout + 
                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
-        out = new DataOutputStream(new BufferedOutputStream(baseStream, 
-                                                            SMALL_BUFFER_SIZE));
+        out = new DataOutputStream(new BufferedOutputStream(baseStream,
+            FSConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, false, DataNode.this);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
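
The DataNode hunks above are part of removing the FSConstants constant-interface: implementing an interface purely to inherit its constants is a well-known Java antipattern, since it leaks those names into the class's public type. With the interface gone, constants are qualified explicitly (FSConstants.SMALL_BUFFER_SIZE) and configuration moves to paired key/default constants from DFSConfigKeys rather than string literals. A minimal, self-contained sketch of that key/default pattern, using the dfs.datanode.transferTo.allowed key from the hunk above and a map-backed stand-in for Hadoop's Configuration:

    import java.util.HashMap;
    import java.util.Map;

    public class ConfKeyPattern {
      // Paired key/default constants, as DFSConfigKeys does: one place to
      // rename a property, and callers can never typo the string literal.
      static final String TRANSFER_TO_ALLOWED_KEY =
          "dfs.datanode.transferTo.allowed";
      static final boolean TRANSFER_TO_ALLOWED_DEFAULT = true;

      private final Map<String, String> props = new HashMap<String, String>();

      boolean getBoolean(String key, boolean defaultValue) {
        String v = props.get(key);
        return v == null ? defaultValue : Boolean.parseBoolean(v);
      }

      public static void main(String[] args) {
        ConfKeyPattern conf = new ConfKeyPattern();
        // Same shape as conf.getBoolean(DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
        //                               DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT)
        boolean allowed =
            conf.getBoolean(TRANSFER_TO_ALLOWED_KEY, TRANSFER_TO_ALLOWED_DEFAULT);
        System.out.println("transferTo allowed: " + allowed);
      }
    }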

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Aug 16 00:37:15 2011
@@ -69,7 +69,7 @@ import com.google.protobuf.ByteString;
 /**
  * Thread for processing incoming/outgoing data stream.
  */
-class DataXceiver extends Receiver implements Runnable, FSConstants {
+class DataXceiver extends Receiver implements Runnable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -202,8 +202,8 @@ class DataXceiver extends Receiver imple
       final long length) throws IOException {
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
-    DataOutputStream out = new DataOutputStream(
-                 new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        baseStream, FSConstants.SMALL_BUFFER_SIZE));
     checkAccess(out, true, block, blockToken,
         Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
   
@@ -329,7 +329,7 @@ class DataXceiver extends Receiver imple
     final DataOutputStream replyOut = new DataOutputStream(
         new BufferedOutputStream(
             NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
-            SMALL_BUFFER_SIZE));
+            FSConstants.SMALL_BUFFER_SIZE));
     checkAccess(replyOut, isClient, block, blockToken,
         Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
 
@@ -369,11 +369,11 @@ class DataXceiver extends Receiver imple
                       (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
-          mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+          mirrorSock.setSendBufferSize(FSConstants.DEFAULT_DATA_SOCKET_SIZE);
           mirrorOut = new DataOutputStream(
              new BufferedOutputStream(
                          NetUtils.getOutputStream(mirrorSock, writeTimeout),
-                         SMALL_BUFFER_SIZE));
+                         FSConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
@@ -524,7 +524,7 @@ class DataXceiver extends Receiver imple
     final MetaDataInputStream metadataIn = 
       datanode.data.getMetaDataInputStream(block);
     final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
-        metadataIn, BUFFER_SIZE));
+        metadataIn, FSConstants.IO_FILE_BUFFER_SIZE));
 
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
@@ -603,7 +603,7 @@ class DataXceiver extends Receiver imple
       OutputStream baseStream = NetUtils.getOutputStream(
           s, datanode.socketWriteTimeout);
       reply = new DataOutputStream(new BufferedOutputStream(
-          baseStream, SMALL_BUFFER_SIZE));
+          baseStream, FSConstants.SMALL_BUFFER_SIZE));
 
       // send status first
       writeResponse(SUCCESS, reply);
@@ -681,15 +681,15 @@ class DataXceiver extends Receiver imple
 
       OutputStream baseStream = NetUtils.getOutputStream(proxySock, 
           datanode.socketWriteTimeout);
-      proxyOut = new DataOutputStream(
-                     new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+      proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
+          FSConstants.SMALL_BUFFER_SIZE));
 
       /* send request to the proxy */
       new Sender(proxyOut).copyBlock(block, blockToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
-          NetUtils.getInputStream(proxySock), BUFFER_SIZE));
+          NetUtils.getInputStream(proxySock), FSConstants.IO_FILE_BUFFER_SIZE));
       BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
           HdfsProtoUtil.vintPrefixed(proxyReply));
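
A recurring shape in DataXceiver is wrapping the raw socket stream in a BufferedOutputStream sized by FSConstants.SMALL_BUFFER_SIZE before constructing the DataOutputStream, so the many small header and status writes coalesce before hitting the wire. Sketched below with a plain java.net socket standing in for NetUtils, and an assumed 512-byte buffer size:

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.Socket;

    public class StreamWrapping {
      // Assumed stand-in for FSConstants.SMALL_BUFFER_SIZE.
      static final int SMALL_BUFFER_SIZE = 512;

      static DataOutputStream wrap(Socket sock) throws IOException {
        // NetUtils.getOutputStream(sock, timeout) plays this role in HDFS.
        OutputStream baseStream = sock.getOutputStream();
        // Buffer small protocol writes; flush() pushes them to the wire.
        return new DataOutputStream(
            new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
      }
    }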
 

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Aug 16 00:37:15 2011
@@ -42,7 +42,7 @@ import org.apache.hadoop.util.Daemon;
  * other DataNodes.  This small server does not use the 
  * Hadoop IPC mechanism.
  */
-class DataXceiverServer implements Runnable, FSConstants {
+class DataXceiverServer implements Runnable {
   public static final Log LOG = DataNode.LOG;
   
   ServerSocket ss;
@@ -119,8 +119,8 @@ class DataXceiverServer implements Runna
       conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
                   DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
     
-    this.estimateBlockSize = 
-      conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    this.estimateBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
     
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Tue Aug 16 00:37:15 2011
@@ -47,8 +47,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ServletUtil;
 import org.apache.hadoop.util.StringUtils;
-import org.mortbay.util.URIUtil;
 
 @InterfaceAudience.Private
 public class DatanodeJspHelper {
@@ -289,7 +289,7 @@ public class DatanodeJspHelper {
     // Add the various links for looking at the file contents
     // URL for downloading the full file
     String downloadUrl = "http://" + req.getServerName() + ":"
-        + req.getServerPort() + "/streamFile" + URIUtil.encodePath(filename)
+        + req.getServerPort() + "/streamFile" + ServletUtil.encodePath(filename)
         + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr, true)
         + JspHelper.getDelegationTokenUrlParam(tokenString);
     out.print("<a name=\"viewOptions\"></a>");

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Aug 16 00:37:15 2011
@@ -75,7 +75,7 @@ import org.apache.hadoop.util.Reflection
  *
  ***************************************************/
 @InterfaceAudience.Private
-public class FSDataset implements FSConstants, FSDatasetInterface {
+public class FSDataset implements FSDatasetInterface {
 
   /**
    * A node type that can be built into a tree reflecting the
@@ -465,7 +465,7 @@ public class FSDataset implements FSCons
         }
         checksumIn = new DataInputStream(
             new BufferedInputStream(new FileInputStream(metaFile),
-                BUFFER_SIZE));
+                FSConstants.IO_FILE_BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
         BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -775,12 +775,13 @@ public class FSDataset implements FSCons
      */
     private volatile List<FSVolume> volumes = null;
     BlockVolumeChoosingPolicy blockChooser;
-    int numFailedVolumes = 0;
+    int numFailedVolumes;
 
-    FSVolumeSet(FSVolume[] volumes, BlockVolumeChoosingPolicy blockChooser) {
+    FSVolumeSet(FSVolume[] volumes, int failedVols, BlockVolumeChoosingPolicy blockChooser) {
       List<FSVolume> list = Arrays.asList(volumes);
       this.volumes = Collections.unmodifiableList(list);
       this.blockChooser = blockChooser;
+      this.numFailedVolumes = failedVols;
     }
     
     private int numberOfVolumes() {
@@ -1144,15 +1145,19 @@ public class FSDataset implements FSCons
     String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
 
     int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
-
+    int volsFailed = volsConfigured - storage.getNumStorageDirs();
     this.validVolsRequired = volsConfigured - volFailuresTolerated;
 
-    if (validVolsRequired < 1
-        || validVolsRequired > storage.getNumStorageDirs()) {
+    if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
+      throw new DiskErrorException("Invalid volume failure "
+          + " config value: " + volFailuresTolerated);
+    }
+    if (volsFailed > volFailuresTolerated) {
       throw new DiskErrorException("Too many failed volumes - "
           + "current valid volumes: " + storage.getNumStorageDirs() 
           + ", volumes configured: " + volsConfigured 
-          + ", volume failures tolerated: " + volFailuresTolerated );
+          + ", volumes failed: " + volsFailed
+          + ", volume failures tolerated: " + volFailuresTolerated);
     }
 
     FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
@@ -1170,7 +1175,7 @@ public class FSDataset implements FSCons
             RoundRobinVolumesPolicy.class,
             BlockVolumeChoosingPolicy.class),
         conf);
-    volumes = new FSVolumeSet(volArray, blockChooserImpl);
+    volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
     volumes.getVolumeMap(volumeMap);
 
     File[] roots = new File[storage.getNumStorageDirs()];
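
FSDataset now derives the failed-volume count as volsConfigured - storage.getNumStorageDirs() (storage directories that fail to load are dropped before this point) and validates both the configured tolerance and the observed failures. A sketch of just that check, with a worked example: 4 configured volumes with 1 tolerated failure survives 1 failed volume but refuses to start with 2:

    import java.io.IOException;

    public class VolumeFailureCheck {
      static void check(int volsConfigured, int numStorageDirs,
                        int volFailuresTolerated) throws IOException {
        int volsFailed = volsConfigured - numStorageDirs;
        if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
          throw new IOException("Invalid volume failure config value: "
              + volFailuresTolerated);
        }
        if (volsFailed > volFailuresTolerated) {
          throw new IOException("Too many failed volumes - current valid volumes: "
              + numStorageDirs + ", volumes configured: " + volsConfigured
              + ", volumes failed: " + volsFailed
              + ", volume failures tolerated: " + volFailuresTolerated);
        }
      }

      public static void main(String[] args) throws IOException {
        check(4, 3, 1);   // 1 failure, 1 tolerated: OK
        check(4, 2, 1);   // 2 failures, 1 tolerated: throws IOException
      }
    }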

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Tue Aug 16 00:37:15 2011
@@ -91,7 +91,6 @@ public class BackupImage extends FSImage
     super(conf);
     storage.setDisablePreUpgradableLayoutCheck(true);
     bnState = BNState.DROP_UNTIL_NEXT_ROLL;
-    editLog.initJournals();
   }
 
   /**
@@ -210,14 +209,13 @@ public class BackupImage extends FSImage
       if (LOG.isTraceEnabled()) {
         LOG.debug("data:" + StringUtils.byteToHexString(data));
       }
-      backupInputStream.setBytes(data);
+
       FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
       int logVersion = storage.getLayoutVersion();
-      BufferedInputStream bin = new BufferedInputStream(backupInputStream);
-      DataInputStream in = new DataInputStream(bin);
-      Checksum checksum = FSEditLog.getChecksum();
-      int numLoaded = logLoader.loadEditRecords(logVersion, in, checksum, true,
-                                lastAppliedTxId + 1);
+      backupInputStream.setBytes(data, logVersion);
+
+      int numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, 
+                                                true, lastAppliedTxId + 1);
       if (numLoaded != numTxns) {
         throw new IOException("Batch of txns starting at txnid " +
             firstTxId + " was supposed to contain " + numTxns +

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Tue Aug 16 00:37:15 2011
@@ -54,7 +54,7 @@ class BackupJournalManager implements Jo
   }
 
   @Override
-  public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger)
+  public void purgeLogsOlderThan(long minTxIdToKeep)
       throws IOException {
   }
 

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Tue Aug 16 00:37:15 2011
@@ -207,7 +207,7 @@ class Checkpointer extends Daemon {
     long lastApplied = bnImage.getLastAppliedTxId();
     LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
     RemoteEditLogManifest manifest =
-      getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId());
+      getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
 
     if (!manifest.getLogs().isEmpty()) {
       RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
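
The Checkpointer fix is an off-by-one: transaction lastAppliedTxId has already been applied, so the manifest request should begin at the next transaction rather than re-fetching a segment that merely contains the last applied one. A trivial illustration:

    public class CheckpointRange {
      public static void main(String[] args) {
        long lastApplied = 100;               // txns 1..100 already applied
        long firstNeeded = lastApplied + 1;   // the patched argument
        System.out.println("request manifest from txid " + firstNeeded); // 101
      }
    }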

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java Tue Aug 16 00:37:15 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.ContentSumma
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ServletUtil;
 import org.znerd.xmlenc.XMLOutputter;
 
 /** Servlets for file checksum */
@@ -49,8 +50,7 @@ public class ContentSummaryServlet exten
       ugi.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
-          final String path = request.getPathInfo();
-
+          final String path = ServletUtil.getDecodedPath(request, "/contentSummary");
           final PrintWriter out = response.getWriter();
           final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
           xml.declaration();

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Tue Aug 16 00:37:15 2011
@@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServlet;
@@ -33,8 +31,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -86,48 +82,6 @@ abstract class DfsServlet extends HttpSe
     return DFSUtil.createNamenode(nnAddr, conf);
   }
 
-  /** Create a URI for redirecting request to a datanode */
-  protected URI createRedirectUri(String servletpath, 
-                                  UserGroupInformation ugi,
-                                  DatanodeID host, 
-                                  HttpServletRequest request,
-                                  NameNode nn
-                                  ) throws IOException, URISyntaxException {
-    final String hostname = host instanceof DatanodeInfo?
-        ((DatanodeInfo)host).getHostName(): host.getHost();
-    final String scheme = request.getScheme();
-    final int port = "https".equals(scheme)?
-        (Integer)getServletContext().getAttribute("datanode.https.port")
-        : host.getInfoPort();
-    final String filename = request.getPathInfo();
-    StringBuilder params = new StringBuilder();
-    params.append("filename=");
-    params.append(filename);
-    if (UserGroupInformation.isSecurityEnabled()) {
-      String tokenString = ugi.getTokens().iterator().next().encodeToUrlString();
-      params.append(JspHelper.getDelegationTokenUrlParam(tokenString));
-    } else {
-      params.append("&ugi=");
-      params.append(ugi.getShortUserName());
-    }
-    
-    // Add namenode address to the URL params
-    String nnAddr = NameNode.getHostPortString(nn.getNameNodeAddress());
-    params.append(JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr));
-    return new URI(scheme, null, hostname, port, servletpath,
-                   params.toString(), null);
-  }
-
-  /** Get filename from the request */
-  protected String getFilename(HttpServletRequest request,
-      HttpServletResponse response) throws IOException {
-    final String filename = request.getParameter("filename");
-    if (filename == null || filename.length() == 0) {
-      throw new IOException("Invalid filename");
-    }
-    return filename;
-  }
-  
   protected UserGroupInformation getUGI(HttpServletRequest request,
                                         Configuration conf) throws IOException {
     return JspHelper.getUGI(getServletContext(), request, conf);

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Tue Aug 16 00:37:15 2011
@@ -21,6 +21,8 @@ import java.io.DataInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
+
 /**
  * An implementation of the abstract class {@link EditLogInputStream},
  * which is used to updates HDFS meta-data state on a backup node.
@@ -33,6 +35,9 @@ class EditLogBackupInputStream extends E
   String address; // sender address 
   private ByteBufferInputStream inner;
   private DataInputStream in;
+  private FSEditLogOp.Reader reader = null;
+  private FSEditLogLoader.PositionTrackingInputStream tracker = null;
+  private int version = 0;
 
   /**
    * A ByteArrayInputStream, which lets modify the underlying byte array.
@@ -60,7 +65,8 @@ class EditLogBackupInputStream extends E
   EditLogBackupInputStream(String name) throws IOException {
     address = name;
     inner = new ByteBufferInputStream();
-    in = new DataInputStream(inner);
+    in = null;
+    reader = null;
   }
 
   @Override // JournalStream
@@ -74,18 +80,20 @@ class EditLogBackupInputStream extends E
   }
 
   @Override
-  public int available() throws IOException {
-    return in.available();
+  public FSEditLogOp readOp() throws IOException {
+    Preconditions.checkState(reader != null,
+        "Must call setBytes() before readOp()");
+    return reader.readOp();
   }
 
   @Override
-  public int read() throws IOException {
-    return in.read();
+  public int getVersion() throws IOException {
+    return this.version;
   }
 
   @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return in.read(b, off, len);
+  public long getPosition() {
+    return tracker.getPos();
   }
 
   @Override
@@ -99,16 +107,19 @@ class EditLogBackupInputStream extends E
     return inner.length();
   }
 
-  DataInputStream getDataInputStream() {
-    return in;
-  }
-
-  void setBytes(byte[] newBytes) throws IOException {
+  void setBytes(byte[] newBytes, int version) throws IOException {
     inner.setData(newBytes);
-    in.reset();
+    tracker = new FSEditLogLoader.PositionTrackingInputStream(inner);
+    in = new DataInputStream(tracker);
+
+    this.version = version;
+
+    reader = new FSEditLogOp.Reader(in, version);
   }
 
   void clear() throws IOException {
-    setBytes(null);
+    setBytes(null, 0);
+    reader = null;
+    this.version = 0;
   }
 }
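
EditLogBackupInputStream now threads its bytes through FSEditLogLoader.PositionTrackingInputStream so getPosition() can report a byte offset when a bad opcode needs to be pinpointed. As an illustration only (not the HDFS class itself), such a tracking wrapper is typically a FilterInputStream that counts every byte that passes through:

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    class PositionTracker extends FilterInputStream {
      private long pos = 0;

      PositionTracker(InputStream in) {
        super(in);
      }

      @Override
      public int read() throws IOException {
        int b = super.read();
        if (b != -1) pos++;
        return b;
      }

      @Override
      public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) pos += n;
        return n;
      }

      @Override
      public long skip(long n) throws IOException {
        long skipped = super.skip(n);
        pos += skipped;
        return skipped;
      }

      long getPos() {
        return pos;
      }
    }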

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Tue Aug 16 00:37:15 2011
@@ -21,18 +21,51 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.BufferedInputStream;
+import java.io.EOFException;
+import java.io.DataInputStream;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * An implementation of the abstract class {@link EditLogInputStream}, which
  * reads edits from a local file.
  */
 class EditLogFileInputStream extends EditLogInputStream {
-  private File file;
-  private FileInputStream fStream;
-
-  EditLogFileInputStream(File name) throws IOException {
+  private final File file;
+  private final FileInputStream fStream;
+  private final int logVersion;
+  private final FSEditLogOp.Reader reader;
+  private final FSEditLogLoader.PositionTrackingInputStream tracker;
+  
+  /**
+   * Open an EditLogInputStream for the given file.
+   * @param name filename to open
+   * @throws LogHeaderCorruptException if the header is either missing or
+   *         appears to be corrupt/truncated
+   * @throws IOException if an actual IO error occurs while reading the
+   *         header
+   */
+  EditLogFileInputStream(File name)
+      throws LogHeaderCorruptException, IOException {
     file = name;
     fStream = new FileInputStream(name);
+
+    BufferedInputStream bin = new BufferedInputStream(fStream);
+    tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
+    DataInputStream in = new DataInputStream(tracker);
+
+    try {
+      logVersion = readLogVersion(in);
+    } catch (EOFException eofe) {
+      throw new LogHeaderCorruptException("No header found in log");
+    }
+
+    reader = new FSEditLogOp.Reader(in, logVersion);
   }
 
   @Override // JournalStream
@@ -46,18 +79,18 @@ class EditLogFileInputStream extends Edi
   }
 
   @Override
-  public int available() throws IOException {
-    return fStream.available();
+  public FSEditLogOp readOp() throws IOException {
+    return reader.readOp();
   }
 
   @Override
-  public int read() throws IOException {
-    return fStream.read();
+  public int getVersion() throws IOException {
+    return logVersion;
   }
 
   @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return fStream.read(b, off, len);
+  public long getPosition() {
+    return tracker.getPos();
   }
 
   @Override
@@ -76,4 +109,62 @@ class EditLogFileInputStream extends Edi
     return getName();
   }
 
+  static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOException {
+    EditLogFileInputStream in;
+    try {
+      in = new EditLogFileInputStream(file);
+    } catch (LogHeaderCorruptException corrupt) {
+      // If it's missing its header, this is equivalent to no transactions
+      FSImage.LOG.warn("Log at " + file + " has no valid header",
+          corrupt);
+      return new FSEditLogLoader.EditLogValidation(0, 0);
+    }
+    
+    try {
+      return FSEditLogLoader.validateEditLog(in);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  /**
+   * Read the header of fsedit log
+   * @param in fsedit stream
+   * @return the edit log version number
+   * @throws IOException if error occurs
+   */
+  @VisibleForTesting
+  static int readLogVersion(DataInputStream in)
+      throws IOException, LogHeaderCorruptException {
+    int logVersion;
+    try {
+      logVersion = in.readInt();
+    } catch (EOFException eofe) {
+      throw new LogHeaderCorruptException(
+          "Reached EOF when reading log header");
+    }
+    if (logVersion < FSConstants.LAYOUT_VERSION) { // future version
+      throw new LogHeaderCorruptException(
+          "Unexpected version of the file system log file: "
+          + logVersion + ". Current version = "
+          + FSConstants.LAYOUT_VERSION + ".");
+    }
+    assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+      "Unsupported version " + logVersion;
+    return logVersion;
+  }
+  
+  /**
+   * Exception indicating that the header of an edits log file is
+   * corrupted. This can be because the header is not present,
+   * or because the header data is invalid (eg claims to be
+   * over a newer version than the running NameNode)
+   */
+  static class LogHeaderCorruptException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    private LogHeaderCorruptException(String msg) {
+      super(msg);
+    }
+  }
 }
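
One subtlety above: HDFS layout versions are negative and decrease as the format evolves, so "logVersion < FSConstants.LAYOUT_VERSION" really does detect a *future* version. A sketch of the header check under that convention, with a stand-in value for the current layout version:

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;

    public class LogHeaderCheck {
      // Stand-in only: pretend the running software writes layout version -38.
      static final int CURRENT_LAYOUT_VERSION = -38;

      static int readLogVersion(DataInputStream in) throws IOException {
        int logVersion;
        try {
          logVersion = in.readInt();           // first 4 bytes of the log
        } catch (EOFException eofe) {
          throw new IOException("Reached EOF when reading log header");
        }
        if (logVersion < CURRENT_LAYOUT_VERSION) {  // more negative = newer
          throw new IOException("Unexpected version of the file system log file: "
              + logVersion + ". Current version = " + CURRENT_LAYOUT_VERSION);
        }
        return logVersion;
      }
    }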

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Tue Aug 16 00:37:15 2011
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
+import java.io.Closeable;
 import java.io.IOException;
-import java.io.InputStream;
 
 /**
  * A generic abstract class to support reading edits log data from 
@@ -29,29 +27,41 @@ import java.io.InputStream;
  * It should stream bytes from the storage exactly as they were written
  * into the #{@link EditLogOutputStream}.
  */
-abstract class EditLogInputStream extends InputStream
-implements JournalStream {
-  /** {@inheritDoc} */
-  public abstract int available() throws IOException;
-
-  /** {@inheritDoc} */
-  public abstract int read() throws IOException;
+abstract class EditLogInputStream implements JournalStream, Closeable {
+  /**
+   * Close the stream.
+   * @throws IOException if an error occurred while closing
+   */
+  public abstract void close() throws IOException;
 
-  /** {@inheritDoc} */
-  public abstract int read(byte[] b, int off, int len) throws IOException;
+  /** 
+   * Read an operation from the stream
+   * @return an operation from the stream or null if at end of stream
+   * @throws IOException if there is an error reading from the stream
+   */
+  public abstract FSEditLogOp readOp() throws IOException;
 
-  /** {@inheritDoc} */
-  public abstract void close() throws IOException;
+  /** 
+   * Get the layout version of the data in the stream.
+   * @return the layout version of the ops in the stream.
+   * @throws IOException if there is an error reading the version
+   */
+  public abstract int getVersion() throws IOException;
 
   /**
-   * Return the size of the current edits log.
+   * Get the "position" of in the stream. This is useful for 
+   * debugging and operational purposes.
+   *
+   * Different stream types can have a different meaning for 
+   * what the position is. For file streams it means the byte offset
+   * from the start of the file.
+   *
+   * @return the position in the stream
    */
-  abstract long length() throws IOException;
+  public abstract long getPosition();
 
   /**
-   * Return DataInputStream based on this edit stream.
+   * Return the size of the current edits log.
    */
-  DataInputStream getDataInputStream() {
-    return new DataInputStream(new BufferedInputStream(this));
-  }
+  abstract long length() throws IOException;
 }
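
With the byte-oriented read()/available() API removed, callers consume an edit log as a typed stream of operations and use getPosition() only for diagnostics. The consuming loop (used by FSEditLogLoader later in this commit) reduces to the following shape, sketched here with placeholder types:

    import java.io.Closeable;
    import java.io.IOException;

    // Shape of the refactored stream: ops in, not bytes (illustrative only).
    interface OpStream extends Closeable {
      Op readOp() throws IOException;   // null signals end of stream
      long getPosition();               // offset for error reporting
    }

    class Op {
      // fields of an edit-log operation would live here
    }

    class OpStreamDemo {
      static int applyAll(OpStream in) throws IOException {
        int numEdits = 0;
        Op op;
        while ((op = in.readOp()) != null) {
          // applying op against the namespace would go here
          numEdits++;
        }
        return numEdits;
      }
    }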

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Aug 16 00:37:15 2011
@@ -444,8 +444,6 @@ public class FSDirectory implements Clos
       // modify file-> block and blocksMap
       fileNode.removeLastBlock(block);
       getBlockManager().removeBlockFromMap(block);
-      // If block is removed from blocksMap remove it from corruptReplicasMap
-      getBlockManager().removeFromCorruptReplicasMap(block);
 
       // write modified block locations to log
       fsImage.getEditLog().logOpenFile(path, fileNode);
@@ -809,7 +807,7 @@ public class FSDirectory implements Clos
    * @return array of file blocks
    * @throws QuotaExceededException
    */
-  Block[] setReplication(String src, short replication, int[] oldReplication)
+  Block[] setReplication(String src, short replication, short[] oldReplication)
       throws QuotaExceededException, UnresolvedLinkException {
     waitForReady();
     Block[] fileBlocks = null;
@@ -826,14 +824,10 @@ public class FSDirectory implements Clos
 
   Block[] unprotectedSetReplication(String src, 
                                     short replication,
-                                    int[] oldReplication
+                                    short[] oldReplication
                                     ) throws QuotaExceededException, 
                                     UnresolvedLinkException {
     assert hasWriteLock();
-    if (oldReplication == null) {
-      oldReplication = new int[1];
-    }
-    oldReplication[0] = -1;
 
     INode[] inodes = rootDir.getExistingPathINodes(src, true);
     INode inode = inodes[inodes.length - 1];
@@ -845,14 +839,17 @@ public class FSDirectory implements Clos
       return null;
     }
     INodeFile fileNode = (INodeFile)inode;
-    oldReplication[0] = fileNode.getReplication();
+    final short oldRepl = fileNode.getReplication();
 
     // check disk quota
-    long dsDelta = (replication - oldReplication[0]) *
-         (fileNode.diskspaceConsumed()/oldReplication[0]);
+    long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
     updateCount(inodes, inodes.length-1, 0, dsDelta, true);
 
     fileNode.setReplication(replication);
+
+    if (oldReplication != null) {
+      oldReplication[0] = oldRepl;
+    }
     return fileNode.getBlocks();
   }
 
@@ -1344,7 +1341,7 @@ public class FSDirectory implements Clos
    * @throws QuotaExceededException if the new count violates any quota limit
    * @throws FileNotFound if path does not exist.
    */
-  public void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
+  void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
                                          throws QuotaExceededException,
                                                 FileNotFoundException,
                                                 UnresolvedLinkException {
@@ -2075,8 +2072,9 @@ public class FSDirectory implements Clos
         size = fileNode.computeFileSize(true);
         replication = fileNode.getReplication();
         blocksize = fileNode.getPreferredBlockSize();
-        loc = getFSNamesystem().getBlockLocationsInternal(
-            fileNode, 0L, size, false);
+        loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
+            fileNode.getBlocks(), fileNode.computeFileSize(false),
+            fileNode.isUnderConstruction(), 0L, size, false);
         if (loc==null) {
           loc = new LocatedBlocks();
         }
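
The quota arithmetic in unprotectedSetReplication is compact but worth unpacking: diskspaceConsumed() counts bytes across all replicas, so dividing by the old replication yields per-replica bytes, and the delta charged against the disk quota is that value times the change in replica count. A worked example, assuming 100 bytes per replica at replication 3 raised to 5:

    public class QuotaDelta {
      public static void main(String[] args) {
        long diskspaceConsumed = 300;  // all replicas: 3 x 100 bytes (assumed)
        short oldRepl = 3;
        short replication = 5;         // new target replication

        // (replication - oldRepl) * per-replica size, as in the patch
        long dsDelta = (replication - oldRepl) * (diskspaceConsumed / oldRepl);
        System.out.println("diskspace quota delta: " + dsDelta);  // 200
      }
    }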

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Aug 16 00:37:15 2011
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.zip.Checksum;
-import java.util.zip.CheckedOutputStream;
+import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,28 +29,26 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.PureJavaCrc32;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Sets;
 
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
@@ -116,18 +114,6 @@ public class FSEditLog  {
 
   private NNStorage storage;
 
-  private static ThreadLocal<Checksum> localChecksum =
-    new ThreadLocal<Checksum>() {
-    protected Checksum initialValue() {
-      return new PureJavaCrc32();
-    }
-  };
-
-  /** Get a thread local checksum */
-  public static Checksum getChecksum() {
-    return localChecksum.get();
-  }
-
   private static class TransactionId {
     public long txid;
 
@@ -148,15 +134,6 @@ public class FSEditLog  {
     this.storage = storage;
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
-  }
-  
-  /**
-   * Initialize the list of edit journals
-   */
-  synchronized void initJournals() {
-    assert journals.isEmpty();
-    Preconditions.checkState(state == State.UNINITIALIZED,
-        "Bad state: %s", state);
     
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
       journals.add(new JournalAndStream(new FileJournalManager(sd)));
@@ -174,8 +151,7 @@ public class FSEditLog  {
    * log segment.
    */
   synchronized void open() throws IOException {
-    Preconditions.checkState(state == State.UNINITIALIZED);
-    initJournals();
+    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS);
 
     startLogSegment(getLastWrittenTxId() + 1, true);
     assert state == State.IN_SEGMENT : "Bad state: " + state;
@@ -755,18 +731,64 @@ public class FSEditLog  {
   /**
    * Return a manifest of what finalized edit logs are available
    */
-  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
-      throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-
-    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
-      inspector.inspectDirectory(sd);
+  public synchronized RemoteEditLogManifest getEditLogManifest(
+      long fromTxId) throws IOException {
+    // Collect RemoteEditLogs available from each FileJournalManager
+    List<RemoteEditLog> allLogs = Lists.newArrayList();
+    for (JournalAndStream j : journals) {
+      if (j.getManager() instanceof FileJournalManager) {
+        FileJournalManager fjm = (FileJournalManager)j.getManager();
+        try {
+          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
+        } catch (Throwable t) {
+          LOG.warn("Cannot list edit logs in " + fjm, t);
+        }
+      }
     }
     
-    return inspector.getEditLogManifest(sinceTxId);
+    // Group logs by their starting txid
+    ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
+      Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
+    long curStartTxId = fromTxId;
+
+    List<RemoteEditLog> logs = Lists.newArrayList();
+    while (true) {
+      ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
+      if (logGroup.isEmpty()) {
+        // we have a gap in logs - for example because we recovered some old
+        // storage directory with ancient logs. Clear out any logs we've
+        // accumulated so far, and then skip to the next segment of logs
+        // after the gap.
+        SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
+        startTxIds = startTxIds.tailSet(curStartTxId);
+        if (startTxIds.isEmpty()) {
+          break;
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found gap in logs at " + curStartTxId + ": " +
+                "not returning previous logs in manifest.");
+          }
+          logs.clear();
+          curStartTxId = startTxIds.first();
+          continue;
+        }
+      }
+
+      // Find the one that extends the farthest forward
+      RemoteEditLog bestLog = Collections.max(logGroup);
+      logs.add(bestLog);
+      // And then start looking from after that point
+      curStartTxId = bestLog.getEndTxId() + 1;
+    }
+    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated manifest for logs since " + fromTxId + ":"
+          + ret);      
+    }
+    return ret;
   }
-  
+ 
   /**
    * Finalizes the current edit log and opens a new log segment.
    * @return the transaction id of the BEGIN_LOG_SEGMENT transaction
@@ -877,8 +899,7 @@ public class FSEditLog  {
   /**
    * Archive any log files that are older than the given txid.
    */
-  public void purgeLogsOlderThan(
-      final long minTxIdToKeep, final StoragePurger purger) {
+  public void purgeLogsOlderThan(final long minTxIdToKeep) {
     synchronized (this) {
       // synchronized to prevent findbugs warning about inconsistent
       // synchronization. This will be JIT-ed out if asserts are
@@ -892,7 +913,7 @@ public class FSEditLog  {
     mapJournalsAndReportErrors(new JournalClosure() {
       @Override
       public void apply(JournalAndStream jas) throws IOException {
-        jas.manager.purgeLogsOlderThan(minTxIdToKeep, purger);
+        jas.manager.purgeLogsOlderThan(minTxIdToKeep);
       }
     }, "purging logs older than " + minTxIdToKeep);
   }
@@ -1080,7 +1101,8 @@ public class FSEditLog  {
       stream = null;
     }
     
-    private void abort() {
+    @VisibleForTesting
+    void abort() {
       if (stream == null) return;
       try {
         stream.abort();

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Aug 16 00:37:15 2011
@@ -19,15 +19,12 @@ package org.apache.hadoop.hdfs.server.na
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
-import java.util.zip.Checksum;
+import java.util.EnumMap;
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -37,8 +34,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream.LogHeaderCorruptException;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
@@ -60,6 +56,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Joiner;
 
 public class FSEditLogLoader {
   private final FSNamesystem fsNamesys;
@@ -84,49 +84,36 @@ public class FSEditLogLoader {
   }
 
   int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
-      long expectedStartingTxId)
-  throws IOException {
-    BufferedInputStream bin = new BufferedInputStream(edits);
-    DataInputStream in = new DataInputStream(bin);
-
+                  long expectedStartingTxId)
+      throws IOException {
     int numEdits = 0;
+    int logVersion = edits.getVersion();
 
     try {
-      LogHeader header = LogHeader.read(in);
-      numEdits = loadEditRecords(
-          header.logVersion, in, header.checksum, false,
-          expectedStartingTxId);
+      numEdits = loadEditRecords(logVersion, edits, false, 
+                                 expectedStartingTxId);
     } finally {
-      if(closeOnExit)
-        in.close();
+      if(closeOnExit) {
+        edits.close();
+      }
     }
     
     return numEdits;
   }
 
   @SuppressWarnings("deprecation")
-  int loadEditRecords(int logVersion, DataInputStream in,
-                      Checksum checksum, boolean closeOnExit,
+  int loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
                       long expectedStartingTxId)
       throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
 
-    int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
-        numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
-        numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
-        numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, 
-        numOpSymlink = 0, numOpGetDelegationToken = 0,
-        numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
-        numOpUpdateMasterKey = 0, numOpReassignLease = 0, numOpOther = 0;
+    EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
+      new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
 
     fsNamesys.writeLock();
     fsDir.writeLock();
 
-    // Keep track of the file offsets of the last several opcodes.
-    // This is handy when manually recovering corrupted edits files.
-    PositionTrackingInputStream tracker = new PositionTrackingInputStream(in);
-    in = new DataInputStream(tracker);
     long recentOpcodeOffsets[] = new long[4];
     Arrays.fill(recentOpcodeOffsets, -1);
 
@@ -134,12 +121,10 @@ public class FSEditLogLoader {
       long txId = expectedStartingTxId - 1;
 
       try {
-        FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion,
-                                                           checksum);
         FSEditLogOp op;
-        while ((op = reader.readOp()) != null) {
+        while ((op = in.readOp()) != null) {
           recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
-              tracker.getPos();
+            in.getPosition();
           if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
             long thisTxId = op.txid;
             if (thisTxId != txId + 1) {
@@ -150,6 +135,7 @@ public class FSEditLogLoader {
           }
 
           numEdits++;
+          incrOpCount(op.opCode, opCounts);
           switch (op.opCode) {
           case OP_ADD:
           case OP_CLOSE: {
@@ -157,8 +143,8 @@ public class FSEditLogLoader {
 
             // versions > 0 support per file replication
             // get name and replication
-            short replication
-              = fsNamesys.adjustReplication(addCloseOp.replication);
+            final short replication  = fsNamesys.getBlockManager(
+                ).adjustReplication(addCloseOp.replication);
 
             long blockSize = addCloseOp.blockSize;
             BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
@@ -209,7 +195,6 @@ public class FSEditLogLoader {
                 blocks, replication,
                 addCloseOp.mtime, addCloseOp.atime, blockSize);
             if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
-              numOpAdd++;
               //
               // Replace current node with a INodeUnderConstruction.
               // Recreate in-memory lease record.
@@ -231,24 +216,20 @@ public class FSEditLogLoader {
             break;
           }
           case OP_SET_REPLICATION: {
-            numOpSetRepl++;
             SetReplicationOp setReplicationOp = (SetReplicationOp)op;
-            short replication
-              = fsNamesys.adjustReplication(setReplicationOp.replication);
+            short replication = fsNamesys.getBlockManager().adjustReplication(
+                setReplicationOp.replication);
             fsDir.unprotectedSetReplication(setReplicationOp.path,
                                             replication, null);
             break;
           }
           case OP_CONCAT_DELETE: {
-            numOpConcatDelete++;
-
             ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
             fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
                 concatDeleteOp.timestamp);
             break;
           }
           case OP_RENAME_OLD: {
-            numOpRenameOld++;
             RenameOldOp renameOp = (RenameOldOp)op;
             HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
             fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
@@ -257,14 +238,11 @@ public class FSEditLogLoader {
             break;
           }
           case OP_DELETE: {
-            numOpDelete++;
-
             DeleteOp deleteOp = (DeleteOp)op;
             fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
             break;
           }
           case OP_MKDIR: {
-            numOpMkDir++;
             MkdirOp mkdirOp = (MkdirOp)op;
             PermissionStatus permissions = fsNamesys.getUpgradePermission();
             if (mkdirOp.permissions != null) {
@@ -276,22 +254,17 @@ public class FSEditLogLoader {
             break;
           }
           case OP_SET_GENSTAMP: {
-            numOpSetGenStamp++;
             SetGenstampOp setGenstampOp = (SetGenstampOp)op;
             fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
             break;
           }
           case OP_SET_PERMISSIONS: {
-            numOpSetPerm++;
-
             SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
             fsDir.unprotectedSetPermission(setPermissionsOp.src,
                                            setPermissionsOp.permissions);
             break;
           }
           case OP_SET_OWNER: {
-            numOpSetOwner++;
-
             SetOwnerOp setOwnerOp = (SetOwnerOp)op;
             fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
                                       setOwnerOp.groupname);
@@ -320,7 +293,6 @@ public class FSEditLogLoader {
             break;
 
           case OP_TIMES: {
-            numOpTimes++;
             TimesOp timesOp = (TimesOp)op;
 
             fsDir.unprotectedSetTimes(timesOp.path,
@@ -329,8 +301,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_SYMLINK: {
-            numOpSymlink++;
-
             SymlinkOp symlinkOp = (SymlinkOp)op;
             fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
                                      symlinkOp.mtime, symlinkOp.atime,
@@ -338,7 +308,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_RENAME: {
-            numOpRename++;
             RenameOp renameOp = (RenameOp)op;
 
             HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
@@ -348,7 +317,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_GET_DELEGATION_TOKEN: {
-            numOpGetDelegationToken++;
             GetDelegationTokenOp getDelegationTokenOp
               = (GetDelegationTokenOp)op;
 
@@ -358,8 +326,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_RENEW_DELEGATION_TOKEN: {
-            numOpRenewDelegationToken++;
-
             RenewDelegationTokenOp renewDelegationTokenOp
               = (RenewDelegationTokenOp)op;
             fsNamesys.getDelegationTokenSecretManager()
@@ -368,8 +334,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_CANCEL_DELEGATION_TOKEN: {
-            numOpCancelDelegationToken++;
-
             CancelDelegationTokenOp cancelDelegationTokenOp
               = (CancelDelegationTokenOp)op;
             fsNamesys.getDelegationTokenSecretManager()
@@ -378,14 +342,12 @@ public class FSEditLogLoader {
             break;
           }
           case OP_UPDATE_MASTER_KEY: {
-            numOpUpdateMasterKey++;
             UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
             fsNamesys.getDelegationTokenSecretManager()
               .updatePersistedMasterKey(updateMasterKeyOp.key);
             break;
           }
           case OP_REASSIGN_LEASE: {
-            numOpReassignLease++;
             ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
 
             Lease lease = fsNamesys.leaseManager.getLease(
@@ -400,17 +362,16 @@ public class FSEditLogLoader {
           case OP_START_LOG_SEGMENT:
           case OP_END_LOG_SEGMENT: {
             // no data in here currently.
-            numOpOther++;
             break;
           }
           case OP_DATANODE_ADD:
           case OP_DATANODE_REMOVE:
-            numOpOther++;
             break;
           default:
             throw new IOException("Invalid operation read " + op.opCode);
           }
         }
+
       } catch (IOException ex) {
         check203UpgradeFailure(logVersion, ex);
       } finally {
@@ -421,7 +382,7 @@ public class FSEditLogLoader {
       // Catch Throwable because in the case of a truly corrupt edits log, any
       // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
       StringBuilder sb = new StringBuilder();
-      sb.append("Error replaying edit log at offset " + tracker.getPos());
+      sb.append("Error replaying edit log at offset " + in.getPosition());
       if (recentOpcodeOffsets[0] != -1) {
         Arrays.sort(recentOpcodeOffsets);
         sb.append("\nRecent opcode offsets:");
@@ -439,26 +400,31 @@ public class FSEditLogLoader {
       fsNamesys.writeUnlock();
     }
     if (FSImage.LOG.isDebugEnabled()) {
-      FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
-          + " numOpDelete = " + numOpDelete 
-          + " numOpRenameOld = " + numOpRenameOld 
-          + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
-          + " numOpSetPerm = " + numOpSetPerm 
-          + " numOpSetOwner = " + numOpSetOwner
-          + " numOpSetGenStamp = " + numOpSetGenStamp 
-          + " numOpTimes = " + numOpTimes
-          + " numOpConcatDelete  = " + numOpConcatDelete
-          + " numOpRename = " + numOpRename
-          + " numOpGetDelegationToken = " + numOpGetDelegationToken
-          + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
-          + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
-          + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
-          + " numOpReassignLease = " + numOpReassignLease
-          + " numOpOther = " + numOpOther);
+      dumpOpCounts(opCounts);
     }
     return numEdits;
   }
 
+
+  private static void dumpOpCounts(
+      EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Summary of operations loaded from edit log:\n  ");
+    Joiner.on("\n  ").withKeyValueSeparator("=").appendTo(sb, opCounts);
+    FSImage.LOG.debug(sb.toString());
+  }
+
+  private void incrOpCount(FSEditLogOpCodes opCode,
+      EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {
+    Holder<Integer> holder = opCounts.get(opCode);
+    if (holder == null) {
+      holder = new Holder<Integer>(1);
+      opCounts.put(opCode, holder);
+    } else {
+      holder.held++;
+    }
+  }
+
   /**
    * Throw appropriate exception during upgrade from 203, when editlog loading
    * could fail due to opcode conflicts.
@@ -480,49 +446,50 @@ public class FSEditLogLoader {
     }
   }
   
+  static EditLogValidation validateEditLog(File file) throws IOException {
+    EditLogFileInputStream in;
+    try {
+      in = new EditLogFileInputStream(file);
+    } catch (LogHeaderCorruptException corrupt) {
+      // If it's missing its header, this is equivalent to no transactions
+      FSImage.LOG.warn("Log at " + file + " has no valid header",
+          corrupt);
+      return new EditLogValidation(0, 0);
+    }
+    
+    try {
+      return validateEditLog(in);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
   /**
-   * Return the number of valid transactions in the file. If the file is
+   * Return the number of valid transactions in the stream. If the stream is
    * truncated during the header, returns a value indicating that there are
-   * 0 valid transactions.
-   * @throws IOException if the file cannot be read due to an IO error (eg
+   * 0 valid transactions. This reads through the stream but does not close
+   * it.
+   * @throws IOException if the stream cannot be read due to an IO error (eg
    *                     if the log does not exist)
    */
-  static EditLogValidation validateEditLog(File f) throws IOException {
-    FileInputStream fis = new FileInputStream(f);
+  static EditLogValidation validateEditLog(EditLogInputStream in) {
+    long numValid = 0;
+    long lastPos = 0;
     try {
-      PositionTrackingInputStream tracker = new PositionTrackingInputStream(
-          new BufferedInputStream(fis));
-      DataInputStream dis = new DataInputStream(tracker);
-      LogHeader header; 
-      try {
-        header = LogHeader.read(dis);
-      } catch (Throwable t) {
-        FSImage.LOG.debug("Unable to read header from " + f +
-            " -> no valid transactions in this file.");
-        return new EditLogValidation(0, 0);
-      }
-      
-      Reader reader = new FSEditLogOp.Reader(dis, header.logVersion, header.checksum);
-      long numValid = 0;
-      long lastPos = 0;
-      try {
-        while (true) {
-          lastPos = tracker.getPos();
-          if (reader.readOp() == null) {
-            break;
-          }
-          numValid++;
+      while (true) {
+        lastPos = in.getPosition();
+        if (in.readOp() == null) {
+          break;
         }
-      } catch (Throwable t) {
-        // Catch Throwable and not just IOE, since bad edits may generate
-        // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
-        FSImage.LOG.debug("Caught exception after reading " + numValid +
-            " ops from " + f + " while determining its valid length.", t);
+        numValid++;
       }
-      return new EditLogValidation(lastPos, numValid);
-    } finally {
-      fis.close();
+    } catch (Throwable t) {
+      // Catch Throwable and not just IOE, since bad edits may generate
+      // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
+      FSImage.LOG.debug("Caught exception after reading " + numValid +
+          " ops from " + in + " while determining its valid length.", t);
     }
+    return new EditLogValidation(lastPos, numValid);
   }
   
   static class EditLogValidation {
@@ -536,9 +503,9 @@ public class FSEditLogLoader {
   }
 
   /**
-   * Stream wrapper that keeps track of the current file position.
+   * Stream wrapper that keeps track of the current stream position.
    */
-  private static class PositionTrackingInputStream extends FilterInputStream {
+  static class PositionTrackingInputStream extends FilterInputStream {
     private long curPos = 0;
     private long markPos = -1;
 
@@ -582,4 +549,5 @@ public class FSEditLogLoader {
       return curPos;
     }
   }
+
 }
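
The loader change above replaces the long row of hand-maintained numOpXxx counters with a single EnumMap keyed by opcode, updated once per transaction via incrOpCount and dumped in one place. A minimal, self-contained sketch of that pattern follows; SampleOpCode and MutableCount are illustrative stand-ins for FSEditLogOpCodes and the patch's Holder<Integer>, and the real dumpOpCounts pretty-prints the map with Guava's Joiner rather than the plain loop shown here.

// Minimal sketch of the counting pattern adopted above. SampleOpCode and
// MutableCount are stand-ins for FSEditLogOpCodes and Holder<Integer>.
import java.util.EnumMap;
import java.util.Map;

public class OpCountSketch {
  enum SampleOpCode { OP_ADD, OP_DELETE, OP_MKDIR }

  /** Tiny mutable box, analogous to the patch's Holder<Integer>. */
  static final class MutableCount {
    int held;
    MutableCount(int initial) { held = initial; }
  }

  private final EnumMap<SampleOpCode, MutableCount> opCounts =
      new EnumMap<SampleOpCode, MutableCount>(SampleOpCode.class);

  /** One map update replaces a dedicated numOpXxx++ field per opcode. */
  void incrOpCount(SampleOpCode opCode) {
    MutableCount count = opCounts.get(opCode);
    if (count == null) {
      opCounts.put(opCode, new MutableCount(1));
    } else {
      count.held++;
    }
  }

  String dumpOpCounts() {
    StringBuilder sb = new StringBuilder("Summary of operations loaded:");
    for (Map.Entry<SampleOpCode, MutableCount> e : opCounts.entrySet()) {
      sb.append("\n  ").append(e.getKey()).append('=').append(e.getValue().held);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    OpCountSketch sketch = new OpCountSketch();
    sketch.incrOpCount(SampleOpCode.OP_ADD);
    sketch.incrOpCount(SampleOpCode.OP_ADD);
    sketch.incrOpCount(SampleOpCode.OP_DELETE);
    System.out.println(sketch.dumpOpCounts());
  }
}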

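The same file reworks validateEditLog to scan an arbitrary edit-log input stream instead of opening a File itself: the position is recorded before every readOp, so when a corrupt op is hit the caller still knows the last good byte offset and how many transactions preceded it. A sketch of that loop against an assumed minimal reader interface (OpStream standing in for EditLogInputStream):

// Sketch of the stream-based validation loop. OpStream is an assumed
// minimal interface modeling EditLogInputStream for illustration.
interface OpStream {
  long getPosition();
  Object readOp() throws java.io.IOException;  // null at clean end of stream
}

final class ValidationSketch {
  /** Offset of the last cleanly-read op boundary plus the op count. */
  static final class Validation {
    final long validLength;
    final long numTransactions;
    Validation(long validLength, long numTransactions) {
      this.validLength = validLength;
      this.numTransactions = numTransactions;
    }
  }

  static Validation validate(OpStream in) {
    long numValid = 0;
    long lastPos = 0;
    try {
      while (true) {
        lastPos = in.getPosition();  // remember the boundary before reading
        if (in.readOp() == null) {
          break;                     // clean end of stream
        }
        numValid++;
      }
    } catch (Throwable t) {
      // Corrupt edits can surface as almost anything (EOF, NumberFormat,
      // checksum failures), so everything past lastPos is treated as invalid.
    }
    return new Validation(lastPos, numValid);
  }
}
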
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Aug 16 00:37:15 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.util.PureJavaCrc32;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -1323,71 +1324,17 @@ public abstract class FSEditLogOp {
       return longWritable.get();
     }
   }
-  
-  /**
-   * Class to encapsulate the header at the top of a log file.
-   */
-  static class LogHeader {
-    final int logVersion;
-    final Checksum checksum;
-
-    public LogHeader(int logVersion, Checksum checksum) {
-      this.logVersion = logVersion;
-      this.checksum = checksum;
-    }
-
-    static LogHeader read(DataInputStream in) throws IOException {
-      int logVersion = 0;
-
-      logVersion = FSEditLogOp.LogHeader.readLogVersion(in);
-      Checksum checksum = null;
-      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
-        checksum = FSEditLog.getChecksum();
-      }
-      return new LogHeader(logVersion, checksum);
-    }
-    
-    /**
-     * Read the header of fsedit log
-     * @param in fsedit stream
-     * @return the edit log version number
-     * @throws IOException if error occurs
-     */
-    private static int readLogVersion(DataInputStream in) throws IOException {
-      int logVersion = 0;
-      // Read log file version. Could be missing.
-      in.mark(4);
-      // If edits log is greater than 2G, available method will return negative
-      // numbers, so we avoid having to call available
-      boolean available = true;
-      try {
-        logVersion = in.readByte();
-      } catch (EOFException e) {
-        available = false;
-      }
-      if (available) {
-        in.reset();
-        logVersion = in.readInt();
-        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-          throw new IOException(
-              "Unexpected version of the file system log file: "
-              + logVersion + ". Current version = "
-              + FSConstants.LAYOUT_VERSION + ".");
-      }
-      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-        "Unsupported version " + logVersion;
-      return logVersion;
-    }
-  }
 
   /**
    * Class for writing editlog ops
    */
   public static class Writer {
     private final DataOutputBuffer buf;
+    private final Checksum checksum;
 
     public Writer(DataOutputBuffer out) {
       this.buf = out;
+      this.checksum = new PureJavaCrc32();
     }
 
     /**
@@ -1402,7 +1349,6 @@ public abstract class FSEditLogOp {
       buf.writeLong(op.txid);
       op.writeFields(buf);
       int end = buf.getLength();
-      Checksum checksum = FSEditLog.getChecksum();
       checksum.reset();
       checksum.update(buf.getData(), start, end-start);
       int sum = (int)checksum.getValue();
@@ -1422,19 +1368,22 @@ public abstract class FSEditLogOp {
      * Construct the reader
      * @param in The stream to read from.
      * @param logVersion The version of the data coming from the stream.
-     * @param checksum Checksum being used with input stream.
      */
     @SuppressWarnings("deprecation")
-    public Reader(DataInputStream in, int logVersion,
-                  Checksum checksum) {
-      if (checksum != null) {
+    public Reader(DataInputStream in, int logVersion) {
+      this.logVersion = logVersion;
+      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
+        this.checksum = new PureJavaCrc32();
+      } else {
+        this.checksum = null;
+      }
+
+      if (this.checksum != null) {
         this.in = new DataInputStream(
-            new CheckedInputStream(in, checksum));
+            new CheckedInputStream(in, this.checksum));
       } else {
         this.in = in;
       }
-      this.logVersion = logVersion;
-      this.checksum = checksum;
     }
 
     /**

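With the LogHeader class removed, the Reader now decides from the layout version whether checksums are in play, and Reader and Writer each own a private CRC instead of sharing the static FSEditLog.getChecksum(). The round-trip sketch below shows the framing this implements: the writer checksums the serialized op and appends the 4-byte sum, and the reader accumulates the same sum through a CheckedInputStream and compares it against the trailing int. java.util.zip.CRC32 stands in for Hadoop's PureJavaCrc32, which computes the same CRC-32 values.

// Round-trip sketch of the per-op checksum framing. java.util.zip.CRC32
// stands in for PureJavaCrc32; the op body here is an arbitrary placeholder.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

public class ChecksumFramingSketch {
  public static void main(String[] args) throws IOException {
    // Writer side: serialize the op body, then append CRC32(body).
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    byte[] body = "opcode+txid+fields".getBytes("UTF-8");
    out.write(body);
    CRC32 writeSum = new CRC32();
    writeSum.update(body, 0, body.length);
    out.writeInt((int) writeSum.getValue());
    out.flush();

    // Reader side: CheckedInputStream accumulates a sum as the body is
    // read; it must match the trailing int before the op is accepted.
    CRC32 readSum = new CRC32();
    DataInputStream in = new DataInputStream(
        new CheckedInputStream(new ByteArrayInputStream(bytes.toByteArray()),
                               readSum));
    byte[] readBody = new byte[body.length];
    in.readFully(readBody);
    int expected = (int) readSum.getValue(); // snapshot before the sum bytes
    int stored = in.readInt();
    if (stored != expected) {
      throw new IOException("Transaction is corrupt: stored=" + stored
          + " computed=" + expected);
    }
    System.out.println("checksum verified");
  }
}
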
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue Aug 16 00:37:15 2011
@@ -137,10 +137,6 @@ public class FSImage implements Closeabl
                              FSImage.getCheckpointEditsDirs(conf, null));
 
     storage = new NNStorage(conf, imageDirs, editsDirs);
-    if (ns != null) {
-      storage.setUpgradeManager(ns.upgradeManager);
-    }
-
     if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
                        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
       storage.setRestoreFailedStorage(true);

Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Aug 16 00:37:15 2011
@@ -330,7 +330,7 @@ class FSImageFormat {
     
     int imgVersion = getLayoutVersion();
     short replication = in.readShort();
-    replication = namesystem.adjustReplication(replication);
+    replication = namesystem.getBlockManager().adjustReplication(replication);
     modificationTime = in.readLong();
     if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, imgVersion)) {
       atime = in.readLong();

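Image loading now asks the BlockManager, rather than FSNamesystem, to normalize a stored replication factor. The operation itself amounts to clamping the value into the configured replication range, roughly as sketched below; the field names and defaults are assumptions for illustration, not the BlockManager's actual members.

// Sketch of what adjustReplication amounts to: clamping a stored
// replication factor into an assumed [minReplication, maxReplication]
// range. Names and defaults here are illustrative assumptions.
final class ReplicationClampSketch {
  private final short minReplication = 1;    // assumed configured minimum
  private final short maxReplication = 512;  // assumed configured maximum

  short adjustReplication(short replication) {
    if (replication < minReplication) {
      return minReplication;
    }
    return replication > maxReplication ? maxReplication : replication;
  }
}
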
Modified: hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1623/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java Tue Aug 16 00:37:15 2011
@@ -96,4 +96,36 @@ abstract class FSImageStorageInspector {
       return sb.toString();
     }
   }
+
+  /**
+   * Record of an image that has been located and had its filename parsed.
+   */
+  static class FSImageFile {
+    final StorageDirectory sd;    
+    final long txId;
+    private final File file;
+    
+    FSImageFile(StorageDirectory sd, File file, long txId) {
+      assert txId >= 0 : "Invalid txid on " + file + ": " + txId;
+      
+      this.sd = sd;
+      this.txId = txId;
+      this.file = file;
+    } 
+    
+    File getFile() {
+      return file;
+    }
+
+    public long getCheckpointTxId() {
+      return txId;
+    }
+    
+    @Override
+    public String toString() {
+      return String.format("FSImageFile(file=%s, ckptTxId=%019d)", 
+                           file.toString(), txId);
+    }
+  }
+
 }
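
FSImageFile pairs an image file that has been located on disk with the transaction id parsed out of its name, so an inspector can choose the newest checkpoint by comparing txids alone. A sketch of that usage follows; the fsimage_<txid> filename parsing is an assumption for illustration.

// Sketch of how a parsed-image record like FSImageFile can be used: given
// candidate files, keep the one with the highest checkpoint txid. The
// fsimage_<txid> name parsing is an illustrative assumption.
import java.io.File;
import java.util.List;

final class LatestImageSketch {
  static final class ImageRecord {
    final File file;
    final long txId;
    ImageRecord(File file, long txId) { this.file = file; this.txId = txId; }
  }

  /** Parse "fsimage_0000000000000000042"-style names into records. */
  static ImageRecord parse(File f) {
    String name = f.getName();
    if (!name.startsWith("fsimage_")) {
      return null;
    }
    try {
      return new ImageRecord(f,
          Long.parseLong(name.substring("fsimage_".length())));
    } catch (NumberFormatException e) {
      return null;  // not a parseable image name
    }
  }

  /** Return the candidate with the greatest checkpoint txid, if any. */
  static ImageRecord latest(List<File> candidates) {
    ImageRecord best = null;
    for (File f : candidates) {
      ImageRecord r = parse(f);
      if (r != null && (best == null || r.txId > best.txId)) {
        best = r;
      }
    }
    return best;
  }
}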


