hadoop-hdfs-commits mailing list archives

From a..@apache.org
Subject svn commit: r1196458 [6/9] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ ...
Date Wed, 02 Nov 2011 05:35:26 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Nov  2 05:34:31 2011
@@ -27,6 +27,7 @@ import java.security.PrivilegedException
 import java.util.EnumSet;
 
 import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -47,6 +48,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -77,7 +79,35 @@ import com.sun.jersey.spi.container.Reso
 public class DatanodeWebHdfsMethods {
   public static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class);
 
+  private static final UriFsPathParam ROOT = new UriFsPathParam("");
+
   private @Context ServletContext context;
+  private @Context HttpServletResponse response;
+
+  /** Handle HTTP PUT request for the root. */
+  @PUT
+  @Path("/")
+  @Consumes({"*/*"})
+  @Produces({MediaType.APPLICATION_JSON})
+  public Response putRoot(
+      final InputStream in,
+      @Context final UserGroupInformation ugi,
+      @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
+          final PutOpParam op,
+      @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
+          final PermissionParam permission,
+      @QueryParam(OverwriteParam.NAME) @DefaultValue(OverwriteParam.DEFAULT)
+          final OverwriteParam overwrite,
+      @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
+          final BufferSizeParam bufferSize,
+      @QueryParam(ReplicationParam.NAME) @DefaultValue(ReplicationParam.DEFAULT)
+          final ReplicationParam replication,
+      @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
+          final BlockSizeParam blockSize
+      ) throws IOException, InterruptedException {
+    return put(in, ugi, ROOT, op, permission, overwrite, bufferSize,
+        replication, blockSize);
+  }
 
   /** Handle HTTP PUT request. */
   @PUT
@@ -100,7 +130,7 @@ public class DatanodeWebHdfsMethods {
           final ReplicationParam replication,
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -108,6 +138,9 @@ public class DatanodeWebHdfsMethods {
               replication, blockSize));
     }
 
+    //clear content type
+    response.setContentType(null);
+
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
@@ -120,17 +153,25 @@ public class DatanodeWebHdfsMethods {
     {
       final Configuration conf = new Configuration(datanode.getConf());
       final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
-      final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      conf.set(FsPermission.UMASK_LABEL, "000");
+
       final int b = bufferSize.getValue(conf);
-      final FSDataOutputStream out = new FSDataOutputStream(dfsclient.create(
-          fullpath, permission.getFsPermission(), 
-          overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-              : EnumSet.of(CreateFlag.CREATE),
-          replication.getValue(), blockSize.getValue(conf), null, b), null);
+      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      FSDataOutputStream out = null;
       try {
+        out = new FSDataOutputStream(dfsclient.create(
+            fullpath, permission.getFsPermission(), 
+            overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+                : EnumSet.of(CreateFlag.CREATE),
+            replication.getValue(conf), blockSize.getValue(conf), null, b), null);
         IOUtils.copyBytes(in, out, b);
-      } finally {
         out.close();
+        out = null;
+        dfsclient.close();
+        dfsclient = null;
+      } finally {
+        IOUtils.cleanup(LOG, out);
+        IOUtils.cleanup(LOG, dfsclient);
       }
       final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf);
       final URI uri = new URI(WebHdfsFileSystem.SCHEME, null,
@@ -144,6 +185,22 @@ public class DatanodeWebHdfsMethods {
     });
   }
 
+  /** Handle HTTP POST request for the root. */
+  @POST
+  @Path("/")
+  @Consumes({"*/*"})
+  @Produces({MediaType.APPLICATION_JSON})
+  public Response postRoot(
+      final InputStream in,
+      @Context final UserGroupInformation ugi,
+      @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
+          final PostOpParam op,
+      @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
+          final BufferSizeParam bufferSize
+      ) throws IOException, InterruptedException {
+    return post(in, ugi, ROOT, op, bufferSize);
+  }
+
   /** Handle HTTP POST request. */
   @POST
   @Path("{" + UriFsPathParam.NAME + ":.*}")
@@ -157,13 +214,16 @@ public class DatanodeWebHdfsMethods {
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
           + Param.toSortedString(", ", bufferSize));
     }
 
+    //clear content type
+    response.setContentType(null);
+
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
@@ -176,13 +236,19 @@ public class DatanodeWebHdfsMethods {
     {
       final Configuration conf = new Configuration(datanode.getConf());
       final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
-      final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
       final int b = bufferSize.getValue(conf);
-      final FSDataOutputStream out = dfsclient.append(fullpath, b, null, null);
+      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      FSDataOutputStream out = null;
       try {
+        out = dfsclient.append(fullpath, b, null, null);
         IOUtils.copyBytes(in, out, b);
-      } finally {
         out.close();
+        out = null;
+        dfsclient.close();
+        dfsclient = null;
+      } finally {
+        IOUtils.cleanup(LOG, out);
+        IOUtils.cleanup(LOG, dfsclient);
       }
       return Response.ok().type(MediaType.APPLICATION_JSON).build();
     }
@@ -193,6 +259,24 @@ public class DatanodeWebHdfsMethods {
     });
   }
 
+  /** Handle HTTP GET request for the root. */
+  @GET
+  @Path("/")
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
+  public Response getRoot(
+      @Context final UserGroupInformation ugi,
+      @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
+          final GetOpParam op,
+      @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
+          final OffsetParam offset,
+      @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
+          final LengthParam length,
+      @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
+          final BufferSizeParam bufferSize
+      ) throws IOException, InterruptedException {
+    return get(ugi, ROOT, op, offset, length, bufferSize); 
+  }
+
   /** Handle HTTP GET request. */
   @GET
   @Path("{" + UriFsPathParam.NAME + ":.*}")
@@ -208,13 +292,16 @@ public class DatanodeWebHdfsMethods {
           final LengthParam length,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
           + Param.toSortedString(", ", offset, length, bufferSize));
     }
 
+    //clear content type
+    response.setContentType(null);
+
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
@@ -223,32 +310,62 @@ public class DatanodeWebHdfsMethods {
     final DataNode datanode = (DataNode)context.getAttribute("datanode");
     final Configuration conf = new Configuration(datanode.getConf());
     final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
-    final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
 
     switch(op.getValue()) {
     case OPEN:
     {
       final int b = bufferSize.getValue(conf);
-      final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
-          dfsclient.open(fullpath, b, true));
-      in.seek(offset.getValue());
-
+      final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      DFSDataInputStream in = null;
+      try {
+        in = new DFSClient.DFSDataInputStream(
+            dfsclient.open(fullpath, b, true));
+        in.seek(offset.getValue());
+      } catch(IOException ioe) {
+        IOUtils.cleanup(LOG, in);
+        IOUtils.cleanup(LOG, dfsclient);
+        throw ioe;
+      }
+      final DFSDataInputStream dis = in;
       final StreamingOutput streaming = new StreamingOutput() {
         @Override
         public void write(final OutputStream out) throws IOException {
           final Long n = length.getValue();
-          if (n == null) {
-            IOUtils.copyBytes(in, out, b);
-          } else {
-            IOUtils.copyBytes(in, out, n, false);
+          DFSDataInputStream dfsin = dis;
+          DFSClient client = dfsclient;
+          try {
+            if (n == null) {
+              IOUtils.copyBytes(dfsin, out, b);
+            } else {
+              IOUtils.copyBytes(dfsin, out, n, false);
+            }
+            dfsin.close();
+            dfsin = null;
+            dfsclient.close();
+            client = null;
+          } finally {
+            IOUtils.cleanup(LOG, dfsin);
+            IOUtils.cleanup(LOG, client);
           }
         }
       };
-      return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
+
+      final int status = offset.getValue() == 0?
+          HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
+      return Response.status(status).entity(streaming).type(
+          MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:
     {
-      final MD5MD5CRC32FileChecksum checksum = dfsclient.getFileChecksum(fullpath);
+      MD5MD5CRC32FileChecksum checksum = null;
+      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      try {
+        checksum = dfsclient.getFileChecksum(fullpath);
+        dfsclient.close();
+        dfsclient = null;
+      } finally {
+        IOUtils.cleanup(LOG, dfsclient);
+      }
       final String js = JsonUtil.toJsonString(checksum);
       return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
     }
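
The hunks above replace the single finally-close around the create path with a close-then-cleanup idiom: close the stream (and the DFSClient) on the success path so a close failure is reported to the caller, null the reference, and let a finally block release whatever is still open via IOUtils.cleanup without masking the original exception. They also add putRoot/postRoot/getRoot handlers that map the bare root path onto the existing {path:.*} methods through an empty UriFsPathParam, and return 206 Partial Content for OPEN requests with a non-zero offset. Below is a minimal, JDK-only sketch of the close-then-cleanup idiom; the cleanup helper and copy method are stand-ins for IOUtils.cleanup and IOUtils.copyBytes, not the WebHDFS code itself.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CloseThenCleanupSketch {
  private static final Logger LOG = Logger.getLogger("CloseThenCleanupSketch");

  // Stand-in for IOUtils.cleanup: close quietly, logging instead of throwing.
  static void cleanup(Closeable... closeables) {
    for (Closeable c : closeables) {
      if (c != null) {
        try {
          c.close();
        } catch (IOException e) {
          LOG.log(Level.WARNING, "Exception in closing " + c, e);
        }
      }
    }
  }

  static void copy(InputStream in, OutputStream sink) throws IOException {
    OutputStream out = sink;
    try {
      byte[] buf = new byte[4096];
      for (int n; (n = in.read(buf)) != -1; ) {
        out.write(buf, 0, n);
      }
      // Close on the success path so a close failure is reported to the caller...
      out.close();
      out = null;            // ...and mark it closed so the finally block does not touch it again.
    } finally {
      cleanup(out);          // On the error path this releases the stream without masking the cause.
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    copy(new ByteArrayInputStream("hello".getBytes()), sink);
    System.out.println(sink);
  }
}

The same shape is applied to the DFSClient itself in the hunks above, and to the append and GETFILECHECKSUM paths.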

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Wed Nov  2 05:34:31 2011
@@ -77,6 +77,9 @@ class BackupJournalManager implements Jo
   public void recoverUnfinalizedSegments() throws IOException {
   }
 
+  @Override 
+  public void close() throws IOException {}
+
   public boolean matchesRegistration(NamenodeRegistration bnReg) {
     return bnReg.getAddress().equals(this.bnReg.getAddress());
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Wed Nov  2 05:34:31 2011
@@ -28,10 +28,13 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocolR23Compatible.JournalProtocolServerSideTranslatorR23;
+import org.apache.hadoop.hdfs.protocolR23Compatible.JournalWireProtocol;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -135,6 +138,16 @@ public class BackupNode extends NameNode
                  CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT);
     NamespaceInfo nsInfo = handshake(conf);
     super.initialize(conf);
+
+    if (false == namesystem.isInSafeMode()) {
+      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    }
+
+    // Backup node should never do lease recovery,
+    // therefore lease hard limit should never expire.
+    namesystem.leaseManager.setLeasePeriod(
+        HdfsConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);
+    
     clusterId = nsInfo.getClusterID();
     blockPoolId = nsInfo.getBlockPoolID();
 
@@ -171,7 +184,9 @@ public class BackupNode extends NameNode
       }
     }
     // Stop the RPC client
-    RPC.stopProxy(namenode);
+    if (namenode != null) {
+      RPC.stopProxy(namenode);
+    }
     namenode = null;
     // Stop the checkpoint manager
     if(checkpointManager != null) {
@@ -181,14 +196,23 @@ public class BackupNode extends NameNode
     // Stop name-node threads
     super.stop();
   }
-
-  static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol {
+  
+  /* @Override */// NameNode
+  public boolean setSafeMode(SafeModeAction action) throws IOException {
+    throw new UnsupportedActionException("setSafeMode");
+  }
+  
+  static class BackupNodeRpcServer extends NameNodeRpcServer implements
+      JournalProtocol {
     private final String nnRpcAddress;
     
     private BackupNodeRpcServer(Configuration conf, BackupNode nn)
         throws IOException {
       super(conf, nn);
-      this.server.addProtocol(JournalProtocol.class, this);
+      JournalProtocolServerSideTranslatorR23 journalProtocolTranslator = 
+          new JournalProtocolServerSideTranslatorR23(this);
+      this.clientRpcServer.addProtocol(JournalWireProtocol.class,
+          journalProtocolTranslator);
       nnRpcAddress = nn.nnRpcAddress;
     }
 
@@ -197,9 +221,8 @@ public class BackupNode extends NameNode
         throws IOException {
       if (protocol.equals(JournalProtocol.class.getName())) {
         return JournalProtocol.versionID;
-      } else {
-        return super.getProtocolVersion(protocol, clientVersion);
       }
+      return super.getProtocolVersion(protocol, clientVersion);
     }
 
     /////////////////////////////////////////////////////
@@ -250,7 +273,7 @@ public class BackupNode extends NameNode
     // connect to name node
     InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
     this.namenode =
-      (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
+      RPC.waitForProxy(NamenodeProtocol.class,
           NamenodeProtocol.versionID, nnAddress, conf);
     this.nnRpcAddress = getHostPortString(nnAddress);
     this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
@@ -264,7 +287,9 @@ public class BackupNode extends NameNode
         LOG.info("Problem connecting to server: " + nnAddress);
         try {
           Thread.sleep(1000);
-        } catch (InterruptedException ie) {}
+        } catch (InterruptedException ie) {
+          LOG.warn("Encountered exception ", e);
+        }
       }
     }
     return nsInfo;
@@ -313,7 +338,9 @@ public class BackupNode extends NameNode
         LOG.info("Problem connecting to name-node: " + nnRpcAddress);
         try {
           Thread.sleep(1000);
-        } catch (InterruptedException ie) {}
+        } catch (InterruptedException ie) {
+          LOG.warn("Encountered exception ", e);
+        }
       }
     }
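
The handshake hunks above retry the NameNode connection once per second and now log the InterruptedException rather than swallowing it; the other hunks force the backup node into safe mode at startup and stretch the lease hard limit to Long.MAX_VALUE so it never attempts lease recovery. A hedged, JDK-only sketch of such a retry loop follows; retryUntilConnected, the one-second back-off and the interrupt restoration are illustrative assumptions, not the BackupNode implementation.

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HandshakeRetrySketch {
  private static final Logger LOG = Logger.getLogger("HandshakeRetrySketch");

  /** Retry the given connect attempt until it succeeds or the thread is interrupted. */
  static <T> T retryUntilConnected(Callable<T> connectOnce, String target) throws IOException {
    while (true) {
      try {
        return connectOnce.call();
      } catch (Exception e) {
        LOG.info("Problem connecting to server: " + target);
        try {
          Thread.sleep(1000);                       // back off before the next attempt
        } catch (InterruptedException ie) {
          LOG.log(Level.WARNING, "Encountered exception ", ie);
          Thread.currentThread().interrupt();       // preserve the interrupt for the caller
          throw new IOException("Interrupted while connecting to " + target, ie);
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Succeeds on the third attempt, after two logged retries.
    final int[] attempts = {0};
    String result = retryUntilConnected(
        () -> {
          if (++attempts[0] < 3) {
            throw new IOException("not yet");
          }
          return "connected";
        }, "nn.example.com:8020");
    System.out.println(result);
  }
}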
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Wed Nov  2 05:34:31 2011
@@ -37,9 +37,7 @@ public class CheckpointSignature extends
                       implements WritableComparable<CheckpointSignature> {
   private static final String FIELD_SEPARATOR = ":";
   private static final int NUM_FIELDS = 7;
-
   String blockpoolID = "";
-  
   long mostRecentCheckpointTxId;
   long curSegmentTxId;
 
@@ -67,6 +65,14 @@ public class CheckpointSignature extends
     blockpoolID = fields[i++];
   }
 
+  public CheckpointSignature(StorageInfo info, String blockpoolID,
+      long mostRecentCheckpointTxId, long curSegmentTxId) {
+    super(info);
+    this.blockpoolID = blockpoolID;
+    this.mostRecentCheckpointTxId = mostRecentCheckpointTxId;
+    this.curSegmentTxId = curSegmentTxId;
+  }
+
   /**
    * Get the cluster id from CheckpointSignature
    * @return the cluster id
@@ -83,6 +89,14 @@ public class CheckpointSignature extends
     return blockpoolID;
   }
 
+  public long getMostRecentCheckpointTxId() {
+    return mostRecentCheckpointTxId;
+  }
+
+  public long getCurSegmentTxId() {
+    return curSegmentTxId;
+  }
+
   /**
    * Set the block pool id of CheckpointSignature.
    * 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Wed Nov  2 05:34:31 2011
@@ -241,8 +241,12 @@ class Checkpointer extends Daemon {
   
       rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
     }
-
+    
     long txid = bnImage.getLastAppliedTxId();
+    
+    backupNode.namesystem.dir.setReady();
+    backupNode.namesystem.setBlockTotal();
+    
     bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
     bnStorage.writeAll();
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Wed Nov  2 05:34:31 2011
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocolR23Compatible.JournalProtocolTranslatorR23;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -56,8 +57,7 @@ class EditLogBackupOutputStream extends 
       NetUtils.createSocketAddr(bnRegistration.getAddress());
     try {
       this.backupNode =
-        RPC.getProxy(JournalProtocol.class,
-            JournalProtocol.versionID, bnAddress, new HdfsConfiguration());
+          new JournalProtocolTranslatorR23(bnAddress, new HdfsConfiguration());
     } catch(IOException e) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
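
The change above replaces a raw RPC.getProxy(JournalProtocol...) call with a JournalProtocolTranslatorR23, so the backup output stream talks to the R23 wire protocol through an adapter while the rest of the code keeps programming against JournalProtocol. The sketch below shows that adapter shape with hypothetical Journal and WireJournal interfaces; it illustrates the pattern only and is not the actual translator.

import java.io.Closeable;
import java.io.IOException;

public class TranslatorSketch {

  /** Internal-facing interface that the rest of the code programs against. */
  interface Journal extends Closeable {
    void journal(long firstTxId, byte[] records) throws IOException;
  }

  /** Wire-level proxy, whose method shapes are fixed by the on-the-wire protocol version. */
  interface WireJournal {
    void journalRecords(long firstTxId, int numTxns, byte[] records) throws IOException;
    void stop();
  }

  /** Adapter: callers keep using Journal while the wire format can evolve independently. */
  static class JournalTranslator implements Journal {
    private final WireJournal proxy;

    JournalTranslator(WireJournal proxy) {
      this.proxy = proxy;
    }

    @Override
    public void journal(long firstTxId, byte[] records) throws IOException {
      // Fill in whatever extra fields the wire protocol needs (here a dummy txn count).
      proxy.journalRecords(firstTxId, 1, records);
    }

    @Override
    public void close() {
      proxy.stop();
    }
  }

  public static void main(String[] args) throws IOException {
    WireJournal wire = new WireJournal() {
      public void journalRecords(long firstTxId, int numTxns, byte[] records) {
        System.out.println("journal " + numTxns + " txn(s) starting at " + firstTxId);
      }
      public void stop() {
        System.out.println("proxy stopped");
      }
    };
    try (Journal j = new JournalTranslator(wire)) {
      j.journal(42L, new byte[0]);
    }
  }
}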

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Nov  2 05:34:31 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Options.Rena
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -158,6 +159,11 @@ public class FSDirectory implements Clos
    */
   void imageLoadComplete() {
     Preconditions.checkState(!ready, "FSDirectory already loaded");
+    setReady();
+  }
+
+  void setReady() {
+    if(ready) return;
     writeLock();
     try {
       setReady(true);
@@ -233,7 +239,7 @@ public class FSDirectory implements Clos
                                  clientMachine, clientNode);
     writeLock();
     try {
-      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
+      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
     } finally {
       writeUnlock();
     }
@@ -276,7 +282,7 @@ public class FSDirectory implements Clos
     writeLock();
     try {
       try {
-        newNode = addNode(path, newNode, diskspace, false);
+        newNode = addNode(path, newNode, diskspace);
         if(newNode != null && blocks != null) {
           int nrBlocks = blocks.length;
           // Add file->block mapping
@@ -303,7 +309,7 @@ public class FSDirectory implements Clos
     try {
       try {
         newParent = rootDir.addToParent(src, newNode, parentINode,
-                                        false, propagateModTime);
+                                        propagateModTime);
         cacheName(newNode);
       } catch (FileNotFoundException e) {
         return null;
@@ -576,7 +582,7 @@ public class FSDirectory implements Clos
       
       // add src to the destination
       dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
-          srcChild, UNKNOWN_DISK_SPACE, false);
+          srcChild, UNKNOWN_DISK_SPACE);
       if (dstChild != null) {
         srcChild = null;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -593,7 +599,7 @@ public class FSDirectory implements Clos
         // put it back
         srcChild.setLocalName(srcChildName);
         addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
-            UNKNOWN_DISK_SPACE, false);
+            UNKNOWN_DISK_SPACE);
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -731,7 +737,7 @@ public class FSDirectory implements Clos
       removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
       // add src as dst to complete rename
       dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
-          removedSrc, UNKNOWN_DISK_SPACE, false);
+          removedSrc, UNKNOWN_DISK_SPACE);
 
       int filesDeleted = 0;
       if (dstChild != null) {
@@ -759,13 +765,13 @@ public class FSDirectory implements Clos
         // Rename failed - restore src
         removedSrc.setLocalName(srcChildName);
         addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
-            UNKNOWN_DISK_SPACE, false);
+            UNKNOWN_DISK_SPACE);
       }
       if (removedDst != null) {
         // Rename failed - restore dst
         removedDst.setLocalName(dstChildName);
         addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
-            UNKNOWN_DISK_SPACE, false);
+            UNKNOWN_DISK_SPACE);
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -1224,13 +1230,21 @@ public class FSDirectory implements Clos
    * Get {@link INode} associated with the file.
    */
   INodeFile getFileINode(String src) throws UnresolvedLinkException {
+    INode inode = getINode(src);
+    if (inode == null || inode.isDirectory())
+      return null;
+    assert !inode.isLink();
+    return (INodeFile) inode;
+  }
+  
+  /**
+   * Get {@link INode} associated with the file / directory.
+   */
+  INode getINode(String src) throws UnresolvedLinkException {
     readLock();
     try {
-      INode inode = rootDir.getNode(src, true);
-      if (inode == null || inode.isDirectory())
-        return null;
-      assert !inode.isLink();      
-      return (INodeFile)inode;
+      INode iNode = rootDir.getNode(src, true);
+      return iNode;
     } finally {
       readUnlock();
     }
@@ -1436,9 +1450,10 @@ public class FSDirectory implements Clos
 
    * @param src string representation of the path to the directory
    * @param permissions the permission of the directory
-   * @param inheritPermission if the permission of the directory should inherit
-   *                          from its parent or not. The automatically created
-   *                          ones always inherit its permission from its parent
+   * @param isAutocreate if the permission of the directory should inherit
+   *                          from its parent or not. u+wx is implicitly added to
+   *                          the automatically created directories, and to the
+   *                          given directory if inheritPermission is true
    * @param now creation time
    * @return true if the operation succeeds false otherwise
    * @throws FileNotFoundException if an ancestor or itself is a file
@@ -1454,6 +1469,7 @@ public class FSDirectory implements Clos
     String[] names = INode.getPathNames(src);
     byte[][] components = INode.getPathComponents(names);
     INode[] inodes = new INode[components.length];
+    final int lastInodeIndex = inodes.length - 1;
 
     writeLock();
     try {
@@ -1470,12 +1486,44 @@ public class FSDirectory implements Clos
         }
       }
 
+      // default to creating parent dirs with the given perms
+      PermissionStatus parentPermissions = permissions;
+
+      // if not inheriting and it's the last inode, there's no use in
+      // computing perms that won't be used
+      if (inheritPermission || (i < lastInodeIndex)) {
+        // if inheriting (ie. creating a file or symlink), use the parent dir,
+        // else the supplied permissions
+        // NOTE: the permissions of the auto-created directories violate posix
+        FsPermission parentFsPerm = inheritPermission
+            ? inodes[i-1].getFsPermission() : permissions.getPermission();
+        
+        // ensure that the permissions allow user write+execute
+        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
+          parentFsPerm = new FsPermission(
+              parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
+              parentFsPerm.getGroupAction(),
+              parentFsPerm.getOtherAction()
+          );
+        }
+        
+        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
+          parentPermissions = new PermissionStatus(
+              parentPermissions.getUserName(),
+              parentPermissions.getGroupName(),
+              parentFsPerm
+          );
+          // when inheriting, use same perms for entire path
+          if (inheritPermission) permissions = parentPermissions;
+        }
+      }
+      
       // create directories beginning from the first null index
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
         String cur = pathbuilder.toString();
-        unprotectedMkdir(inodes, i, components[i], permissions,
-            inheritPermission || i != components.length-1, now);
+        unprotectedMkdir(inodes, i, components[i],
+            (i < lastInodeIndex) ? parentPermissions : permissions, now);
         if (inodes[i] == null) {
           return false;
         }
@@ -1506,7 +1554,7 @@ public class FSDirectory implements Clos
 
     rootDir.getExistingPathINodes(components, inodes, false);
     unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
-        permissions, false, timestamp);
+        permissions, timestamp);
     return inodes[inodes.length-1];
   }
 
@@ -1515,19 +1563,19 @@ public class FSDirectory implements Clos
    * All ancestors exist. Newly created one stored at index pos.
    */
   private void unprotectedMkdir(INode[] inodes, int pos,
-      byte[] name, PermissionStatus permission, boolean inheritPermission,
+      byte[] name, PermissionStatus permission,
       long timestamp) throws QuotaExceededException {
     assert hasWriteLock();
     inodes[pos] = addChild(inodes, pos, 
         new INodeDirectory(name, permission, timestamp),
-        -1, inheritPermission );
+        -1);
   }
   
   /** Add a node child to the namespace. The full path name of the node is src.
    * childDiskspace should be -1, if unknown. 
    * QuotaExceededException is thrown if it violates quota limit */
   private <T extends INode> T addNode(String src, T child, 
-        long childDiskspace, boolean inheritPermission) 
+        long childDiskspace) 
   throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     byte[] path = components[components.length-1];
@@ -1537,8 +1585,7 @@ public class FSDirectory implements Clos
     writeLock();
     try {
       rootDir.getExistingPathINodes(components, inodes, false);
-      return addChild(inodes, inodes.length-1, child, childDiskspace,
-                      inheritPermission);
+      return addChild(inodes, inodes.length-1, child, childDiskspace);
     } finally {
       writeUnlock();
     }
@@ -1666,7 +1713,7 @@ public class FSDirectory implements Clos
    * Its ancestors are stored at [0, pos-1]. 
    * QuotaExceededException is thrown if it violates quota limit */
   private <T extends INode> T addChild(INode[] pathComponents, int pos,
-      T child, long childDiskspace, boolean inheritPermission,
+      T child, long childDiskspace,
       boolean checkQuota) throws QuotaExceededException {
 	// The filesystem limits are not really quotas, so this check may appear
 	// odd.  It's because a rename operation deletes the src, tries to add
@@ -1689,7 +1736,7 @@ public class FSDirectory implements Clos
       throw new NullPointerException("Panic: parent does not exist");
     }
     T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
-        child, inheritPermission, true);
+        child, true);
     if (addedNode == null) {
       updateCount(pathComponents, pos, -counts.getNsCount(), 
           -childDiskspace, true);
@@ -1698,18 +1745,16 @@ public class FSDirectory implements Clos
   }
 
   private <T extends INode> T addChild(INode[] pathComponents, int pos,
-      T child, long childDiskspace, boolean inheritPermission)
+      T child, long childDiskspace)
       throws QuotaExceededException {
-    return addChild(pathComponents, pos, child, childDiskspace,
-        inheritPermission, true);
+    return addChild(pathComponents, pos, child, childDiskspace, true);
   }
   
   private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
-      int pos, T child, long childDiskspace, boolean inheritPermission) {
+      int pos, T child, long childDiskspace) {
     T inode = null;
     try {
-      inode = addChild(pathComponents, pos, child, childDiskspace,
-          inheritPermission, false);
+      inode = addChild(pathComponents, pos, child, childDiskspace, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); 
     }
@@ -1934,9 +1979,9 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Sets the access time on the file. Logs it in the transaction log.
+   * Sets the access time on the file/directory. Logs it in the transaction log.
    */
-  void setTimes(String src, INodeFile inode, long mtime, long atime, boolean force) {
+  void setTimes(String src, INode inode, long mtime, long atime, boolean force) {
     boolean status = false;
     writeLock();
     try {
@@ -1952,11 +1997,11 @@ public class FSDirectory implements Clos
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
       throws UnresolvedLinkException {
     assert hasWriteLock();
-    INodeFile inode = getFileINode(src);
+    INode inode = getINode(src);
     return unprotectedSetTimes(src, inode, mtime, atime, force);
   }
 
-  private boolean unprotectedSetTimes(String src, INodeFile inode, long mtime,
+  private boolean unprotectedSetTimes(String src, INode inode, long mtime,
                                       long atime, boolean force) {
     assert hasWriteLock();
     boolean status = false;
@@ -2119,7 +2164,7 @@ public class FSDirectory implements Clos
     assert hasWriteLock();
     INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
     try {
-      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
+      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
     } catch (UnresolvedLinkException e) {
       /* All UnresolvedLinkExceptions should have been resolved by now, but we
        * should re-throw them in case that changes so they are not swallowed 
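
The mkdirs hunks above pull permission inheritance out of INodeDirectory.addChild and compute it once in FSDirectory: auto-created parent directories get the inherited (or requested) permission with user write+execute forced on, so the new path stays traversable and writable by its owner. A small sketch of that "ensure u+wx" step over a POSIX-style integer mode is below; the int-mode representation and the method names are assumptions for illustration, not the FsPermission API.

public class ParentPermissionSketch {

  // POSIX-style user permission bits (octal 0200 = user write, 0100 = user execute).
  static final int USER_WRITE   = 0200;
  static final int USER_EXECUTE = 0100;

  /** Return a mode that keeps the original bits but always lets the owner write and traverse. */
  static int ensureUserWriteExecute(int mode) {
    return mode | USER_WRITE | USER_EXECUTE;
  }

  /**
   * Pick the mode used for auto-created parent directories: inherit from the existing
   * ancestor when creating a file or symlink, otherwise start from the requested mode,
   * then force u+wx either way.
   */
  static int parentMode(boolean inherit, int ancestorMode, int requestedMode) {
    int base = inherit ? ancestorMode : requestedMode;
    return ensureUserWriteExecute(base);
  }

  public static void main(String[] args) {
    // A requested mode of 0444 (read-only) still yields traversable parents: 0744.
    System.out.println(Integer.toOctalString(parentMode(false, 0755, 0444)));
    // Inheriting from a 0750 ancestor keeps the group bits and stays 0750.
    System.out.println(Integer.toOctalString(parentMode(true, 0750, 0644)));
  }
}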

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Nov  2 05:34:31 2011
@@ -215,6 +215,12 @@ public class FSEditLog  {
       waitForSyncToFinish();
       endCurrentLogSegment(true);
     }
+    
+    try {
+      journalSet.close();
+    } catch (IOException ioe) {
+      LOG.warn("Error closing journalSet", ioe);
+    }
 
     state = State.CLOSED;
   }
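
The hunk above closes the journal set during FSEditLog shutdown but logs and swallows any IOException, so the edit log still reaches the CLOSED state even when a journal fails to close. A tiny JDK-only sketch of that shutdown idiom, with names chosen for illustration only:

import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ShutdownCloseSketch {
  private static final Logger LOG = Logger.getLogger("ShutdownCloseSketch");

  enum State { OPEN, CLOSED }

  private State state = State.OPEN;
  private final Closeable journalSet;

  ShutdownCloseSketch(Closeable journalSet) {
    this.journalSet = journalSet;
  }

  /** Close the underlying journals, but never let a close failure abort the shutdown. */
  void close() {
    try {
      journalSet.close();
    } catch (IOException ioe) {
      LOG.log(Level.WARNING, "Error closing journalSet", ioe);
    }
    state = State.CLOSED;     // reached regardless of whether close() threw
  }

  public static void main(String[] args) {
    ShutdownCloseSketch log = new ShutdownCloseSketch(() -> {
      throw new IOException("disk gone");
    });
    log.close();
    System.out.println(log.state); // CLOSED despite the failed close
  }
}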

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Nov  2 05:34:31 2011
@@ -998,18 +998,12 @@ public class FSImage implements Closeabl
   /**
    * End checkpoint.
    * <p>
-   * Rename uploaded checkpoint to the new image;
-   * purge old edits file;
-   * rename edits.new to edits;
-   * redirect edit log streams to the new edits;
-   * update checkpoint time if the remote node is a checkpoint only node.
+   * Validate the current storage info with the given signature.
    * 
-   * @param sig
-   * @param remoteNNRole
-   * @throws IOException
+   * @param sig to validate the current storage info against
+   * @throws IOException if the checkpoint fields are inconsistent
    */
-  void endCheckpoint(CheckpointSignature sig,
-                     NamenodeRole remoteNNRole) throws IOException {
+  void endCheckpoint(CheckpointSignature sig) throws IOException {
     sig.validateStorageInfo(this);
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Nov  2 05:34:31 2011
@@ -17,6 +17,45 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedWriter;
@@ -68,7 +107,6 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -119,7 +157,6 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -203,8 +240,6 @@ public class FSNamesystem implements Nam
   private UserGroupInformation fsOwner;
   private String supergroup;
   private PermissionStatus defaultPermission;
-  // FSNamesystemMetrics counter variables
-  @Metric private MutableCounterInt expiredHeartbeats;
   
   // Scan interval is not configurable.
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@@ -312,7 +347,7 @@ public class FSNamesystem implements Nam
         DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
         DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
     this.systemStart = now();
-    this.blockManager = new BlockManager(this, conf);
+    this.blockManager = new BlockManager(this, this, conf);
     this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
     this.fsLock = new ReentrantReadWriteLock(true); // fair locking
     setConfigurationParameters(conf);
@@ -989,7 +1024,7 @@ public class FSNamesystem implements Nam
       if (isPermissionEnabled) {
         checkPathAccess(src, FsAction.WRITE);
       }
-      INodeFile inode = dir.getFileINode(src);
+      INode inode = dir.getINode(src);
       if (inode != null) {
         dir.setTimes(src, inode, mtime, atime, true);
         if (auditLog.isInfoEnabled() && isExternalInvocation()) {
@@ -999,7 +1034,7 @@ public class FSNamesystem implements Nam
                         "setTimes", src, null, stat);
         }
       } else {
-        throw new FileNotFoundException("File " + src + " does not exist.");
+        throw new FileNotFoundException("File/Directory " + src + " does not exist.");
       }
     } finally {
       writeUnlock();
@@ -2675,9 +2710,9 @@ public class FSNamesystem implements Nam
     return blockManager.getMissingBlocksCount();
   }
   
-  /** Increment expired heartbeat counter. */
-  public void incrExpiredHeartbeats() {
-    expiredHeartbeats.incr();
+  @Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
+  public int getExpiredHeartbeats() {
+    return datanodeStatistics.getExpiredHeartbeats();
   }
 
   /** @see ClientProtocol#getStats() */
@@ -2905,6 +2940,9 @@ public class FSNamesystem implements Nam
     private SafeModeInfo(Configuration conf) {
       this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
           DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
+      if(threshold > 1.0) {
+        LOG.warn("The threshold value should't be greater than 1, threshold: " + threshold);
+      }
       this.datanodeThreshold = conf.getInt(
         DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
         DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
@@ -3188,7 +3226,7 @@ public class FSNamesystem implements Nam
           msg += String.format(
             "The reported blocks %d needs additional %d"
             + " blocks to reach the threshold %.4f of total blocks %d.",
-            blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
+            blockSafe, (blockThreshold - blockSafe) + 1, threshold, blockTotal);
         }
         if (numLive < datanodeThreshold) {
           if (!"".equals(msg)) {
@@ -3197,7 +3235,7 @@ public class FSNamesystem implements Nam
           msg += String.format(
             "The number of live datanodes %d needs an additional %d live "
             + "datanodes to reach the minimum number %d.",
-            numLive, datanodeThreshold - numLive, datanodeThreshold);
+            numLive, (datanodeThreshold - numLive) + 1 , datanodeThreshold);
         }
         msg += " " + leaveMsg;
       } else {
@@ -3362,7 +3400,7 @@ public class FSNamesystem implements Nam
   /**
    * Set the total number of blocks in the system. 
    */
-  private void setBlockTotal() {
+  void setBlockTotal() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -3508,15 +3546,15 @@ public class FSNamesystem implements Nam
 
   void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
-    writeLock();
+    readLock();
     try {
       if (isInSafeMode()) {
         throw new SafeModeException("Checkpoint not ended", safeMode);
       }
       LOG.info("End checkpoint for " + registration.getAddress());
-      getFSImage().endCheckpoint(sig, registration.getRole());
+      getFSImage().endCheckpoint(sig);
     } finally {
-      writeUnlock();
+      readUnlock();
     }
   }
 
@@ -4436,4 +4474,15 @@ public class FSNamesystem implements Nam
   public BlockManager getBlockManager() {
     return blockManager;
   }
+  
+  /**
+   * Verifies that the given identifier and password are valid and match.
+   * @param identifier Token identifier.
+   * @param password Password in the token.
+   * @throws InvalidToken
+   */
+  public synchronized void verifyToken(DelegationTokenIdentifier identifier,
+      byte[] password) throws InvalidToken {
+    getDelegationTokenSecretManager().verifyToken(identifier, password);
+  }
 }
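
Among the FSNamesystem changes, the expired-heartbeat metric switches from a MutableCounterInt that FSNamesystem incremented itself to a @Metric-annotated getter that reads the value from DatanodeStatistics when sampled. The JDK-only sketch below contrasts the two styles, a pushed counter versus a pulled gauge; the class and method names are illustrative, not the hadoop metrics2 API.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class CounterVsGaugeSketch {

  /** Old style: the subsystem owns a counter and every event site must remember to bump it. */
  static class PushedCounter {
    private final AtomicInteger expiredHeartbeats = new AtomicInteger();
    void incrExpiredHeartbeats() { expiredHeartbeats.incrementAndGet(); }
    int sample() { return expiredHeartbeats.get(); }
  }

  /** New style: the metric is a gauge that pulls the value from the source of truth on demand. */
  static class PulledGauge {
    private final IntSupplier source;
    PulledGauge(IntSupplier source) { this.source = source; }
    int sample() { return source.getAsInt(); }   // analogue of the @Metric getter
  }

  public static void main(String[] args) {
    // Source of truth, e.g. the datanode manager's own bookkeeping.
    AtomicInteger datanodeStatistics = new AtomicInteger();

    PushedCounter pushed = new PushedCounter();
    PulledGauge pulled = new PulledGauge(datanodeStatistics::get);

    // Two heartbeats expire; only the source of truth needs updating for the gauge.
    datanodeStatistics.addAndGet(2);
    pushed.incrExpiredHeartbeats();
    pushed.incrExpiredHeartbeats();

    System.out.println("pushed counter: " + pushed.sample()); // 2
    System.out.println("pulled gauge:   " + pulled.sample()); // 2, and cannot drift from the source
  }
}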

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Wed Nov  2 05:34:31 2011
@@ -70,6 +70,9 @@ class FileJournalManager implements Jour
     this.sd = sd;
   }
 
+  @Override 
+  public void close() throws IOException {}
+
   @Override
   synchronized public EditLogOutputStream startLogSegment(long txid) 
       throws IOException {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Nov  2 05:34:31 2011
@@ -304,7 +304,6 @@ public abstract class INode implements C
    * Always set the last modification time of inode.
    */
   void setModificationTimeForce(long modtime) {
-    assert !isDirectory();
     this.modificationTime = modtime;
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Nov  2 05:34:31 2011
@@ -261,25 +261,13 @@ class INodeDirectory extends INode {
    * Add a child inode to the directory.
    * 
    * @param node INode to insert
-   * @param inheritPermission inherit permission from parent?
    * @param setModTime set modification time for the parent node
    *                   not needed when replaying the addition and 
    *                   the parent already has the proper mod time
    * @return  null if the child with this name already exists; 
    *          node, otherwise
    */
-  <T extends INode> T addChild(final T node, boolean inheritPermission,
-                                              boolean setModTime) {
-    if (inheritPermission) {
-      FsPermission p = getFsPermission();
-      //make sure the  permission has wx for the user
-      if (!p.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
-        p = new FsPermission(p.getUserAction().or(FsAction.WRITE_EXECUTE),
-            p.getGroupAction(), p.getOtherAction());
-      }
-      node.setPermission(p);
-    }
-
+  <T extends INode> T addChild(final T node, boolean setModTime) {
     if (children == null) {
       children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
     }
@@ -298,30 +286,21 @@ class INodeDirectory extends INode {
   }
 
   /**
-   * Equivalent to addNode(path, newNode, false).
-   * @see #addNode(String, INode, boolean)
-   */
-  <T extends INode> T addNode(String path, T newNode) 
-    throws FileNotFoundException, UnresolvedLinkException {
-    return addNode(path, newNode, false);
-  }
-  /**
    * Add new INode to the file tree.
    * Find the parent and insert 
    * 
    * @param path file path
    * @param newNode INode to be added
-   * @param inheritPermission If true, copy the parent's permission to newNode.
    * @return null if the node already exists; inserted INode, otherwise
    * @throws FileNotFoundException if parent does not exist or 
    * @throws UnresolvedLinkException if any path component is a symbolic link
    * is not a directory.
    */
-  <T extends INode> T addNode(String path, T newNode, boolean inheritPermission
+  <T extends INode> T addNode(String path, T newNode
       ) throws FileNotFoundException, UnresolvedLinkException  {
     byte[][] pathComponents = getPathComponents(path);        
     if(addToParent(pathComponents, newNode,
-                    inheritPermission, true) == null)
+                    true) == null)
       return null;
     return newNode;
   }
@@ -338,13 +317,12 @@ class INodeDirectory extends INode {
   INodeDirectory addToParent( byte[] localname,
                               INode newNode,
                               INodeDirectory parent,
-                              boolean inheritPermission,
                               boolean propagateModTime
                               ) throws FileNotFoundException, 
                                        UnresolvedLinkException {
     // insert into the parent children list
     newNode.name = localname;
-    if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
+    if(parent.addChild(newNode, propagateModTime) == null)
       return null;
     return parent;
   }
@@ -380,7 +358,6 @@ class INodeDirectory extends INode {
    */
   INodeDirectory addToParent( byte[][] pathComponents,
                               INode newNode,
-                              boolean inheritPermission,
                               boolean propagateModTime
                             ) throws FileNotFoundException, 
                                      UnresolvedLinkException {
@@ -391,7 +368,7 @@ class INodeDirectory extends INode {
     newNode.name = pathComponents[pathLen-1];
     // insert into the parent children list
     INodeDirectory parent = getParent(pathComponents);
-    if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
+    if(parent.addChild(newNode, propagateModTime) == null)
       return null;
     return parent;
   }
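
For illustration, a minimal standalone sketch (not part of this commit) of the permission-widening rule that addChild() used to apply when inheritPermission was true; after this change the caller must compute and set the child's permission explicitly before calling addNode()/addChild(). The class and method names below are hypothetical; only FsPermission and FsAction are real Hadoop types.

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class InheritPermissionSketch {
      /** Parent's permission, widened so the owner has at least write+execute. */
      static FsPermission inheritFromParent(FsPermission parent) {
        FsAction user = parent.getUserAction();
        if (!user.implies(FsAction.WRITE_EXECUTE)) {
          user = user.or(FsAction.WRITE_EXECUTE);
        }
        return new FsPermission(user, parent.getGroupAction(), parent.getOtherAction());
      }

      public static void main(String[] args) {
        FsPermission parent = new FsPermission((short) 0555);  // r-xr-xr-x
        System.out.println(inheritFromParent(parent));         // prints rwxr-xr-x
      }
    }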

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Wed Nov  2 05:34:31 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 
@@ -27,7 +28,7 @@ import java.io.IOException;
  * each conceptual place of storage corresponds to exactly one instance of
  * this class, which is created when the EditLog is first opened.
  */
-interface JournalManager {
+interface JournalManager extends Closeable {
   /**
    * Begin writing to a new segment of the log stream, which starts at
    * the given transaction ID.
@@ -81,6 +82,11 @@ interface JournalManager {
    */
   void recoverUnfinalizedSegments() throws IOException;
 
+  /**
+   * Close the journal manager, freeing any resources it may hold.
+   */
+  void close() throws IOException;
+
   /** 
    * Indicate that a journal cannot be used to load a certain range of 
    * edits.
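
A hedged sketch of what the new Closeable contract enables for callers: a JournalManager can now be released with an ordinary try/finally once the edit log is done with it. SimpleJournal and FileJournal below are illustrative stand-ins, not the real interface or any committed class.

    import java.io.Closeable;
    import java.io.FileOutputStream;
    import java.io.IOException;

    interface SimpleJournal extends Closeable {        // stands in for JournalManager
      void startLogSegment(long txId) throws IOException;
      void close() throws IOException;                 // free any resources held
    }

    class FileJournal implements SimpleJournal {
      private FileOutputStream out;

      @Override
      public void startLogSegment(long txId) throws IOException {
        out = new FileOutputStream("edits_inprogress_" + txId);
      }

      @Override
      public void close() throws IOException {
        if (out != null) {
          out.close();
          out = null;
        }
      }
    }

With this in place, shutdown code can simply walk its journals and close() each one, which is what the JournalSet change below does.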

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Wed Nov  2 05:34:31 2011
@@ -72,11 +72,20 @@ public class JournalSet implements Journ
     /**
      * Closes the stream, also sets it to null.
      */
-    public void close() throws IOException {
+    public void closeStream() throws IOException {
       if (stream == null) return;
       stream.close();
       stream = null;
     }
+
+    /**
+     * Close the stream and the underlying journal.
+     */
+    public void close() throws IOException {
+      closeStream();
+
+      journal.close();
+    }
     
     /**
      * Aborts the stream, also sets it to null.
@@ -145,13 +154,23 @@ public class JournalSet implements Journ
       @Override
       public void apply(JournalAndStream jas) throws IOException {
         if (jas.isActive()) {
-          jas.close();
+          jas.closeStream();
           jas.getManager().finalizeLogSegment(firstTxId, lastTxId);
         }
       }
     }, "finalize log segment " + firstTxId + ", " + lastTxId);
   }
-  
+   
+  @Override
+  public void close() throws IOException {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.close();
+      }
+    }, "close journal");
+  }
+
   
   /**
    * Find the best editlog input stream to read from txid.
@@ -332,7 +351,7 @@ public class JournalSet implements Journ
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override
         public void apply(JournalAndStream jas) throws IOException {
-          jas.close();
+          jas.closeStream();
         }
       }, "close");
     }
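
For clarity, a stripped-down sketch (illustrative names, not the committed class) of the two-level close pattern introduced here: closeStream() ends only the current output stream, e.g. when a log segment is finalized, while close() additionally closes the underlying journal, e.g. on shutdown.

    import java.io.Closeable;
    import java.io.IOException;

    class JournalAndStreamSketch implements Closeable {
      private final Closeable journal;  // stands in for JournalManager
      private Closeable stream;         // stands in for EditLogOutputStream

      JournalAndStreamSketch(Closeable journal) { this.journal = journal; }

      void startStream(Closeable s) { this.stream = s; }

      /** Close only the active stream; the journal stays usable for new segments. */
      void closeStream() throws IOException {
        if (stream == null) return;
        stream.close();
        stream = null;
      }

      /** Close the stream and then the journal itself. */
      @Override
      public void close() throws IOException {
        closeStream();
        journal.close();
      }
    }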

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Wed Nov  2 05:34:31 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.servlet.ServletContext;
 
@@ -107,8 +109,9 @@ public class NameNodeHttpServer {
                 //add SPNEGO authentication filter for webhdfs
                 final String name = "SPNEGO";
                 final String classname =  AuthFilter.class.getName();
-                final String pathSpec = "/" + WebHdfsFileSystem.PATH_PREFIX + "/*";
-                defineFilter(webAppContext, name, classname, null,
+                final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
+                Map<String, String> params = getAuthFilterParams(conf);
+                defineFilter(webAppContext, name, classname, params,
                     new String[]{pathSpec});
                 LOG.info("Added filter '" + name + "' (class=" + classname + ")");
 
@@ -118,6 +121,28 @@ public class NameNodeHttpServer {
                     + ";" + Param.class.getPackage().getName(), pathSpec);
               }
             }
+
+            private Map<String, String> getAuthFilterParams(Configuration conf)
+                throws IOException {
+              Map<String, String> params = new HashMap<String, String>();
+              String principalInConf = conf
+                  .get(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY);
+              if (principalInConf != null && !principalInConf.isEmpty()) {
+                params.put(
+                    DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+                    SecurityUtil.getServerPrincipal(principalInConf, infoHost));
+              }
+              String httpKeytab = conf
+                  .get(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
+              if (httpKeytab != null && !httpKeytab.isEmpty()) {
+                params.put(
+                    DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
+                    httpKeytab);
+              }
+              return params;
+            }
           };
 
           boolean certSSL = conf.getBoolean("dfs.https.enable", false);
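
For reference, a minimal sketch of the configuration the new getAuthFilterParams() reads; the _HOST placeholder in the principal is resolved against the HTTP server's host by SecurityUtil.getServerPrincipal(). The literal values below are examples only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class WebHdfsSpnegoConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Example values; _HOST is substituted with the NameNode's info host.
        conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
            "HTTP/_HOST@EXAMPLE.COM");
        conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
            "/etc/security/keytab/spnego.service.keytab");
        // With both keys set, the SPNEGO AuthFilter is configured for the webhdfs path spec.
      }
    }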

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java Wed Nov  2 05:34:31 2011
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.Util;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * 
  * NameNodeResourceChecker provides a method -
@@ -91,15 +93,16 @@ public class NameNodeResourceChecker {
   }
 
   /**
-   * Return true if disk space is available on all all the configured volumes.
+   * Return true if disk space is available on at least one of the configured
+   * volumes.
    * 
-   * @return True if the configured amount of disk space is available on all
-   *         volumes, false otherwise.
+   * @return True if the configured amount of disk space is available on at
+   *         least one volume, false otherwise.
    * @throws IOException
    */
   boolean hasAvailableDiskSpace()
       throws IOException {
-    return getVolumesLowOnSpace().size() == 0;
+    return getVolumesLowOnSpace().size() < volumes.size();
   }
 
   /**
@@ -127,4 +130,9 @@ public class NameNodeResourceChecker {
     }
     return lowVolumes;
   }
+  
+  @VisibleForTesting
+  void setVolumes(Map<String, DF> volumes) {
+    this.volumes = volumes;
+  }
 }
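
The behavioral change above is easiest to see with a small worked example: with two configured volumes and one of them below the reserved threshold, the old check (no volume may be low) fails, while the new check (at least one volume must have space) passes. A hedged sketch mirroring the before/after expressions:

    public class DiskSpaceCheckSketch {
      // old: disk space must be available on every configured volume
      static boolean oldCheck(int lowVolumes, int totalVolumes) {
        return lowVolumes == 0;
      }
      // new: disk space must be available on at least one configured volume
      static boolean newCheck(int lowVolumes, int totalVolumes) {
        return lowVolumes < totalVolumes;
      }

      public static void main(String[] args) {
        System.out.println(oldCheck(1, 2));  // false -> NameNode would enter safe mode
        System.out.println(newCheck(1, 2));  // true  -> NameNode keeps running
      }
    }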

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Nov  2 05:34:31 2011
@@ -60,6 +60,8 @@ import org.apache.hadoop.hdfs.protocol.U
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@@ -72,6 +74,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -119,8 +122,8 @@ class NameNodeRpcServer implements Namen
   private final InetSocketAddress serviceRPCAddress;
   
   /** The RPC server that listens to requests from clients */
-  protected final RPC.Server server;
-  protected final InetSocketAddress rpcAddress;
+  protected final RPC.Server clientRpcServer;
+  protected final InetSocketAddress clientRpcAddress;
 
   public NameNodeRpcServer(Configuration conf, NameNode nn)
       throws IOException {
@@ -132,15 +135,31 @@ class NameNodeRpcServer implements Namen
       conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
                   DFS_DATANODE_HANDLER_COUNT_DEFAULT);
     InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
-
+    ClientNamenodeProtocolServerSideTranslatorR23 clientProtocolServerTranslator =
+        new ClientNamenodeProtocolServerSideTranslatorR23(this);
+
     InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
     if (dnSocketAddr != null) {
       int serviceHandlerCount =
         conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
-      this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
-          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
+      // Add all the RPC protocols that the namenode implements
+      this.serviceRpcServer = RPC.getServer(
+          ClientNamenodeWireProtocol.class, clientProtocolServerTranslator,
+          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
           false, conf, namesystem.getDelegationTokenSecretManager());
+      this.serviceRpcServer.addProtocol(DatanodeProtocol.class, this);
+      this.serviceRpcServer.addProtocol(NamenodeProtocol.class, this);
+      this.serviceRpcServer.addProtocol(
+          RefreshAuthorizationPolicyProtocol.class, this);
+      this.serviceRpcServer.addProtocol(
+          RefreshUserMappingsProtocol.class, this);
+      this.serviceRpcServer.addProtocol(GetUserMappingsProtocol.class, this);
+      this.serviceRpcServer.addProtocol(HAServiceProtocol.class, this);
+      
       this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
       nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
     } else {
@@ -148,38 +167,40 @@ class NameNodeRpcServer implements Namen
       serviceRPCAddress = null;
     }
     // Add all the RPC protocols that the namenode implements
-    this.server = RPC.getServer(ClientProtocol.class, this,
-                                socAddr.getHostName(), socAddr.getPort(),
-                                handlerCount, false, conf, 
-                                namesystem.getDelegationTokenSecretManager());
-    this.server.addProtocol(DatanodeProtocol.class, this);
-    this.server.addProtocol(NamenodeProtocol.class, this);
-    this.server.addProtocol(RefreshAuthorizationPolicyProtocol.class, this);
-    this.server.addProtocol(RefreshUserMappingsProtocol.class, this);
-    this.server.addProtocol(GetUserMappingsProtocol.class, this);
-    this.server.addProtocol(HAServiceProtocol.class, this);
+    this.clientRpcServer = RPC.getServer(
+        ClientNamenodeWireProtocol.class, clientProtocolServerTranslator,
+        socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf,
+        namesystem.getDelegationTokenSecretManager());
+    this.clientRpcServer.addProtocol(DatanodeProtocol.class, this);
+    this.clientRpcServer.addProtocol(NamenodeProtocol.class, this);
+    this.clientRpcServer.addProtocol(
+        RefreshAuthorizationPolicyProtocol.class, this);
+    this.clientRpcServer.addProtocol(RefreshUserMappingsProtocol.class, this);
+    this.clientRpcServer.addProtocol(GetUserMappingsProtocol.class, this);
     
 
     // set service-level authorization security policy
     if (serviceAuthEnabled =
           conf.getBoolean(
             CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
-      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
       if (this.serviceRpcServer != null) {
         this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
       }
     }
 
     // The rpc-server port can be ephemeral... ensure we have the correct info
-    this.rpcAddress = this.server.getListenerAddress(); 
-    nn.setRpcServerAddress(conf, rpcAddress);
+    this.clientRpcAddress = this.clientRpcServer.getListenerAddress(); 
+    nn.setRpcServerAddress(conf, clientRpcAddress);
   }
   
   /**
    * Actually start serving requests.
    */
   void start() {
-    server.start();  //start RPC server
+    clientRpcServer.start();  //start RPC server
     if (serviceRpcServer != null) {
       serviceRpcServer.start();      
     }
@@ -189,11 +210,11 @@ class NameNodeRpcServer implements Namen
    * Wait until the RPC server has shut down.
    */
   void join() throws InterruptedException {
-    this.server.join();
+    this.clientRpcServer.join();
   }
   
   void stop() {
-    if(server != null) server.stop();
+    if(clientRpcServer != null) clientRpcServer.stop();
     if(serviceRpcServer != null) serviceRpcServer.stop();
   }
   
@@ -202,7 +223,7 @@ class NameNodeRpcServer implements Namen
   }
 
   InetSocketAddress getRpcAddress() {
-    return rpcAddress;
+    return clientRpcAddress;
   }
   
   @Override // VersionedProtocol
@@ -216,7 +237,8 @@ class NameNodeRpcServer implements Namen
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
     if (protocol.equals(ClientProtocol.class.getName())) {
-      return ClientProtocol.versionID; 
+      throw new IOException("Old Namenode Client protocol is not supported:" + 
+      protocol + "Switch your clientside to " + ClientNamenodeWireProtocol.class); 
     } else if (protocol.equals(DatanodeProtocol.class.getName())){
       return DatanodeProtocol.versionID;
     } else if (protocol.equals(NamenodeProtocol.class.getName())){
@@ -850,7 +872,7 @@ class NameNodeRpcServer implements Namen
 
     namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
     if (nn.getFSImage().isUpgradeFinalized())
-      return new DatanodeCommand.Finalize(poolId);
+      return new FinalizeCommand(poolId);
     return null;
   }
 
@@ -923,7 +945,7 @@ class NameNodeRpcServer implements Namen
       throw new AuthorizationException("Service Level Authorization not enabled!");
     }
 
-    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    this.clientRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
     if (this.serviceRpcServer != null) {
       this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
     }
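
A hedged, self-contained sketch of the server-side translator pattern this hunk adopts (all names below are hypothetical): the RPC server is registered against the stable wire protocol, and a translator object adapts incoming wire calls onto the NameNode's internal protocol implementation, while the remaining protocols are still registered directly via addProtocol().

    import java.io.IOException;

    interface WireProtocol {        // stands in for ClientNamenodeWireProtocol
      boolean mkdirs(String src) throws IOException;
    }

    interface InternalProtocol {    // stands in for ClientProtocol
      boolean mkdirs(String src) throws IOException;
    }

    /** Adapts wire-level calls onto the internal implementation. */
    class ServerSideTranslatorSketch implements WireProtocol {
      private final InternalProtocol impl;

      ServerSideTranslatorSketch(InternalProtocol impl) {
        this.impl = impl;
      }

      @Override
      public boolean mkdirs(String src) throws IOException {
        // translate wire-format arguments to internal types here, then delegate
        return impl.mkdirs(src);
      }
    }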

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java Wed Nov  2 05:34:31 2011
@@ -62,7 +62,9 @@ public class ConfiguredFailoverProxyProv
     AddressRpcProxyPair current = proxies.get(currentProxyIndex);
     if (current.namenode == null) {
       try {
-        current.namenode = DFSUtil.createRPCNamenode(current.address, conf, ugi);
+        // TODO(HA): This will create a NN proxy with an underlying retry
+        // proxy. We don't want this.
+        current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
       } catch (IOException e) {
         LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);


