hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [01/35] hadoop git commit: HDFS-9094. Add command line option to ask NameNode reload configuration. (Contributed by Xiaobing Zhou)
Date Fri, 29 Jan 2016 16:29:04 GMT
Repository: hadoop
Updated Branches:
  refs/heads/YARN-1011 6eacdea0e -> c9a09d692


HDFS-9094. Add command line option to ask NameNode reload configuration. (Contributed by Xiaobing Zhou)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d62b4a4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d62b4a4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d62b4a4d

Branch: refs/heads/YARN-1011
Commit: d62b4a4de75edb840df6634f49cb4beb74e3fb07
Parents: 6eacdea
Author: Arpit Agarwal <arp@apache.org>
Authored: Mon Jan 25 12:17:05 2016 -0800
Committer: Arpit Agarwal <arp@apache.org>
Committed: Mon Jan 25 12:17:05 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |   8 +
 .../hdfs/protocol/ReconfigurationProtocol.java  |   4 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../ReconfigurationProtocolServerSideUtils.java |   4 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  35 +++
 .../hdfs/server/protocol/NamenodeProtocols.java |   2 +
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  | 254 +++++++++++++------
 .../apache/hadoop/hdfs/tools/TestDFSAdmin.java  | 162 ++++++++----
 8 files changed, 350 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 62c5d81..8f6ed14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -38,9 +38,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
@@ -496,6 +498,12 @@ public class DFSUtilClient {
     return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
   }
 
+  public static ReconfigurationProtocol createReconfigurationProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return new ReconfigurationProtocolTranslatorPB(addr, ticket, conf, factory);
+  }
+
   /**
    * Creates a new KeyProvider from the given Configuration.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
index 75dc877..8370438 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.io.retry.Idempotent;
 
 /**********************************************************************
  * ReconfigurationProtocol is used by HDFS admin to reload configuration
@@ -39,16 +40,19 @@ public interface ReconfigurationProtocol {
   /**
    * Asynchronously reload configuration on disk and apply changes.
    */
+  @Idempotent
   void startReconfiguration() throws IOException;
 
   /**
    * Get the status of the previously issued reconfig task.
    * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
    */
+  @Idempotent
   ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
 
   /**
    * Get a list of allowed properties for reconfiguration.
    */
+  @Idempotent
   List<String> listReconfigurableProperties() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 49b4d8a..e5285b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -953,6 +953,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9674. The HTrace span for OpWriteBlock should record the maxWriteToDisk
     time. (cmccabe via zhz)
 
+    HDFS-9094. Add command line option to ask NameNode reload
+    configuration. (Xiaobing Zhou via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
index b2be9cd..9e24204 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
@@ -41,9 +41,7 @@ public final class ReconfigurationProtocolServerSideUtils {
           List<String> reconfigurableProperties) {
     ListReconfigurablePropertiesResponseProto.Builder builder =
         ListReconfigurablePropertiesResponseProto.newBuilder();
-    for (String name : reconfigurableProperties) {
-      builder.addName(name);
-    }
+    builder.addAllName(reconfigurableProperties);
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7785260..c1646c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Lists;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CacheFlag;
@@ -111,12 +113,15 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -286,6 +291,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
     BlockingService haPbService = HAServiceProtocolService
         .newReflectiveBlockingService(haServiceProtocolXlator);
 
+    ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
+        = new ReconfigurationProtocolServerSideTranslatorPB(this);
+    BlockingService reconfigurationPbService = ReconfigurationProtocolService
+        .newReflectiveBlockingService(reconfigurationProtocolXlator);
+
     TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
         new TraceAdminProtocolServerSideTranslatorPB(this);
     BlockingService traceAdminService = TraceAdminService
@@ -319,6 +329,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // Add all the RPC protocols that the namenode implements
       DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
           serviceRpcServer);
+      DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+          reconfigurationPbService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
           serviceRpcServer);
       DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -403,6 +415,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
     // Add all the RPC protocols that the namenode implements
     DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
         clientRpcServer);
+    DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+        reconfigurationPbService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
         clientRpcServer);
     DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -2173,4 +2187,25 @@ class NameNodeRpcServer implements NamenodeProtocols {
     checkNNStartup();
     return namesystem.getErasureCodingPolicy(src);
   }
