hadoop-common-commits mailing list archives

From: tomwh...@apache.org
Subject: svn commit: r888998 - in /hadoop/common/trunk: ./ src/contrib/cloud/ src/contrib/cloud/src/integration-test/ src/contrib/cloud/src/py/ src/contrib/cloud/src/py/hadoop/cloud/ src/contrib/cloud/src/py/hadoop/cloud/providers/ src/contrib/cloud/src/test/ha...
Date: Wed, 09 Dec 2009 21:57:11 GMT
Author: tomwhite
Date: Wed Dec  9 21:57:10 2009
New Revision: 888998

URL: http://svn.apache.org/viewvc?rev=888998&view=rev
Log:
HADOOP-6392. Run namenode and jobtracker on separate EC2 instances.

Added:
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py
Removed:
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/commands.py
Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/contrib/cloud/README.txt
    hadoop/common/trunk/src/contrib/cloud/src/integration-test/ebs-storage-spec.json
    hadoop/common/trunk/src/contrib/cloud/src/integration-test/persistent-cluster.sh
    hadoop/common/trunk/src/contrib/cloud/src/integration-test/transient-cluster.sh
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cluster.py
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py
    hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py
    hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Wed Dec  9 21:57:10 2009
@@ -16,6 +16,9 @@
 
     HADOOP-6108. Add support for EBS storage on EC2. (tomwhite)
 
+    HADOOP-6392. Run namenode and jobtracker on separate EC2 instances.
+    (tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-6283. Improve the exception messages thrown by

Modified: hadoop/common/trunk/src/contrib/cloud/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/README.txt?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/README.txt (original)
+++ hadoop/common/trunk/src/contrib/cloud/README.txt Wed Dec  9 21:57:10 2009
@@ -61,8 +61,18 @@
 
 % hadoop-ec2 launch-cluster my-hadoop-cluster 10
 
-This will boot the master node and 10 worker nodes. When the nodes have started
-and the Hadoop cluster has come up, the console will display a message like
+This will boot the master node and 10 worker nodes. The master node runs the
+namenode, secondary namenode, and jobtracker, and each worker node runs a
+datanode and a tasktracker. Equivalently the cluster could be launched as:
+
+% hadoop-ec2 launch-cluster my-hadoop-cluster 1 nn,snn,jt 10 dn,tt
+
+Note that using this notation you can launch a split namenode/jobtracker cluster:
+
+% hadoop-ec2 launch-cluster my-hadoop-cluster 1 nn,snn 1 jt 10 dn,tt
+
+When the nodes have started and the Hadoop cluster has come up, the console will
+display a message like
 
   Browse the cluster at http://ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com/
 
@@ -70,7 +80,8 @@
 opened for access from your client machine. You may change the firewall settings
 (to allow access from a network, rather than just a single machine, for example)
 by using the Amazon EC2 command line tools, or by using a tool like Elastic Fox.
-The security group to change is the one named <cluster-name>-master.
+There is a security group for each node's role. The one for the namenode
+is <cluster-name>-nn, for example.
 
 For security reasons, traffic from the network your client is running on is
 proxied through the master node of the cluster using an SSH tunnel (a SOCKS
@@ -109,13 +120,13 @@
 
 % hadoop-ec2 create-formatted-snapshot my-ebs-cluster 100
 
-We create storage for a single master and for two slaves. The volumes to create
-are described in a JSON spec file, which references the snapshot we just
+We create storage for a single namenode and for two datanodes. The volumes to
+create are described in a JSON spec file, which references the snapshot we just
 created. Here is the contents of a JSON file, called
 my-ebs-cluster-storage-spec.json:
 
 {
-  "master": [
+  "nn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",
@@ -129,7 +140,7 @@
       "snapshot_id": "snap-268e704f"
     }
   ],
-  "slave": [
+  "dn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",
@@ -146,7 +157,7 @@
 }
 
 
-Each role (here "master" and "slave") is the key to an array of volume
+Each role (here "nn" and "dn") is the key to an array of volume
 specifications. In this example, the "slave" role has two devices ("/dev/sdj"
 and "/dev/sdk") with different mount points, sizes, and generated from an EBS
 snapshot. The snapshot is the formatted snapshot created earlier, so that the
@@ -155,9 +166,9 @@
 
 Let's create actual volumes using this file.
 
-% hadoop-ec2 create-storage my-ebs-cluster master 1 \
+% hadoop-ec2 create-storage my-ebs-cluster nn 1 \
     my-ebs-cluster-storage-spec.json
-% hadoop-ec2 create-storage my-ebs-cluster slave 2 \
+% hadoop-ec2 create-storage my-ebs-cluster dn 2 \
     my-ebs-cluster-storage-spec.json
 
 Now let's start the cluster with 2 slave nodes:
@@ -214,7 +225,7 @@
 
 Of course, these examples assume that you have installed Hadoop on your local
 machine. It is also possible to launch jobs from within the cluster. First log
-into the master node:
+into the namenode:
 
 % hadoop-ec2 login my-hadoop-cluster
 

Modified: hadoop/common/trunk/src/contrib/cloud/src/integration-test/ebs-storage-spec.json
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/integration-test/ebs-storage-spec.json?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/integration-test/ebs-storage-spec.json (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/integration-test/ebs-storage-spec.json Wed Dec  9 21:57:10 2009
@@ -1,5 +1,5 @@
 {
-  "master": [
+  "nn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",
@@ -13,7 +13,7 @@
       "snapshot_id": "snap-fe44bb97"
     }
   ],
-  "slave": [
+  "dn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",

Modified: hadoop/common/trunk/src/contrib/cloud/src/integration-test/persistent-cluster.sh
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/integration-test/persistent-cluster.sh?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/integration-test/persistent-cluster.sh (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/integration-test/persistent-cluster.sh Wed Dec  9 21:57:10 2009
@@ -71,10 +71,10 @@
 
 # Create storage
 $HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \
-  --availability-zone=$AVAILABILITY_ZONE $CLUSTER master 1 \
+  --availability-zone=$AVAILABILITY_ZONE $CLUSTER nn 1 \
   $bin/ebs-storage-spec.json
 $HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \
-  --availability-zone=$AVAILABILITY_ZONE $CLUSTER slave 1 \
+  --availability-zone=$AVAILABILITY_ZONE $CLUSTER dn 1 \
   $bin/ebs-storage-spec.json
 
 # Launch a cluster

Modified: hadoop/common/trunk/src/contrib/cloud/src/integration-test/transient-cluster.sh
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/integration-test/transient-cluster.sh?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/integration-test/transient-cluster.sh (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/integration-test/transient-cluster.sh Wed Dec  9 21:57:10 2009
@@ -41,6 +41,7 @@
 HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2}
 SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \
   -o StrictHostKeyChecking=no"}
+LAUNCH_ARGS=${LAUNCH_ARGS:-1} # Try LAUNCH_ARGS="1 nn,snn 1 jt 1 dn,tt"
 
 HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER
 export HADOOP_CONF_DIR=$CONFIG_DIR/$CLUSTER
@@ -56,7 +57,12 @@
 # Launch a cluster
 $HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \
   --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \
-  --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1
+  --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER \
+  $LAUNCH_ARGS
+  
+# List clusters
+$HADOOP_CLOUD_SCRIPT list --config-dir=$CONFIG_DIR
+$HADOOP_CLOUD_SCRIPT list --config-dir=$CONFIG_DIR $CLUSTER
 
 # Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID
 eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \

Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh Wed Dec  9 21:57:10 2009
@@ -30,16 +30,20 @@
 # Substitute environment variables passed by the client
 export %ENV%
 
-if [ -z "$MASTER_HOST" ]; then
-  IS_MASTER=true
-  MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname`
-else
-  IS_MASTER=false
-fi
-
 HADOOP_VERSION=${HADOOP_VERSION:-0.20.1}
 HADOOP_HOME=/usr/local/hadoop-$HADOOP_VERSION
 HADOOP_CONF_DIR=$HADOOP_HOME/conf
+SELF_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname`
+for role in $(echo "$ROLES" | tr "," "\n"); do
+  case $role in
+  nn)
+    NN_HOST=$SELF_HOST
+    ;;
+  jt)
+    JT_HOST=$SELF_HOST
+    ;;
+  esac
+done
 
 function register_auto_shutdown() {
   if [ ! -z "$AUTO_SHUTDOWN" ]; then
@@ -237,6 +241,9 @@
   # Create tmp directory
   mkdir /mnt/tmp
   chmod a+rwxt /mnt/tmp
+  
+  mkdir /etc/hadoop
+  ln -s $HADOOP_CONF_DIR /etc/hadoop/conf
 
   ##############################################################################
   # Modify this section to customize your Hadoop cluster.
@@ -301,7 +308,7 @@
 </property>
 <property>
   <name>fs.default.name</name>
-  <value>hdfs://$MASTER_HOST:8020/</value>
+  <value>hdfs://$NN_HOST:8020/</value>
 </property>
 <property>
   <name>fs.trash.interval</name>
@@ -328,7 +335,7 @@
 </property>
 <property>
   <name>mapred.job.tracker</name>
-  <value>$MASTER_HOST:8021</value>
+  <value>$JT_HOST:8021</value>
 </property>
 <property>
   <name>mapred.job.tracker.handler.count</name>
@@ -473,8 +480,8 @@
 you may wish to use
 <a href="https://addons.mozilla.org/en-US/firefox/addon/2464">FoxyProxy</a>.
 <ul>
-<li><a href="http://$MASTER_HOST:50070/">NameNode</a>
-<li><a href="http://$MASTER_HOST:50030/">JobTracker</a>
+<li><a href="http://$NN_HOST:50070/">NameNode</a>
+<li><a href="http://$JT_HOST:50030/">JobTracker</a>
 </ul>
 </body>
 </html>
@@ -484,7 +491,7 @@
 
 }
 
-function start_hadoop_master() {
+function start_namenode() {
   if which dpkg &> /dev/null; then
     AS_HADOOP="su -s /bin/bash - hadoop -c"
   elif which rpm &> /dev/null; then
@@ -495,8 +502,6 @@
   [ ! -e $FIRST_MOUNT/hadoop/hdfs ] && $AS_HADOOP "$HADOOP_HOME/bin/hadoop namenode -format"
 
   $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start namenode"
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start secondarynamenode"
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start jobtracker"
 
   $AS_HADOOP "$HADOOP_HOME/bin/hadoop dfsadmin -safemode wait"
   $AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -mkdir /user"
@@ -506,15 +511,13 @@
 
 }
 
-function start_hadoop_slave() {
+function start_daemon() {
   if which dpkg &> /dev/null; then
     AS_HADOOP="su -s /bin/bash - hadoop -c"
   elif which rpm &> /dev/null; then
     AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c"
   fi
-
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start datanode"
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker"
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start $1"
 }
 
 register_auto_shutdown
@@ -522,9 +525,24 @@
 install_hadoop
 configure_hadoop
 
-if $IS_MASTER ; then
-  setup_web
-  start_hadoop_master
-else
-  start_hadoop_slave
-fi
+for role in $(echo "$ROLES" | tr "," "\n"); do
+  case $role in
+  nn)
+    setup_web
+    start_namenode
+    ;;
+  snn)
+    start_daemon secondarynamenode
+    ;;
+  jt)
+    start_daemon jobtracker
+    ;;
+  dn)
+    start_daemon datanode
+    ;;
+  tt)
+    start_daemon tasktracker
+    ;;
+  esac
+done
+
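
For orientation, the role-to-daemon dispatch that the case statements above
implement boils down to the following mapping (an illustrative Python sketch,
not part of the patch; the role codes are the ones used throughout this change):

# Mirrors the shell case statements above.
ROLE_DAEMONS = {
  "nn":  ["namenode"],            # the nn instance also gets setup_web
  "snn": ["secondarynamenode"],
  "jt":  ["jobtracker"],
  "dn":  ["datanode"],
  "tt":  ["tasktracker"],
}

def daemons_for(roles):
  """roles is the comma-separated ROLES value passed in the instance user
  data, e.g. "nn,snn" or "dn,tt"."""
  daemons = []
  for role in roles.split(","):
    daemons.extend(ROLE_DAEMONS.get(role, []))
  return daemons

print daemons_for("nn,snn,jt")  # ['namenode', 'secondarynamenode', 'jobtracker']
print daemons_for("dn,tt")      # ['datanode', 'tasktracker']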

Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py Wed Dec  9 21:57:10 2009
@@ -16,17 +16,20 @@
 from __future__ import with_statement
 
 import ConfigParser
-import hadoop.cloud.commands as commands
 from hadoop.cloud.cluster import get_cluster
-from hadoop.cloud.cluster import TimeoutException
-from hadoop.cloud.providers.ec2 import Ec2Storage
+from hadoop.cloud.service import InstanceTemplate
+from hadoop.cloud.service import HadoopService
+from hadoop.cloud.service import NAMENODE
+from hadoop.cloud.service import SECONDARY_NAMENODE
+from hadoop.cloud.service import JOBTRACKER
+from hadoop.cloud.service import DATANODE
+from hadoop.cloud.service import TASKTRACKER
 from hadoop.cloud.util import merge_config_with_options
 from hadoop.cloud.util import xstr
 import logging
 from optparse import OptionParser
 from optparse import make_option
 import os
-import subprocess
 import sys
 
 version_file = os.path.join(sys.path[0], "VERSION")
@@ -125,8 +128,8 @@
                                         or instances in CLUSTER
   launch-master CLUSTER               launch or find a master in CLUSTER
   launch-slaves CLUSTER NUM_SLAVES    launch NUM_SLAVES slaves in CLUSTER
-  launch-cluster CLUSTER NUM_SLAVES   launch a master and NUM_SLAVES slaves
-                                        in CLUSTER
+  launch-cluster CLUSTER (NUM_SLAVES| launch a master and NUM_SLAVES slaves or
+    N ROLE [N ROLE ...])                N instances in ROLE in CLUSTER
   create-formatted-snapshot CLUSTER   create an empty, formatted snapshot of
     SIZE                                size SIZE GiB
   list-storage CLUSTER                list storage volumes for CLUSTER
@@ -147,6 +150,9 @@
 Use %(script)s COMMAND --help to see additional options for specific commands.
 """ % locals()
 
+def print_deprecation(script, replacement):
+  print "Deprecated. Use '%(script)s %(replacement)s'." % locals()
+
 def parse_options_and_config(command, option_list=[], extra_arguments=(),
                              unbounded_args=False):
   """
@@ -176,8 +182,9 @@
   cluster_name = args[0]
   opt = merge_config_with_options(cluster_name, config, options_dict)
   logging.debug("Options: %s", str(opt))
-  return (opt, args, get_cluster(get_cloud_provider(opt))(cluster_name,
-                                                          config_dir))
+  cluster = get_cluster(get_cloud_provider(opt))(cluster_name, config_dir)
+  service = get_service(cluster)
+  return (opt, args, service)
 
 def parse_options(command, option_list=[], expected_arguments=(),
                   unbounded_args=False):
@@ -221,6 +228,9 @@
     provider = DEFAULT_CLOUD_PROVIDER
   return provider
 
+def get_service(cluster):
+  return HadoopService(cluster)
+
 def check_options_set(options, option_names):
   for option_name in option_names:
     if options.get(option_name) is None:
@@ -242,10 +252,6 @@
   else:
     return options.get('image_id')
 
-def _prompt(prompt):
-  """ Returns true if user responds "yes" to prompt. """
-  return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
-
 def main():
   # Use HADOOP_CLOUD_LOGGING_LEVEL=DEBUG to enable debugging output.
   logging.basicConfig(level=getattr(logging,
@@ -261,194 +267,148 @@
   if command == 'list':
     (opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True)
     if len(args) == 0:
-      commands.list_all(get_cloud_provider(opt))
+      service = get_service(None)
+      service.list_all(get_cloud_provider(opt))
     else:
-      (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
-      commands.list_cluster(cluster)
+      (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
+      service.list()
 
   elif command == 'launch-master':
-    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS)
-    check_launch_options_set(cluster, opt)
+    (opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS)
+    check_launch_options_set(service.cluster, opt)
     config_dir = get_config_dir(opt)
-    commands.launch_master(cluster, config_dir, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
-    commands.attach_storage(cluster, (commands.MASTER,))
-    try:
-      commands.wait_for_hadoop(cluster, 0)
-    except TimeoutException:
-      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
-        " master."
-    commands.print_master_url(cluster)
+    template = InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env'))
+    service.launch_master(template, config_dir, opt.get('client_cidr'))
 
   elif command == 'launch-slaves':
-    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS,
                                                     ("NUM_SLAVES",))
     number_of_slaves = int(args[1])
-    check_launch_options_set(cluster, opt)
-    config_dir = get_config_dir(opt)
-    commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'))
-    commands.attach_storage(cluster, (commands.SLAVE,))
-    commands.print_master_url(cluster)
+    check_launch_options_set(service.cluster, opt)
+    template = InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env'))
+    service.launch_slaves(template)
 
   elif command == 'launch-cluster':
-    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS,
-                                                    ("NUM_SLAVES",))
-    number_of_slaves = int(args[1])
-    check_launch_options_set(cluster, opt)
+    (opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS,
+                                                    ("NUM_SLAVES",),
+                                                    unbounded_args=True)
+    check_launch_options_set(service.cluster, opt)
     config_dir = get_config_dir(opt)
-    commands.launch_master(cluster, config_dir, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
-    commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'))
-    commands.attach_storage(cluster, commands.ROLES)
-    try:
-      commands.wait_for_hadoop(cluster, number_of_slaves)
-    except TimeoutException:
-      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
-        " cluster."
-    commands.print_master_url(cluster)
+    instance_templates = []
+    if len(args) == 2:
+      number_of_slaves = int(args[1])
+      print_deprecation(sys.argv[0], 'launch-cluster %s 1 nn,snn,jt %s dn,tt' %
+                        (service.cluster.name, number_of_slaves))
+      instance_templates = [
+        InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env')),
+        InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env')),
+                         ]
+    elif len(args) > 2 and len(args) % 2 == 0:
+      print_usage(sys.argv[0])
+      sys.exit(1)
+    else:
+      for i in range(len(args) / 2):
+        number = int(args[2 * i + 1])
+        roles = args[2 * i + 2].split(",")
+        instance_templates.append(
+          InstanceTemplate(roles, number, get_image_id(service.cluster, opt),
+                           opt.get('instance_type'), opt.get('key_name'),
+                           opt.get('public_key'), opt.get('user_data_file'),
+                           opt.get('availability_zone'),
+                           opt.get('user_packages'),
+                           opt.get('auto_shutdown'), opt.get('env')))
+
+    service.launch_cluster(instance_templates, config_dir,
+                           opt.get('client_cidr'))
 
   elif command == 'login':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    subprocess.call('ssh %s root@%s' % \
-                    (xstr(opt.get('ssh_options')), instances[0].public_ip),
-                    shell=True)
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
+    service.login(opt.get('ssh_options'))
 
   elif command == 'proxy':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    options = '-o "ConnectTimeout 10" -o "ServerAliveInterval 60" ' \
-              '-N -D 6666'
-    process = subprocess.Popen('ssh %s %s root@%s' %
-      (xstr(opt.get('ssh_options')), options, instances[0].public_ip),
-      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-      shell=True)
-    print """export HADOOP_CLOUD_PROXY_PID=%s;
-echo Proxy pid %s;""" % (process.pid, process.pid)
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
+    service.proxy(opt.get('ssh_options'))
 
   elif command == 'push':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS,
                                                     ("FILE",))
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    subprocess.call('scp %s -r %s root@%s:' % (xstr(opt.get('ssh_options')),
-                                               args[1], instances[0].public_ip),
-                                               shell=True)
+    service.push(opt.get('ssh_options'), args[1])
 
   elif command == 'exec':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS,
                                                     ("CMD",), True)
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    subprocess.call("ssh %s root@%s '%s'" % (xstr(opt.get('ssh_options')),
-                                             instances[0].public_ip,
-                                             " ".join(args[1:])), shell=True)
+    service.execute(opt.get('ssh_options'), args[1:])
 
   elif command == 'terminate-cluster':
-    (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS)
-    cluster.print_status(commands.ROLES)
-    if not opt["force"] and not _prompt("Terminate all instances?"):
-      print "Not terminating cluster."
-    else:
-      print "Terminating cluster"
-      cluster.terminate()
+    (opt, args, service) = parse_options_and_config(command, FORCE_OPTIONS)
+    service.terminate_cluster(opt["force"])
 
   elif command == 'delete-cluster':
-    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
-    cluster.delete()
+    (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
+    service.delete_cluster()
 
   elif command == 'create-formatted-snapshot':
-    (opt, args, cluster) = parse_options_and_config(command, SNAPSHOT_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, SNAPSHOT_OPTIONS,
                                                     ("SIZE",))
     size = int(args[1])
     check_options_set(opt, ['availability_zone', 'key_name'])
     ami_ubuntu_intrepid_x86 = 'ami-ec48af85' # use a general AMI
-    Ec2Storage.create_formatted_snapshot(cluster, size,
+    service.create_formatted_snapshot(size,
                                          opt.get('availability_zone'),
                                          ami_ubuntu_intrepid_x86,
                                          opt.get('key_name'),
                                          xstr(opt.get('ssh_options')))
 
   elif command == 'list-storage':
-    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
-    storage = cluster.get_storage()
-    storage.print_status(commands.ROLES)
+    (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
+    service.list_storage()
 
   elif command == 'create-storage':
-    (opt, args, cluster) = parse_options_and_config(command, PLACEMENT_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, PLACEMENT_OPTIONS,
                                                     ("ROLE", "NUM_INSTANCES",
                                                      "SPEC_FILE"))
-    storage = cluster.get_storage()
     role = args[1]
     number_of_instances = int(args[2])
     spec_file = args[3]
     check_options_set(opt, ['availability_zone'])
-    storage.create(role, number_of_instances, opt.get('availability_zone'),
-                   spec_file)
-    storage.print_status(commands.ROLES)
+    service.create_storage(role, number_of_instances,
+                           opt.get('availability_zone'), spec_file)
 
   elif command == 'attach-storage':
-    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS,
                                                     ("ROLE",))
-    storage = cluster.get_storage()
-    role = args[1]
-    storage.attach(role, cluster.get_instances_in_role(role, 'running'))
-    storage.print_status(commands.ROLES)
+    service.attach_storage(args[1])
 
   elif command == 'delete-storage':
-    (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS)
-    storage = cluster.get_storage()
-    storage.print_status(commands.ROLES)
-    if not opt["force"] and not _prompt("Delete all storage volumes? THIS WILL \
-      PERMANENTLY DELETE ALL DATA"):
-      print "Not deleting storage volumes."
-    else:
-      print "Deleting storage"
-      for role in commands.ROLES:
-        storage.delete(role)
+    (opt, args, service) = parse_options_and_config(command, FORCE_OPTIONS)
+    service.delete_storage(opt["force"])
 
   elif command == 'update-slaves-file':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
     check_options_set(opt, ['private_key'])
     ssh_options = xstr(opt.get('ssh_options'))
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    master = instances[0]
-    slaves = cluster.get_instances_in_role(commands.SLAVE)
-    with open('slaves', 'w') as f:
-      for slave in slaves:
-        f.write(slave.public_ip + "\n")
-    subprocess.call('scp %s -r %s root@%s:/etc/hadoop/conf' % \
-                    (ssh_options, 'slaves', master.public_ip), shell=True)
-
-    # Copy private key
-    private_key = opt.get('private_key')
-    subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
-                    (ssh_options, private_key, master.public_ip), shell=True)
-    for slave in slaves:
-      subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
-                      (ssh_options, private_key, slave.public_ip), shell=True)
+    config_dir = get_config_dir(opt)
+    service.update_slaves_file(config_dir, ssh_options, opt.get('private_key'))
 
   else:
     print "Unrecognized command '%s'" % command

Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cluster.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cluster.py?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cluster.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cluster.py Wed Dec  9 21:57:10 2009
@@ -70,7 +70,7 @@
     """
     raise Exception("Unimplemented")
 
-  def print_status(self, roles, state_filter="running"):
+  def print_status(self, roles=None, state_filter="running"):
     """
     Print the status of instances in the given roles, filtered by state.
     """
@@ -88,10 +88,10 @@
     else:
       return instances
 
-  def launch_instances(self, role, number, image_id, size_id,
+  def launch_instances(self, roles, number, image_id, size_id,
                        instance_user_data, **kwargs):
     """
-    Launch instances (of the given role) in the cluster.
+    Launch instances (having the given roles) in the cluster.
     Returns a list of IDs for the instances started.
     """
     pass

Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py Wed Dec  9 21:57:10 2009
@@ -85,7 +85,7 @@
     return self.name
 
   def _check_role_name(self, role):
-    if not re.match("^[a-zA-Z0-9_]+$", role):
+    if not re.match("^[a-zA-Z0-9_+]+$", role):
       raise RoleSyntaxException("Invalid role name '%s'" % role)
 
   def _group_name_for_role(self, role):
@@ -95,9 +95,11 @@
     self._check_role_name(role)
     return "%s-%s" % (self.name, role)
 
-  def _get_group_names(self, role):
-    self._check_role_name(role)
-    return [self._get_cluster_group_name(), self._group_name_for_role(role)]
+  def _get_group_names(self, roles):
+    group_names = [self._get_cluster_group_name()]
+    for role in roles:
+      group_names.append(self._group_name_for_role(role))
+    return group_names
 
   def _get_all_group_names(self):
     security_groups = self.ec2Connection.get_all_security_groups()
@@ -111,7 +113,7 @@
     if self.name not in all_group_names:
       return r
     for group in all_group_names:
-      if re.match("^%s(-[a-zA-Z0-9_]+)?$" % self.name, group):
+      if re.match("^%s(-[a-zA-Z0-9_+]+)?$" % self.name, group):
         r.append(group)
     return r
 
@@ -198,25 +200,31 @@
       instance.state, xstr(instance.key_name), instance.instance_type,
       str(instance.launch_time), instance.placement))
 
-  def print_status(self, roles, state_filter="running"):
+  def print_status(self, roles=None, state_filter="running"):
     """
     Print the status of instances in the given roles, filtered by state.
     """
-    for role in roles:
-      for instance in self._get_instances(self._group_name_for_role(role),
+    if not roles:
+      for instance in self._get_instances(self._get_cluster_group_name(),
                                           state_filter):
-        self._print_instance(role, instance)
+        self._print_instance("", instance)
+    else:
+      for role in roles:
+        for instance in self._get_instances(self._group_name_for_role(role),
+                                            state_filter):
+          self._print_instance(role, instance)
 
-  def launch_instances(self, role, number, image_id, size_id,
+  def launch_instances(self, roles, number, image_id, size_id,
                        instance_user_data, **kwargs):
-    self._check_role_name(role)
-
-    self._create_groups(role)
+    for role in roles:
+      self._check_role_name(role)  
+      self._create_groups(role)
+      
     user_data = instance_user_data.read_as_gzip_stream()
 
     reservation = self.ec2Connection.run_instances(image_id, min_count=number,
       max_count=number, key_name=kwargs.get('key_name', None),
-      security_groups=self._get_group_names(role), user_data=user_data,
+      security_groups=self._get_group_names(roles), user_data=user_data,
       instance_type=size_id, placement=kwargs.get('placement', None))
     return [instance.id for instance in reservation.instances]
 
@@ -372,6 +380,11 @@
         return True
     return False
 
+  def get_roles(self):
+    storage_filename = self._get_storage_filename()
+    volume_manager = JsonVolumeManager(storage_filename)
+    return volume_manager.get_roles()
+  
   def _get_ec2_volumes_dict(self, mountable_volumes):
     volume_ids = [mv.volume_id for mv in sum(mountable_volumes, [])]
     volumes = self.cluster.ec2Connection.get_all_volumes(volume_ids)
@@ -386,7 +399,11 @@
                      volume.status, str(volume.create_time),
                      str(volume.attach_time)))
 
-  def print_status(self, roles):
+  def print_status(self, roles=None):
+    if roles == None:
+      storage_filename = self._get_storage_filename()
+      volume_manager = JsonVolumeManager(storage_filename)
+      roles = volume_manager.get_roles()
     for role in roles:
       mountable_volumes_list = self._get_mountable_volumes(role)
       ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
@@ -441,20 +458,21 @@
         print "Attaching %s to %s" % (volume.id, instance.id)
         volume.attach(instance.id, mountable_volume.device)
 
-  def delete(self, role):
+  def delete(self, roles=[]):
     storage_filename = self._get_storage_filename()
     volume_manager = JsonVolumeManager(storage_filename)
-    mountable_volumes_list = volume_manager.get_instance_storage_for_role(role)
-    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
-    all_available = True
-    for volume in ec2_volumes.itervalues():
-      if volume.status != 'available':
-        all_available = False
-        logger.warning("Volume %s is not available.", volume)
-    if not all_available:
-      logger.warning("Some volumes are still in use for role %s.\
-        Aborting delete.", role)
-      return
-    for volume in ec2_volumes.itervalues():
-      volume.delete()
-    volume_manager.remove_instance_storage_for_role(role)
+    for role in roles:
+      mountable_volumes_list = volume_manager.get_instance_storage_for_role(role)
+      ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+      all_available = True
+      for volume in ec2_volumes.itervalues():
+        if volume.status != 'available':
+          all_available = False
+          logger.warning("Volume %s is not available.", volume)
+      if not all_available:
+        logger.warning("Some volumes are still in use for role %s.\
+          Aborting delete.", role)
+        return
+      for volume in ec2_volumes.itervalues():
+        volume.delete()
+      volume_manager.remove_instance_storage_for_role(role)
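
Since each instance now joins one security group per role (in addition to the
cluster-wide group), the resulting group names can be previewed with a small
sketch. This is illustrative only; it assumes, as the name-matching regular
expressions above suggest, that the cluster-wide group is named after the
cluster itself:

def group_names(cluster_name, roles):
  """Mirror _get_group_names above: the cluster-wide group plus one group
  per role, named <cluster-name>-<role>."""
  names = [cluster_name]
  for role in roles:
    names.append("%s-%s" % (cluster_name, role))
  return names

print group_names("my-hadoop-cluster", ["nn", "snn", "jt"])
# ['my-hadoop-cluster', 'my-hadoop-cluster-nn', 'my-hadoop-cluster-snn',
#  'my-hadoop-cluster-jt']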

Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py?rev=888998&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py Wed Dec  9 21:57:10 2009
@@ -0,0 +1,462 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Classes for running services on a cluster.
+"""
+
+from __future__ import with_statement
+
+from hadoop.cloud.cluster import get_cluster
+from hadoop.cloud.cluster import InstanceUserData
+from hadoop.cloud.cluster import TimeoutException
+from hadoop.cloud.providers.ec2 import Ec2Storage
+from hadoop.cloud.util import build_env_string
+from hadoop.cloud.util import url_get
+from hadoop.cloud.util import xstr
+import logging
+import os
+import re
+import socket
+import subprocess
+import sys
+import time
+
+logger = logging.getLogger(__name__)
+
+MASTER = "master"  # Deprecated.
+
+NAMENODE = "nn"
+SECONDARY_NAMENODE = "snn"
+JOBTRACKER = "jt"
+DATANODE = "dn"
+TASKTRACKER = "tt"
+
+class InstanceTemplate(object):
+  """
+  A template for creating server instances in a cluster.
+  """
+  def __init__(self, roles, number, image_id, size_id,
+                     key_name, public_key,
+                     user_data_file_template=None, placement=None,
+                     user_packages=None, auto_shutdown=None, env_strings=[]):
+    self.roles = roles
+    self.number = number
+    self.image_id = image_id
+    self.size_id = size_id
+    self.key_name = key_name
+    self.public_key = public_key
+    self.user_data_file_template = user_data_file_template
+    self.placement = placement
+    self.user_packages = user_packages
+    self.auto_shutdown = auto_shutdown
+    self.env_strings = env_strings
+
+  def add_env_strings(self, env_strings):
+    new_env_strings = list(self.env_strings or [])
+    new_env_strings.extend(env_strings)
+    self.env_strings = new_env_strings
+
+class HadoopService(object):
+  """
+  A HDFS and MapReduce service.
+  """
+  
+  def __init__(self, cluster):
+    self.cluster = cluster
+    
+  def list_all(self, provider):
+    """
+    Find and print clusters that have a running namenode instance
+    """
+    legacy_clusters = get_cluster(provider).get_clusters_with_role(MASTER)
+    clusters = get_cluster(provider).get_clusters_with_role(NAMENODE)
+    clusters.extend(legacy_clusters)
+    if not clusters:
+      print "No running clusters"
+    else:
+      for cluster in clusters:
+        print cluster
+    
+  def list(self):
+    self.cluster.print_status()
+
+  def launch_master(self, instance_template, config_dir, client_cidr):
+    if self.cluster.check_running(NAMENODE, 0) == False:
+      return  # don't proceed if another master is running
+    self.launch_cluster((instance_template,), config_dir, client_cidr)
+  
+  def launch_slaves(self, instance_template):
+    instances = self.cluster.check_running(NAMENODE, 1)
+    if not instances:
+      return
+    master = instances[0]
+    for role in (NAMENODE, SECONDARY_NAMENODE, JOBTRACKER): 
+      singleton_host_env = "%s_HOST=%s" % \
+              (self._sanitize_role_name(role), master.public_ip)
+      instance_template.add_env_strings((singleton_host_env,))
+    self._launch_instances(instance_template)              
+    self._attach_storage(instance_template.roles)
+    self._print_master_url()
+      
+  def launch_cluster(self, instance_templates, config_dir, client_cidr):
+    number_of_tasktrackers = 0
+    roles = []
+    for it in instance_templates:
+      roles.extend(it.roles)
+      if TASKTRACKER in it.roles:
+        number_of_tasktrackers += it.number
+    self._launch_cluster_instances(instance_templates)
+    self._create_client_hadoop_site_file(config_dir)
+    self._authorize_client_ports(client_cidr)
+    self._attach_storage(roles)
+    try:
+      self._wait_for_hadoop(number_of_tasktrackers)
+    except TimeoutException:
+      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
+        " cluster."
+    self._print_master_url()
+    
+  def login(self, ssh_options):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    subprocess.call('ssh %s root@%s' % \
+                    (xstr(ssh_options), master.public_ip),
+                    shell=True)
+    
+  def proxy(self, ssh_options):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    options = '-o "ConnectTimeout 10" -o "ServerAliveInterval 60" ' \
+              '-N -D 6666'
+    process = subprocess.Popen('ssh %s %s root@%s' %
+      (xstr(ssh_options), options, master.public_ip),
+      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+      shell=True)
+    print """export HADOOP_CLOUD_PROXY_PID=%s;
+echo Proxy pid %s;""" % (process.pid, process.pid)
+    
+  def push(self, ssh_options, file):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    subprocess.call('scp %s -r %s root@%s:' % (xstr(ssh_options),
+                                               file, master.public_ip),
+                                               shell=True)
+    
+  def execute(self, ssh_options, args):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    subprocess.call("ssh %s root@%s '%s'" % (xstr(ssh_options),
+                                             master.public_ip,
+                                             " ".join(args)), shell=True)
+  
+  def terminate_cluster(self,  force=False):
+    self.cluster.print_status()
+    if not force and not self._prompt("Terminate all instances?"):
+      print "Not terminating cluster."
+    else:
+      print "Terminating cluster"
+      self.cluster.terminate()
+      
+  def delete_cluster(self):
+    self.cluster.delete()
+    
+  def create_formatted_snapshot(self, size, availability_zone,
+                                image_id, key_name, ssh_options):
+    Ec2Storage.create_formatted_snapshot(self.cluster, size,
+                                         availability_zone,
+                                         image_id,
+                                         key_name,
+                                         ssh_options)
+
+  def list_storage(self):
+    storage = self.cluster.get_storage()
+    storage.print_status()
+
+  def create_storage(self, role, number_of_instances,
+                     availability_zone, spec_file):
+    storage = self.cluster.get_storage()
+    storage.create(role, number_of_instances, availability_zone, spec_file)
+    storage.print_status()
+    
+  def attach_storage(self, role):
+    storage = self.cluster.get_storage()
+    storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
+    storage.print_status()
+    
+  def delete_storage(self, force=False):
+    storage = self.cluster.get_storage()
+    storage.print_status()
+    if not force and not self._prompt("Delete all storage volumes? THIS WILL \
+      PERMANENTLY DELETE ALL DATA"):
+      print "Not deleting storage volumes."
+    else:
+      print "Deleting storage"
+      for role in storage.get_roles():
+        storage.delete(role)
+        
+  def update_slaves_file(self, config_dir, ssh_options, private_key):
+    instances = self.cluster.check_running(NAMENODE, 1)
+    if not instances:
+      sys.exit(1)
+    master = instances[0]
+    slaves = self.cluster.get_instances_in_role(DATANODE, "running")
+    cluster_dir = os.path.join(config_dir, self.cluster.name)
+    slaves_file = os.path.join(cluster_dir, 'slaves')
+    with open(slaves_file, 'w') as f:
+      for slave in slaves:
+        f.write(slave.public_ip + "\n")
+    subprocess.call('scp %s -r %s root@%s:/etc/hadoop/conf' % \
+                    (ssh_options, slaves_file, master.public_ip), shell=True)
+    # Copy private key
+    subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
+                    (ssh_options, private_key, master.public_ip), shell=True)
+    for slave in slaves:
+      subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
+                      (ssh_options, private_key, slave.public_ip), shell=True)
+        
+  def _prompt(self, prompt):
+    """ Returns true if user responds "yes" to prompt. """
+    return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
+  
+
+  def _get_default_user_data_file_template(self):
+    return os.path.join(sys.path[0], 'hadoop-%s-init-remote.sh' %
+                 self.cluster.get_provider_code())
+  
+  def _get_master(self):
+    # For split namenode/jobtracker, designate the namenode as the master
+    return self._get_namenode()
+
+  def _get_namenode(self):
+    instances = self.cluster.get_instances_in_role(NAMENODE, "running")
+    if not instances:
+      return None
+    return instances[0]
+
+  def _get_jobtracker(self):
+    instances = self.cluster.get_instances_in_role(JOBTRACKER, "running")
+    if not instances:
+      return None
+    return instances[0]
+
+  def _launch_cluster_instances(self, instance_templates):
+    singleton_hosts = []
+    for instance_template in instance_templates:
+      instance_template.add_env_strings(singleton_hosts)
+      instances = self._launch_instances(instance_template)
+      if instance_template.number == 1:
+        if len(instances) != 1:
+          logger.error("Expected a single '%s' instance, but found %s.",
+                       "".join(instance_template.roles), len(instances))
+          return
+        else:
+          for role in instance_template.roles:
+            singleton_host_env = "%s_HOST=%s" % \
+              (self._sanitize_role_name(role),
+               instances[0].public_ip)
+            singleton_hosts.append(singleton_host_env)
+
+  def _sanitize_role_name(self, role):
+    """Replace characters in role name with ones allowed in bash variable names"""
+    return role.replace('+', '_').upper()
+
+  def _launch_instances(self, instance_template):
+    it = instance_template
+    user_data_file_template = it.user_data_file_template
+    if it.user_data_file_template == None:
+      user_data_file_template = self._get_default_user_data_file_template()
+    ebs_mappings = ''
+    storage = self.cluster.get_storage()
+    for role in it.roles:
+      if storage.has_any_storage((role,)):
+        ebs_mappings = storage.get_mappings_string_for_role(role)
+    replacements = { "%ENV%": build_env_string(it.env_strings, {
+      "ROLES": ",".join(it.roles),
+      "USER_PACKAGES": it.user_packages,
+      "AUTO_SHUTDOWN": it.auto_shutdown,
+      "EBS_MAPPINGS": ebs_mappings,
+    }) }
+    instance_user_data = InstanceUserData(user_data_file_template, replacements)
+    instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
+                                            it.size_id,
+                                            instance_user_data,
+                                            key_name=it.key_name,
+                                            public_key=it.public_key,
+                                            placement=it.placement)
+    print "Waiting for %s instances in role %s to start" % \
+      (it.number, ",".join(it.roles))
+    try:
+      self.cluster.wait_for_instances(instance_ids)
+      print "%s instances started" % ",".join(it.roles)
+    except TimeoutException:
+      print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
+      return
+    print
+    self.cluster.print_status((it.roles[0],))
+    return self.cluster.get_instances_in_role(it.roles[0], "running")
+
+  def _authorize_client_ports(self, client_cidrs=[]):
+    if not client_cidrs:
+      logger.debug("No client CIDRs specified, using local address.")
+      client_ip = url_get('http://checkip.amazonaws.com/').strip()
+      client_cidrs = ("%s/32" % client_ip,)
+    logger.debug("Client CIDRs: %s", client_cidrs)
+    namenode = self._get_namenode()
+    jobtracker = self._get_jobtracker()
+    for client_cidr in client_cidrs:
+      # Allow access to port 80 on namenode from client
+      self.cluster.authorize_role(NAMENODE, 80, 80, client_cidr)
+      # Allow access to jobtracker UI on master from client
+      # (so we can see when the cluster is ready)
+      self.cluster.authorize_role(JOBTRACKER, 50030, 50030, client_cidr)
+    # Allow access to namenode and jobtracker via public address from each other
+    namenode_ip = socket.gethostbyname(namenode.public_ip)
+    jobtracker_ip = socket.gethostbyname(jobtracker.public_ip)
+    self.cluster.authorize_role(NAMENODE, 8020, 8020, "%s/32" % namenode_ip)
+    self.cluster.authorize_role(NAMENODE, 8020, 8020, "%s/32" % jobtracker_ip)
+    self.cluster.authorize_role(JOBTRACKER, 8021, 8021, "%s/32" % namenode_ip)
+    self.cluster.authorize_role(JOBTRACKER, 8021, 8021,
+                                "%s/32" % jobtracker_ip)
+  
+  def _create_client_hadoop_site_file(self, config_dir):
+    namenode = self._get_namenode()
+    jobtracker = self._get_jobtracker()
+    cluster_dir = os.path.join(config_dir, self.cluster.name)
+    aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
+    aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
+    if not os.path.exists(cluster_dir):
+      os.makedirs(cluster_dir)
+    with open(os.path.join(cluster_dir, 'hadoop-site.xml'), 'w') as f:
+      f.write("""<?xml version="1.0"?>
+  <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+  <!-- Put site-specific property overrides in this file. -->
+  <configuration>
+  <property>
+    <name>hadoop.job.ugi</name>
+    <value>root,root</value>
+  </property>
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://%(namenode)s:8020/</value>
+  </property>
+  <property>
+    <name>mapred.job.tracker</name>
+    <value>%(jobtracker)s:8021</value>
+  </property>
+  <property>
+    <name>hadoop.socks.server</name>
+    <value>localhost:6666</value>
+  </property>
+  <property>
+    <name>hadoop.rpc.socket.factory.class.default</name>
+    <value>org.apache.hadoop.net.SocksSocketFactory</value>
+  </property>
+  <property>
+    <name>fs.s3.awsAccessKeyId</name>
+    <value>%(aws_access_key_id)s</value>
+  </property>
+  <property>
+    <name>fs.s3.awsSecretAccessKey</name>
+    <value>%(aws_secret_access_key)s</value>
+  </property>
+  <property>
+    <name>fs.s3n.awsAccessKeyId</name>
+    <value>%(aws_access_key_id)s</value>
+  </property>
+  <property>
+    <name>fs.s3n.awsSecretAccessKey</name>
+    <value>%(aws_secret_access_key)s</value>
+  </property>
+  </configuration>
+  """ % {'namenode': namenode.public_ip,
+    'jobtracker': jobtracker.public_ip,
+    'aws_access_key_id': aws_access_key_id,
+    'aws_secret_access_key': aws_secret_access_key})        
+
+  def _wait_for_hadoop(self, number, timeout=600):
+    start_time = time.time()
+    jobtracker = self._get_jobtracker()
+    if not jobtracker:
+      return
+    print "Waiting for jobtracker to start"
+    previous_running = 0
+    while True:
+      if (time.time() - start_time >= timeout):
+        raise TimeoutException()
+      try:
+        actual_running = self._number_of_tasktrackers(jobtracker.public_ip, 1)
+        break
+      except IOError:
+        pass
+      sys.stdout.write(".")
+      sys.stdout.flush()
+      time.sleep(1)
+    print
+    if number > 0:
+      print "Waiting for %d tasktrackers to start" % number
+      while actual_running < number:
+        if (time.time() - start_time >= timeout):
+          raise TimeoutException()
+        try:
+          actual_running = self._number_of_tasktrackers(jobtracker.public_ip, 5, 2)
+          if actual_running != previous_running:
+            sys.stdout.write("%d" % actual_running)
+          sys.stdout.write(".")
+          sys.stdout.flush()
+          time.sleep(1)
+          previous_running = actual_running
+        except IOError:
+          pass
+      print
+
+  # The optional ?type=active is a difference between Hadoop 0.18 and 0.20
+  _NUMBER_OF_TASK_TRACKERS = re.compile(
+    r'<a href="machines.jsp(?:\?type=active)?">(\d+)</a>')
+  
+  def _number_of_tasktrackers(self, jt_hostname, timeout, retries=0):
+    jt_page = url_get("http://%s:50030/jobtracker.jsp" % jt_hostname, timeout,
+                      retries)
+    m = self._NUMBER_OF_TASK_TRACKERS.search(jt_page)
+    if m:
+      return int(m.group(1))
+    return 0
+
+  def _print_master_url(self):
+    webserver = self._get_jobtracker()
+    if not webserver:
+      return
+    print "Browse the cluster at http://%s/" % webserver.public_ip
+
+  def _attach_storage(self, roles):
+    storage = self.cluster.get_storage()
+    if storage.has_any_storage(roles):
+      print "Waiting 10 seconds before attaching storage"
+      time.sleep(10)
+      for role in roles:
+        storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
+      storage.print_status(roles)
\ No newline at end of file
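
One closing note on the new service module (an illustrative sketch, not part
of the patch): the *_HOST variables consumed by hadoop-ec2-init-remote.sh are
derived from the role names via _sanitize_role_name and passed on to later
instance templates by _launch_cluster_instances above. The addresses below are
placeholders:

def sanitize_role_name(role):
  """Same substitution as HadoopService._sanitize_role_name above."""
  return role.replace('+', '_').upper()

for role, ip in [("nn", "203.0.113.10"), ("jt", "203.0.113.11")]:
  print "%s_HOST=%s" % (sanitize_role_name(role), ip)
# NN_HOST=203.0.113.10
# JT_HOST=203.0.113.11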

Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py Wed Dec  9 21:57:10 2009
@@ -81,6 +81,10 @@
 
   def _store(self, obj):
     return json.dump(obj, open(self.filename, "w"), sort_keys=True, indent=2)
+  
+  def get_roles(self):
+    json_dict = self._load()
+    return json_dict.keys()
 
   def add_instance_storage_for_role(self, role, mountable_volumes):
     json_dict = self._load()
@@ -142,7 +146,13 @@
     """
     return False
 
-  def print_status(self, roles):
+  def get_roles(self):
+    """
+    Return a list of roles that have storage defined.
+    """
+    return []
+
+  def print_status(self, roles=None):
     """
     Print the status of storage volumes for the given roles.
     """
@@ -156,8 +166,8 @@
     """
     pass
 
-  def delete(self, role):
+  def delete(self, roles=[]):
     """
-    Permanently delete all the storage for a role.
+    Permanently delete all the storage for the given roles.
     """
     pass

Modified: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py Wed Dec  9 21:57:10 2009
@@ -27,7 +27,7 @@
   def test_check_role_name_valid(self):
     cluster = Ec2Cluster("test-cluster", None)
     cluster._check_role_name(
-      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_")
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_+")
 
   def test_check_role_name_dash_is_invalid(self):
     cluster = Ec2Cluster("test-cluster", None)

Modified: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py?rev=888998&r1=888997&r2=888998&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py Wed Dec  9 21:57:10 2009
@@ -85,6 +85,7 @@
     volume_manager = JsonVolumeManager("volumemanagertest.json")
     self.assertEqual(0,
       len(volume_manager.get_instance_storage_for_role("master")))
+    self.assertEqual(0, len(volume_manager.get_roles()))
 
     volume_manager.add_instance_storage_for_role("master",
                                                  [MountableVolume("vol_1", "/",
@@ -131,6 +132,11 @@
     self.assertEqual("vol_4", slave_storage_instance1_vol1.volume_id)
     self.assertEqual("/data1", slave_storage_instance1_vol1.mount_point)
     self.assertEqual("/dev/sdk", slave_storage_instance1_vol1.device)
+    
+    roles = volume_manager.get_roles()
+    self.assertEqual(2, len(roles))
+    self.assertTrue("slave" in roles)
+    self.assertTrue("master" in roles)
 
 
 if __name__ == '__main__':