+
+  @Override // ReconfigurationProtocol
+  public void startReconfiguration() {
+    throw new UnsupportedOperationException(
+        "Namenode startReconfiguration is not implemented.",
+        new ReconfigurationException());
+  }
+
+  @Override // ReconfigurationProtocol
+  public ReconfigurationTaskStatus getReconfigurationStatus() {
+    throw new UnsupportedOperationException(
+        " Namenode getReconfigurationStatus is not implemented.",
+        new ReconfigurationException());
+  }
+
+  @Override // ReconfigurationProtocol
+  public List<String> listReconfigurableProperties() {
+    throw new UnsupportedOperationException(
+        " Namenode listReconfigurableProperties is not implemented.",
+        new ReconfigurationException());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
index 23b6f2e..4a3d83d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
           DatanodeProtocol,
           NamenodeProtocol,
           RefreshAuthorizationPolicyProtocol,
+          ReconfigurationProtocol,
           RefreshUserMappingsProtocol,
           RefreshCallQueueProtocol,
           GenericRefreshProtocol,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 5da3bc5..9c782e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -37,6 +37,7 @@ import java.util.TreeSet;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -414,7 +416,8 @@ public class DFSAdmin extends FsShell {
     "\t[-refreshSuperUserGroupsConfiguration]\n" +
     "\t[-refreshCallQueue]\n" +
     "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
-    "\t[-reconfig <datanode|...> <host:ipc_port> <start|status|properties>]\n" +
+    "\t[-reconfig <namenode|datanode> <host:ipc_port> " +
+      "<start|status|properties>]\n" +
     "\t[-printTopology]\n" +
     "\t[-refreshNamenodes datanode_host:ipc_port]\n"+
     "\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
@@ -1028,12 +1031,12 @@ public class DFSAdmin extends FsShell {
 
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
-    String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status|properties>:\n" +
+    String reconfig = "-reconfig <namenode|datanode> <host:ipc_port> " +
+        "<start|status|properties>:\n" +
         "\tStarts or gets the status of a reconfiguration operation, \n" +
         "\tor gets a list of reconfigurable properties.\n" +
-        "\tThe second parameter specifies the node type.\n" +
-        "\tCurrently, only reloading DataNode's configuration is supported.\n";
 
+        "\tThe second parameter specifies the node type\n";
     String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
       "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
       "\ton <hostname:port>. All other args after are sent to the host.\n";
@@ -1494,104 +1497,186 @@ public class DFSAdmin extends FsShell {
     String nodeType = argv[i];
     String address = argv[i + 1];
     String op = argv[i + 2];
+
     if ("start".equals(op)) {
-      return startReconfiguration(nodeType, address);
+      return startReconfiguration(nodeType, address, System.out, System.err);
     } else if ("status".equals(op)) {
       return getReconfigurationStatus(nodeType, address, System.out, System.err);
     } else if ("properties".equals(op)) {
-      return getReconfigurableProperties(
-          nodeType, address, System.out, System.err);
+      return getReconfigurableProperties(nodeType, address, System.out,
+          System.err);
     }
     System.err.println("Unknown operation: " + op);
     return -1;
   }
 
-  int startReconfiguration(String nodeType, String address) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      dnProxy.startReconfiguration();
-      System.out.println("Started reconfiguration task on DataNode " + address);
+  int startReconfiguration(final String nodeThpe, final String address)
+      throws IOException {
+    return startReconfiguration(nodeThpe, address, System.out, System.err);
+  }
+
+  int startReconfiguration(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    int ret = 0;
+
+    try {
+      ret = startReconfigurationDispatch(nodeType, address, out, err);
+      outMsg = String.format("Started reconfiguration task on node [%s].",
+          address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reconfiguring: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else {
+      out.println(outMsg);
+      return ret;
+    }
+  }
+
+  int startReconfigurationDispatch(final String nodeType,
+      final String address, final PrintStream out, final PrintStream err)
+      throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      reconfProxy.startReconfiguration();
+      return 0;
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      reconfProxy.startReconfiguration();
       return 0;
     } else {
-      System.err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
+      System.err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
       return 1;
     }
   }
 
-  int getReconfigurationStatus(String nodeType, String address,
-      PrintStream out, PrintStream err) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      try {
-        ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
-        out.print("Reconfiguring status for DataNode[" + address + "]: ");
-        if (!status.hasTask()) {
-          out.println("no task was found.");
-          return 0;
-        }
-        out.print("started at " + new Date(status.getStartTime()));
-        if (!status.stopped()) {
-          out.println(" and is still running.");
-          return 0;
-        }
+  int getReconfigurationStatus(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    ReconfigurationTaskStatus status = null;
 
-        out.println(" and finished at " +
-            new Date(status.getEndTime()).toString() + ".");
-        if (status.getStatus() == null) {
-          // Nothing to report.
-          return 0;
-        }
-        for (Map.Entry<PropertyChange, Optional<String>> result :
-            status.getStatus().entrySet()) {
-          if (!result.getValue().isPresent()) {
-            out.printf(
-                "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
-                result.getKey().prop, result.getKey().oldVal,
-                result.getKey().newVal);
-          } else {
-            final String errorMsg = result.getValue().get();
-            out.printf(
-                  "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
-                  result.getKey().prop, result.getKey().oldVal,
-                  result.getKey().newVal);
-            out.println("\tError: " + errorMsg + ".");
-          }
+    try {
+      status = getReconfigurationStatusDispatch(nodeType, address, out, err);
+      outMsg = String.format("Reconfiguring status for node [%s]: ", address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reloading configuration: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else {
+      out.print(outMsg);
+    }
+
+    if (status != null) {
+      if (!status.hasTask()) {
+        out.println("no task was found.");
+        return 0;
+      }
+      out.print("started at " + new Date(status.getStartTime()));
+      if (!status.stopped()) {
+        out.println(" and is still running.");
+        return 0;
+      }
+
+      out.println(" and finished at "
+          + new Date(status.getEndTime()).toString() + ".");
+      if (status.getStatus() == null) {
+        // Nothing to report.
+        return 0;
+      }
+      for (Map.Entry<PropertyChange, Optional<String>> result : status
+          .getStatus().entrySet()) {
+        if (!result.getValue().isPresent()) {
+          out.printf(
+              "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+        } else {
+          final String errorMsg = result.getValue().get();
+          out.printf(
+              "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+          out.println("\tError: " + errorMsg + ".");
         }
-      } catch (IOException e) {
-        err.println("DataNode reloading configuration: " + e + ".");
-        return 1;
       }
     } else {
-      err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
       return 1;
     }
+
     return 0;
   }
 
-  int getReconfigurableProperties(String nodeType, String address,
-      PrintStream out, PrintStream err) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      try {
-        List<String> properties =
-            dnProxy.listReconfigurableProperties();
-        out.println(
-            "Configuration properties that are allowed to be reconfigured:");
-        for (String name : properties) {
-          out.println(name);
-        }
-      } catch (IOException e) {
-        err.println("DataNode reconfiguration: " + e + ".");
-        return 1;
-      }
+  ReconfigurationTaskStatus getReconfigurationStatusDispatch(
+      final String nodeType, final String address, final PrintStream out,
+      final PrintStream err) throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      return reconfProxy.getReconfigurationStatus();
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      return reconfProxy.getReconfigurationStatus();
     } else {
-      err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
+      err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
+      return null;
+    }
+  }
+
+  int getReconfigurableProperties(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    List<String> properties = null;
+
+    try {
+      properties = getReconfigurablePropertiesDispatch(nodeType, address, out,
+          err);
+      outMsg = String.format("Node [%s] Reconfigurable properties:", address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reconfiguration: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
       return 1;
+    } else if (properties == null) {
+      return 1;
+    } else {
+      out.println(outMsg);
+      for (String name : properties) {
+        out.println(name);
+      }
+      return 0;
+    }
+  }
+
+  List<String> getReconfigurablePropertiesDispatch(final String nodeType,
+      final String address, final PrintStream out, final PrintStream err)
+      throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      return reconfProxy.listReconfigurableProperties();
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      return reconfProxy.listReconfigurableProperties();
+    } else {
+      err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
+      return null;
     }
-    return 0;
   }
 
   public int genericRefresh(String[] argv, int i) throws IOException {
@@ -1712,7 +1797,7 @@ public class DFSAdmin extends FsShell {
                          + " [-refreshCallQueue]");
     } else if ("-reconfig".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
-                         + " [-reconfig <datanode|...> <host:port> <start|status>]");
+          + " [-reconfig <namenode|datanode> <host:port> <start|status>]");
     } else if ("-refresh".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
                          + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
@@ -2028,6 +2113,23 @@ public class DFSAdmin extends FsShell {
             NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
     return dnProtocol;
   }
+
+  private ReconfigurationProtocol getNameNodeProxy(String node)
+      throws IOException {
+    InetSocketAddress nodeAddr = NetUtils.createSocketAddr(node);
+    // Get the current configuration
+    Configuration conf = getConf();
+
+    // For namenode proxy the server principal should be NN's one.
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+        conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
+
+    // Create the client
+    ReconfigurationProtocol reconfigProtocol = DFSUtilClient
+        .createReconfigurationProtocolProxy(nodeAddr, getUGI(), conf,
+            NetUtils.getSocketFactory(conf, ReconfigurationProtocol.class));
+    return reconfigProtocol;
+  }
   
   private int deleteBlockPool(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 3a30ccf..a3ed4f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.tools;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 import com.google.common.collect.Lists;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -27,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,10 +56,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestDFSAdmin {
+  private static final Log LOG = LogFactory.getLog(DFSAdmin.class);
   private Configuration conf = null;
   private MiniDFSCluster cluster;
   private DFSAdmin admin;
   private DataNode datanode;
+  private NameNode namenode;
 
   @Before
   public void setUp() throws Exception {
@@ -80,21 +86,64 @@ public class TestDFSAdmin {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     datanode = cluster.getDataNodes().get(0);
+    namenode = cluster.getNameNode();
+  }
+
+  private void startReconfiguration(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("startReconfiguration", nodeType,
+        address, outs, errs);
+  }
+
+  private void getReconfigurableProperties(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
+        address, outs, errs);
+  }
+
+  private void getReconfigurationStatus(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType,
+        address, outs, errs);
   }
 
-  private List<String> getReconfigureStatus(String nodeType, String address)
-      throws IOException {
+  private void reconfigurationOutErrFormatter(String methodName,
+      String nodeType, String address, final List<String> outs,
+      final List<String> errs) throws IOException {
     ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bufOut);
     ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
     PrintStream err = new PrintStream(bufErr);
-    admin.getReconfigurationStatus(nodeType, address, out, err);
+
+    if (methodName.equals("getReconfigurableProperties")) {
+      admin.getReconfigurableProperties(nodeType, address, out, err);
+    } else if (methodName.equals("getReconfigurationStatus")) {
+      admin.getReconfigurationStatus(nodeType, address, out, err);
+    } else if (methodName.equals("startReconfiguration")) {
+      admin.startReconfiguration(nodeType, address, out, err);
+    }
+
     Scanner scanner = new Scanner(bufOut.toString());
-    List<String> outputs = Lists.newArrayList();
     while (scanner.hasNextLine()) {
-      outputs.add(scanner.nextLine());
+      outs.add(scanner.nextLine());
+    }
+    scanner.close();
+    scanner = new Scanner(bufErr.toString());
+    while (scanner.hasNextLine()) {
+      errs.add(scanner.nextLine());
     }
-    return outputs;
+    scanner.close();
+  }
+
+  @Test(timeout = 30000)
+  public void testDataNodeGetReconfigurableProperties() throws IOException {
+    final int port = datanode.getIpcPort();
+    final String address = "localhost:" + port;
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurableProperties("datanode", address, outs, errs);
+    assertEquals(3, outs.size());
+    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }
 
   /**
@@ -103,7 +152,7 @@ public class TestDFSAdmin {
    * @throws IOException
    * @throws InterruptedException
    */
-  private void testGetReconfigurationStatus(boolean expectedSuccuss)
+  private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
       throws IOException, InterruptedException {
     ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
     datanode.setReconfigurationUtil(ru);
@@ -130,21 +179,25 @@ public class TestDFSAdmin {
 
     assertThat(admin.startReconfiguration("datanode", address), is(0));
 
-    List<String> outputs = null;
     int count = 100;
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
     while (count > 0) {
-      outputs = getReconfigureStatus("datanode", address);
-      if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
+      outs.clear();
+      errs.clear();
+      getReconfigurationStatus("datanode", address, outs, errs);
+      if (!outs.isEmpty() && outs.get(0).contains("finished")) {
         break;
       }
       count--;
       Thread.sleep(100);
     }
+    LOG.info(String.format("count=%d", count));
     assertTrue(count > 0);
     if (expectedSuccuss) {
-      assertThat(outputs.size(), is(4));
+      assertThat(outs.size(), is(4));
     } else {
-      assertThat(outputs.size(), is(6));
+      assertThat(outs.size(), is(6));
     }
 
     List<StorageLocation> locations = DataNode.getStorageLocations(
@@ -160,55 +213,78 @@ public class TestDFSAdmin {
 
     int offset = 1;
     if (expectedSuccuss) {
-      assertThat(outputs.get(offset),
+      assertThat(outs.get(offset),
           containsString("SUCCESS: Changed property " +
               DFS_DATANODE_DATA_DIR_KEY));
     } else {
-      assertThat(outputs.get(offset),
+      assertThat(outs.get(offset),
           containsString("FAILED: Change property " +
               DFS_DATANODE_DATA_DIR_KEY));
     }
-    assertThat(outputs.get(offset + 1),
+    assertThat(outs.get(offset + 1),
         is(allOf(containsString("From:"), containsString("data1"),
             containsString("data2"))));
-    assertThat(outputs.get(offset + 2),
+    assertThat(outs.get(offset + 2),
         is(not(anyOf(containsString("data1"), containsString("data2")))));
-    assertThat(outputs.get(offset + 2),
+    assertThat(outs.get(offset + 2),
         is(allOf(containsString("To"), containsString("data_new"))));
   }
 
   @Test(timeout = 30000)
-  public void testGetReconfigurationStatus()
-      throws IOException, InterruptedException {
-    testGetReconfigurationStatus(true);
+  public void testDataNodeGetReconfigurationStatus() throws IOException,
+      InterruptedException {
+    testDataNodeGetReconfigurationStatus(true);
     restartCluster();
-    testGetReconfigurationStatus(false);
+    testDataNodeGetReconfigurationStatus(false);
   }
 
-  private List<String> getReconfigurationAllowedProperties(
-      String nodeType, String address)
-      throws IOException {
-    ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
-    PrintStream out = new PrintStream(bufOut);
-    ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
-    PrintStream err = new PrintStream(bufErr);
-    admin.getReconfigurableProperties(nodeType, address, out, err);
-    Scanner scanner = new Scanner(bufOut.toString());
-    List<String> outputs = Lists.newArrayList();
-    while (scanner.hasNextLine()) {
-      outputs.add(scanner.nextLine());
-    }
-    return outputs;
+  @Test(timeout = 30000)
+  public void testNameNodeStartReconfiguration() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    startReconfiguration("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"), containsString("reconfiguring:"),
+            containsString("startReconfiguration"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
   }
 
   @Test(timeout = 30000)
-  public void testGetReconfigAllowedProperties() throws IOException {
-    final int port = datanode.getIpcPort();
-    final String address = "localhost:" + port;
-    List<String> outputs =
-        getReconfigurationAllowedProperties("datanode", address);
-    assertEquals(3, outputs.size());
-    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
-        outputs.get(1));
+  public void testNameNodeGetReconfigurableProperties() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurableProperties("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"),
+            containsString("reconfiguration:"),
+            containsString("listReconfigurableProperties"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
+  }
+
+  @Test(timeout = 30000)
+  public void testNameNodeGetReconfigurationStatus() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurationStatus("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"),
+            containsString("reloading configuration:"),
+            containsString("getReconfigurationStatus"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
   }
 }
\ No newline at end of file


Mime
View raw message