hadoop-common-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r885888 [2/2] - in /hadoop/common/trunk: ./ src/contrib/cloud/ src/contrib/cloud/src/ src/contrib/cloud/src/integration-test/ src/contrib/cloud/src/py/ src/contrib/cloud/src/py/hadoop/ src/contrib/cloud/src/py/hadoop/cloud/ src/contrib/clou...
Date Tue, 01 Dec 2009 19:33:12 GMT
Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from boto.ec2.connection import EC2Connection
+from boto.exception import EC2ResponseError
+import logging
+from hadoop.cloud.cluster import Cluster
+from hadoop.cloud.cluster import Instance
+from hadoop.cloud.cluster import RoleSyntaxException
+from hadoop.cloud.cluster import TimeoutException
+from hadoop.cloud.storage import JsonVolumeManager
+from hadoop.cloud.storage import JsonVolumeSpecManager
+from hadoop.cloud.storage import MountableVolume
+from hadoop.cloud.storage import Storage
+from hadoop.cloud.util import xstr
+import os
+import re
+import subprocess
+import sys
+import time
+
+logger = logging.getLogger(__name__)
+
+def _run_command_on_instance(instance, ssh_options, command):
+  # Build the command once so the printed command matches what is run.
+  ssh_command = "ssh %s root@%s '%s'" % \
+    (ssh_options, instance.public_dns_name, command)
+  print "Running %s" % ssh_command
+  retcode = subprocess.call(ssh_command, shell=True)
+  print "Command running on %s returned with value %s" % \
+    (instance.public_dns_name, retcode)
+
+def _wait_for_volume(ec2_connection, volume_id):
+  """
+  Waits until a volume becomes available.
+  """
+  while True:
+    volumes = ec2_connection.get_all_volumes([volume_id,])
+    if volumes[0].status == 'available':
+      break
+    sys.stdout.write(".")
+    sys.stdout.flush()
+    time.sleep(1)
+
+class Ec2Cluster(Cluster):
+  """
+  A cluster of EC2 instances. A cluster has a unique name.
+
+  Instances in the cluster run in a security group with the cluster's name,
+  and also in a group whose name indicates the instance's role, e.g.
+  <cluster-name>-foo for a "foo" instance.
+  """
+
+  @staticmethod
+  def get_clusters_with_role(role, state="running"):
+    all_instances = EC2Connection().get_all_instances()
+    clusters = []
+    for res in all_instances:
+      instance = res.instances[0]
+      for group in res.groups:
+        if group.id.endswith("-" + role) and instance.state == state:
+          clusters.append(re.sub("-%s$" % re.escape(role), "", group.id))
+    return clusters
+
+  def __init__(self, name, config_dir):
+    super(Ec2Cluster, self).__init__(name, config_dir)
+    self.ec2Connection = EC2Connection()
+
+  def get_provider_code(self):
+    return "ec2"
+
+  def _get_cluster_group_name(self):
+    return self.name
+
+  def _check_role_name(self, role):
+    if not re.match("^[a-zA-Z0-9_]+$", role):
+      raise RoleSyntaxException("Invalid role name '%s'" % role)
+
+  def _group_name_for_role(self, role):
+    """
+    Return the security group name for an instance in a given role.
+    """
+    self._check_role_name(role)
+    return "%s-%s" % (self.name, role)
+
+  def _get_group_names(self, role):
+    self._check_role_name(role)
+    return [self._get_cluster_group_name(), self._group_name_for_role(role)]
+
+  def _get_all_group_names(self):
+    security_groups = self.ec2Connection.get_all_security_groups()
+    security_group_names = \
+      [security_group.name for security_group in security_groups]
+    return security_group_names
+
+  def _get_all_group_names_for_cluster(self):
+    all_group_names = self._get_all_group_names()
+    r = []
+    if self.name not in all_group_names:
+      return r
+    for group in all_group_names:
+      if re.match("^%s(-[a-zA-Z0-9_]+)?$" % re.escape(self.name), group):
+        r.append(group)
+    return r
+
+  def _create_groups(self, role):
+    """
+    Create the security groups for a given role, including a group for the
+    cluster if it doesn't exist.
+    """
+    self._check_role_name(role)
+    security_group_names = self._get_all_group_names()
+
+    cluster_group_name = self._get_cluster_group_name()
+    if cluster_group_name not in security_group_names:
+      self.ec2Connection.create_security_group(cluster_group_name,
+                                               "Cluster (%s)" % (self.name))
+      self.ec2Connection.authorize_security_group(cluster_group_name,
+                                                  cluster_group_name)
+      # Allow SSH from anywhere
+      self.ec2Connection.authorize_security_group(cluster_group_name,
+                                                  ip_protocol="tcp",
+                                                  from_port=22, to_port=22,
+                                                  cidr_ip="0.0.0.0/0")
+
+    role_group_name = self._group_name_for_role(role)
+    if role_group_name not in security_group_names:
+      self.ec2Connection.create_security_group(role_group_name,
+        "Role %s (%s)" % (role, self.name))
+
+  def authorize_role(self, role, from_port, to_port, cidr_ip):
+    """
+    Authorize access to machines in a given role from a given network.
+    """
+    self._check_role_name(role)
+    role_group_name = self._group_name_for_role(role)
+    # Revoke first to avoid InvalidPermission.Duplicate error
+    self.ec2Connection.revoke_security_group(role_group_name,
+                                             ip_protocol="tcp",
+                                             from_port=from_port,
+                                             to_port=to_port, cidr_ip=cidr_ip)
+    self.ec2Connection.authorize_security_group(role_group_name,
+                                                ip_protocol="tcp",
+                                                from_port=from_port,
+                                                to_port=to_port,
+                                                cidr_ip=cidr_ip)
+
+  def _get_instances(self, group_name, state_filter=None):
+    """
+    Get all the instances in a group, filtered by state.
+
+    @param group_name: the name of the group
+    @param state_filter: the state that the instance should be in
+      (e.g. "running"), or None for all states
+    """
+    all_instances = self.ec2Connection.get_all_instances()
+    instances = []
+    for res in all_instances:
+      for group in res.groups:
+        if group.id == group_name:
+          for instance in res.instances:
+            if state_filter is None or instance.state == state_filter:
+              instances.append(instance)
+    return instances
+
+  def get_instances_in_role(self, role, state_filter=None):
+    """
+    Get all the instances in a role, filtered by state.
+
+    @param role: the name of the role
+    @param state_filter: the state that the instance should be in
+      (e.g. "running"), or None for all states
+    """
+    self._check_role_name(role)
+    instances = []
+    for instance in self._get_instances(self._group_name_for_role(role),
+                                        state_filter):
+      instances.append(Instance(instance.id, instance.dns_name,
+                                instance.private_dns_name))
+    return instances
+
+  def _print_instance(self, role, instance):
+    print "\t".join((role, instance.id,
+      instance.image_id,
+      instance.dns_name, instance.private_dns_name,
+      instance.state, xstr(instance.key_name), instance.instance_type,
+      str(instance.launch_time), instance.placement))
+
+  def print_status(self, roles, state_filter="running"):
+    """
+    Print the status of instances in the given roles, filtered by state.
+    """
+    for role in roles:
+      for instance in self._get_instances(self._group_name_for_role(role),
+                                          state_filter):
+        self._print_instance(role, instance)
+
+  def launch_instances(self, role, number, image_id, size_id,
+                       instance_user_data, **kwargs):
+    self._check_role_name(role)
+
+    self._create_groups(role)
+    user_data = instance_user_data.read_as_gzip_stream()
+
+    reservation = self.ec2Connection.run_instances(image_id, min_count=number,
+      max_count=number, key_name=kwargs.get('key_name', None),
+      security_groups=self._get_group_names(role), user_data=user_data,
+      instance_type=size_id, placement=kwargs.get('placement', None))
+    return [instance.id for instance in reservation.instances]
+
+  def wait_for_instances(self, instance_ids, timeout=600):
+    start_time = time.time()
+    while True:
+      if time.time() - start_time >= timeout:
+        raise TimeoutException()
+      try:
+        if self._all_started(self.ec2Connection.get_all_instances(instance_ids)):
+          break
+      # don't timeout for race condition where instance is not yet registered
+      except EC2ResponseError:
+        pass
+      sys.stdout.write(".")
+      sys.stdout.flush()
+      time.sleep(1)
+
+  def _all_started(self, reservations):
+    for res in reservations:
+      for instance in res.instances:
+        if instance.state != "running":
+          return False
+    return True
+
+  def terminate(self):
+    instances = self._get_instances(self._get_cluster_group_name(), "running")
+    if instances:
+      self.ec2Connection.terminate_instances([i.id for i in instances])
+
+  def delete(self):
+    """
+    Delete the security groups for each role in the cluster, and the group for
+    the cluster.
+    """
+    group_names = self._get_all_group_names_for_cluster()
+    for group in group_names:
+      self.ec2Connection.delete_security_group(group)
+
+  def get_storage(self):
+    """
+    Return the external storage for the cluster.
+    """
+    return Ec2Storage(self)
+
+
+class Ec2Storage(Storage):
+  """
+  Storage volumes for an EC2 cluster. The storage is associated with a named
+  cluster. Metadata for the storage volumes is kept in a JSON file on the client
+  machine (in a file called "ec2-storage-<cluster-name>.json" in the
+  configuration directory).
+  """
+
+  @staticmethod
+  def create_formatted_snapshot(cluster, size, availability_zone, image_id,
+                                key_name, ssh_options):
+    """
+    Creates a formatted snapshot of a given size. This saves having to format
+    volumes when they are first attached.
+    """
+    conn = cluster.ec2Connection
+    print "Starting instance"
+    reservation = conn.run_instances(image_id, key_name=key_name,
+                                     placement=availability_zone)
+    instance = reservation.instances[0]
+    try:
+      cluster.wait_for_instances([instance.id,])
+      print "Started instance %s" % instance.id
+    except TimeoutException:
+      print "Timeout"
+      return
+    print
+    print "Waiting 60 seconds before attaching storage"
+    time.sleep(60)
+    # Re-populate instance object since it has more details filled in
+    instance.update()
+
+    print "Creating volume of size %s in %s" % (size, availability_zone)
+    volume = conn.create_volume(size, availability_zone)
+    print "Created volume %s" % volume
+    print "Attaching volume to %s" % instance.id
+    volume.attach(instance.id, '/dev/sdj')
+
+    _run_command_on_instance(instance, ssh_options, """
+      while true ; do
+        echo 'Waiting for /dev/sdj...';
+        if [ -e /dev/sdj ]; then break; fi;
+        sleep 1;
+      done;
+      mkfs.ext3 -F -m 0.5 /dev/sdj
+    """)
+
+    print "Detaching volume"
+    conn.detach_volume(volume.id, instance.id)
+    print "Creating snapshot"
+    snapshot = volume.create_snapshot()
+    print "Created snapshot %s" % snapshot.id
+    _wait_for_volume(conn, volume.id)
+    print
+    print "Deleting volume"
+    volume.delete()
+    print "Deleted volume"
+    print "Stopping instance"
+    terminated = conn.terminate_instances([instance.id,])
+    print "Stopped instance %s" % terminated
+
+  def __init__(self, cluster):
+    super(Ec2Storage, self).__init__(cluster)
+    self.config_dir = cluster.config_dir
+
+  def _get_storage_filename(self):
+    return os.path.join(self.config_dir,
+                        "ec2-storage-%s.json" % (self.cluster.name))
+
+  def create(self, role, number_of_instances, availability_zone, spec_filename):
+    spec_file = open(spec_filename, 'r')
+    volume_spec_manager = JsonVolumeSpecManager(spec_file)
+    spec_file.close()
+    volume_manager = JsonVolumeManager(self._get_storage_filename())
+    for dummy in range(number_of_instances):
+      mountable_volumes = []
+      volume_specs = volume_spec_manager.volume_specs_for_role(role)
+      for spec in volume_specs:
+        logger.info("Creating volume of size %s in %s from snapshot %s" % \
+                    (spec.size, availability_zone, spec.snapshot_id))
+        volume = self.cluster.ec2Connection.create_volume(spec.size,
+                                                          availability_zone,
+                                                          spec.snapshot_id)
+        mountable_volumes.append(MountableVolume(volume.id, spec.mount_point,
+                                                 spec.device))
+      volume_manager.add_instance_storage_for_role(role, mountable_volumes)
+
+  def _get_mountable_volumes(self, role):
+    storage_filename = self._get_storage_filename()
+    volume_manager = JsonVolumeManager(storage_filename)
+    return volume_manager.get_instance_storage_for_role(role)
+
+  def get_mappings_string_for_role(self, role):
+    mappings = {}
+    mountable_volumes_list = self._get_mountable_volumes(role)
+    for mountable_volumes in mountable_volumes_list:
+      for mountable_volume in mountable_volumes:
+        mappings[mountable_volume.mount_point] = mountable_volume.device
+    return ";".join(["%s,%s" % (mount_point, device) for (mount_point, device)
+                     in mappings.items()])
+
+  def _has_storage(self, role):
+    return self._get_mountable_volumes(role)
+
+  def has_any_storage(self, roles):
+    for role in roles:
+      if self._has_storage(role):
+        return True
+    return False
+
+  def _get_ec2_volumes_dict(self, mountable_volumes):
+    volume_ids = [mv.volume_id for mv in sum(mountable_volumes, [])]
+    volumes = self.cluster.ec2Connection.get_all_volumes(volume_ids)
+    volumes_dict = {}
+    for volume in volumes:
+      volumes_dict[volume.id] = volume
+    return volumes_dict
+
+  def _print_volume(self, role, volume):
+    print "\t".join((role, volume.id, str(volume.size),
+                     volume.snapshot_id, volume.availabilityZone,
+                     volume.status, str(volume.create_time),
+                     str(volume.attach_time)))
+
+  def print_status(self, roles):
+    for role in roles:
+      mountable_volumes_list = self._get_mountable_volumes(role)
+      ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+      for mountable_volumes in mountable_volumes_list:
+        for mountable_volume in mountable_volumes:
+          self._print_volume(role, ec2_volumes[mountable_volume.volume_id])
+
+  def _replace(self, string, replacements):
+    for (match, replacement) in replacements.iteritems():
+      string = string.replace(match, replacement)
+    return string
+
+  def attach(self, role, instances):
+    mountable_volumes_list = self._get_mountable_volumes(role)
+    if not mountable_volumes_list:
+      return
+    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+
+    available_mountable_volumes_list = []
+
+    available_instances_dict = {}
+    for instance in instances:
+      available_instances_dict[instance.id] = instance
+
+    # Iterate over mountable_volumes and retain those that are not attached
+    # Also maintain a list of instances that have no attached storage
+    # Note that we do not fill in "holes" (instances that only have some of
+    # their storage attached)
+    for mountable_volumes in mountable_volumes_list:
+      available = True
+      for mountable_volume in mountable_volumes:
+        if ec2_volumes[mountable_volume.volume_id].status != 'available':
+          available = False
+          attach_data = ec2_volumes[mountable_volume.volume_id].attach_data
+          instance_id = attach_data.instance_id
+          if instance_id in available_instances_dict:
+            del available_instances_dict[instance_id]
+      if available:
+        available_mountable_volumes_list.append(mountable_volumes)
+
+    if len(available_instances_dict) != len(available_mountable_volumes_list):
+      logger.warning("Number of available instances (%s) and volumes (%s) \
+        do not match." \
+        % (len(available_instances_dict),
+           len(available_mountable_volumes_list)))
+
+    for (instance, mountable_volumes) in zip(available_instances_dict.values(),
+                                             available_mountable_volumes_list):
+      print "Attaching storage to %s" % instance.id
+      for mountable_volume in mountable_volumes:
+        volume = ec2_volumes[mountable_volume.volume_id]
+        print "Attaching %s to %s" % (volume.id, instance.id)
+        volume.attach(instance.id, mountable_volume.device)
+
+  def delete(self, role):
+    storage_filename = self._get_storage_filename()
+    volume_manager = JsonVolumeManager(storage_filename)
+    mountable_volumes_list = volume_manager.get_instance_storage_for_role(role)
+    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+    all_available = True
+    for volume in ec2_volumes.itervalues():
+      if volume.status != 'available':
+        all_available = False
+        logger.warning("Volume %s is not available.", volume)
+    if not all_available:
+      logger.warning("Some volumes are still in use for role %s.\
+        Aborting delete.", role)
+      return
+    for volume in ec2_volumes.itervalues():
+      volume.delete()
+    volume_manager.remove_instance_storage_for_role(role)
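
For orientation, a minimal sketch of how the classes above might be driven
(the cluster name, config directory, and roles are illustrative; boto's
EC2Connection picks up AWS credentials from the environment):

  from hadoop.cloud.providers.ec2 import Ec2Cluster

  cluster = Ec2Cluster("mycluster", "/tmp/hadoop-cloud")
  # Print one tab-separated line per running instance in each role.
  cluster.print_status(["master", "slave"])
  # Attach any unattached volumes recorded for the "slave" role.
  cluster.get_storage().attach(
      "slave", cluster.get_instances_in_role("slave", "running"))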

Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Classes for controlling external cluster storage.
+"""
+
+import logging
+import simplejson as json
+
+logger = logging.getLogger(__name__)
+
+class VolumeSpec(object):
+  """
+  The specification for a storage volume, encapsulating all the information
+  needed to create a volume and ultimately mount it on an instance.
+  """
+  def __init__(self, size, mount_point, device, snapshot_id):
+    self.size = size
+    self.mount_point = mount_point
+    self.device = device
+    self.snapshot_id = snapshot_id
+
+
+class JsonVolumeSpecManager(object):
+  """
+  A container for VolumeSpecs. This object can read VolumeSpecs specified in
+  JSON.
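+
+  The JSON maps each role to a list of volume spec dicts, for example
+  (values illustrative):
+
+    {"slave": [{"size_gb": "8", "mount_point": "/data1",
+                "device": "/dev/sdj", "snapshot_id": "snap-1"}]}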
+  """
+  def __init__(self, spec_file):
+    self.spec = json.load(spec_file)
+
+  def volume_specs_for_role(self, role):
+    return [VolumeSpec(d["size_gb"], d["mount_point"], d["device"],
+                       d["snapshot_id"]) for d in self.spec[role]]
+
+  def get_mappings_string_for_role(self, role):
+    """
+    Returns a short string of the form
+    "mount_point1,device1;mount_point2,device2;..."
+    which is useful for passing as an environment variable.
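+    For example: "/,/dev/sdj;/data1,/dev/sdk".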
+    """
+    return ";".join(["%s,%s" % (d["mount_point"], d["device"])
+                     for d in self.spec[role]])
+
+
+class MountableVolume(object):
+  """
+  A storage volume that has been created. It may or may not have been attached
+  or mounted to an instance.
+  """
+  def __init__(self, volume_id, mount_point, device):
+    self.volume_id = volume_id
+    self.mount_point = mount_point
+    self.device = device
+
+
+class JsonVolumeManager(object):
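+  """
+  Keeps a record of the volumes created for a cluster in a JSON file on the
+  client machine. The file maps each role to a list of instances, each of
+  which is a list of volume dicts, for example (values illustrative):
+
+    {"master": [[{"volume_id": "vol-123", "mount_point": "/",
+                  "device": "/dev/sdj"}]]}
+  """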
+
+  def __init__(self, filename):
+    self.filename = filename
+
+  def _load(self):
+    try:
+      return json.load(open(self.filename, "r"))
+    except IOError:
+      logger.debug("File %s does not exist.", self.filename)
+      return {}
+
+  def _store(self, obj):
+    return json.dump(obj, open(self.filename, "w"), sort_keys=True, indent=2)
+
+  def add_instance_storage_for_role(self, role, mountable_volumes):
+    json_dict = self._load()
+    mv_dicts = [mv.__dict__ for mv in mountable_volumes]
+    json_dict.setdefault(role, []).append(mv_dicts)
+    self._store(json_dict)
+
+  def remove_instance_storage_for_role(self, role):
+    json_dict = self._load()
+    del json_dict[role]
+    self._store(json_dict)
+
+  def get_instance_storage_for_role(self, role):
+    """
+    Returns a list of lists of MountableVolume objects. Each nested list is
+    the storage for one instance.
+    """
+    try:
+      json_dict = self._load()
+      instance_storage = []
+      for instance in json_dict[role]:
+        vols = []
+        for vol in instance:
+          vols.append(MountableVolume(vol["volume_id"], vol["mount_point"],
+                                      vol["device"]))
+        instance_storage.append(vols)
+      return instance_storage
+    except KeyError:
+      return []
+
+class Storage(object):
+  """
+  Storage volumes for a cluster. The storage is associated with a named
+  cluster. Many clusters just have local storage, in which case this is
+  not used.
+  """
+
+  def __init__(self, cluster):
+    self.cluster = cluster
+
+  def create(self, role, number_of_instances, availability_zone, spec_filename):
+    """
+    Create new storage volumes for instances with the given role, according to
+    the mapping defined in the spec file.
+    """
+    pass
+
+  def get_mappings_string_for_role(self, role):
+    """
+    Returns a short string of the form
+    "mount_point1,device1;mount_point2,device2;..."
+    which is useful for passing as an environment variable.
+    """
+    raise NotImplementedError
+
+  def has_any_storage(self, roles):
+    """
+    Return True if any of the given roles has associated storage
+    """
+    return False
+
+  def print_status(self, roles):
+    """
+    Print the status of storage volumes for the given roles.
+    """
+    pass
+
+  def attach(self, role, instances):
+    """
+    Attach volumes for a role to instances. Some volumes may already be
+    attached, in which case they are ignored, and we take care not to attach
+    multiple volumes to an instance.
+    """
+    pass
+
+  def delete(self, role):
+    """
+    Permanently delete all the storage for a role.
+    """
+    pass
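
A small sketch of the JsonVolumeManager round trip defined above (the file
name and volume id are illustrative):

  from hadoop.cloud.storage import JsonVolumeManager, MountableVolume

  manager = JsonVolumeManager("/tmp/ec2-storage-mycluster.json")
  manager.add_instance_storage_for_role(
      "master", [MountableVolume("vol-123", "/", "/dev/sdj")])
  # Each add_instance_storage_for_role call records storage for one more
  # instance, so this returns a list of lists of MountableVolume objects.
  volumes = manager.get_instance_storage_for_role("master")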

Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Utility functions.
+"""
+
+import ConfigParser
+import socket
+import urllib2
+
+def bash_quote(text):
+  """Quotes a string for bash, by using single quotes."""
+  if text is None:
+    return ""
+  return "'%s'" % text.replace("'", "'\\''")
+
+def bash_quote_env(env):
+  """Quotes the value in an environment variable assignment."""
+  if env.find("=") == -1:
+    return env
+  (var, value) = env.split("=")
+  return "%s=%s" % (var, bash_quote(value))
+
+def build_env_string(env_strings=None, pairs=None):
+  """Build a bash environment variable assignment string."""
+  env = ''
+  if env_strings:
+    for env_string in env_strings:
+      env += "%s " % bash_quote_env(env_string)
+  if pairs:
+    for key, val in pairs.items():
+      env += "%s=%s " % (key, bash_quote(val))
+  return env[:-1]
+
+def merge_config_with_options(section_name, config, options):
+  """
+  Merge configuration options with a dictionary of options.
+  Keys in the options dictionary take precedence.
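+  Multi-line configuration values are returned as lists of strings.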
+  """
+  res = {}
+  try:
+    for (key, value) in config.items(section_name):
+      if value.find("\n") != -1:
+        res[key] = value.split("\n")
+      else:
+        res[key] = value
+  except ConfigParser.NoSectionError:
+    pass
+  for key in options:
+    if options[key] is not None:
+      res[key] = options[key]
+  return res
+
+def url_get(url, timeout=10, retries=0):
+  """
+  Retrieve content from the given URL.
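+  Makes up to retries + 1 attempts, re-raising the URLError if all fail.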
+  """
+  # in Python 2.6 we can pass timeout to urllib2.urlopen
+  socket.setdefaulttimeout(timeout)
+  attempts = 0
+  while True:
+    try:
+      return urllib2.urlopen(url).read()
+    except urllib2.URLError:
+      attempts += 1
+      if attempts > retries:
+        raise
+
+def xstr(string):
+  """Sane string conversion: return an empty string if string is None."""
+  return '' if string is None else str(string)
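
A short sketch of how the helpers above compose (the section name and
values are illustrative):

  import ConfigParser
  from hadoop.cloud.util import build_env_string, merge_config_with_options

  config = ConfigParser.ConfigParser()
  config.add_section("cluster")
  config.set("cluster", "image_id", "ami-12345")
  # Explicit options take precedence over the config file...
  opts = merge_config_with_options("cluster", config,
                                   {"image_id": "ami-67890"})
  assert opts == {"image_id": "ami-67890"}
  # ...and values are quoted for safe interpolation into a shell command.
  assert build_env_string(pairs={"AWS_REGION": "us-east-1"}) == \
      "AWS_REGION='us-east-1'"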

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import unittest
+from hadoop.cloud.testcluster import TestCluster
+from hadoop.cloud.teststorage import TestJsonVolumeSpecManager
+from hadoop.cloud.teststorage import TestJsonVolumeManager
+from hadoop.cloud.testuserdata import TestInstanceUserData
+from hadoop.cloud.testutil import TestUtilFunctions
+
+def testSuite():
+  alltests = unittest.TestSuite([
+    unittest.makeSuite(TestCluster, 'test'),
+    unittest.makeSuite(TestJsonVolumeSpecManager, 'test'),
+    unittest.makeSuite(TestJsonVolumeManager, 'test'),
+    unittest.makeSuite(TestInstanceUserData, 'test'),
+    unittest.makeSuite(TestUtilFunctions, 'test'),
+  ])
+  return alltests
+
+if __name__ == "__main__":
+  runner = unittest.TextTestRunner()
+  sys.exit(not runner.run(testSuite()).wasSuccessful())

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from hadoop.cloud.cluster import RoleSyntaxException
+from hadoop.cloud.providers.ec2 import Ec2Cluster
+
+class TestCluster(unittest.TestCase):
+
+  def test_group_name_for_role(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    self.assertEqual("test-cluster-foo", cluster._group_name_for_role("foo"))
+
+  def test_check_role_name_valid(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    cluster._check_role_name(
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_")
+
+  def test_check_role_name_dash_is_invalid(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    self.assertRaises(RoleSyntaxException, cluster._check_role_name, "a-b")
+
+if __name__ == '__main__':
+  unittest.main()

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+
+import simplejson as json
+from StringIO import StringIO
+
+from hadoop.cloud.storage import MountableVolume
+from hadoop.cloud.storage import JsonVolumeManager
+from hadoop.cloud.storage import JsonVolumeSpecManager
+
+spec = {
+ "master": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj",
+             "snapshot_id": "snap_1"},
+            ),
+ "slave": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj",
+            "snapshot_id": "snap_2"},
+           {"size_gb":"10", "mount_point":"/data1", "device":"/dev/sdk",
+            "snapshot_id": "snap_3"},
+           )
+ }
+
+class TestJsonVolumeSpecManager(unittest.TestCase):
+
+  def test_volume_specs_for_role(self):
+
+    spec_input = StringIO(json.dumps(spec))
+
+    volume_spec_manager = JsonVolumeSpecManager(spec_input)
+
+    master_specs = volume_spec_manager.volume_specs_for_role("master")
+    self.assertEqual(1, len(master_specs))
+    self.assertEqual("/", master_specs[0].mount_point)
+    self.assertEqual("8", master_specs[0].size)
+    self.assertEqual("/dev/sdj", master_specs[0].device)
+    self.assertEqual("snap_1", master_specs[0].snapshot_id)
+
+    slave_specs = volume_spec_manager.volume_specs_for_role("slave")
+    self.assertEqual(2, len(slave_specs))
+    self.assertEqual("snap_2", slave_specs[0].snapshot_id)
+    self.assertEqual("snap_3", slave_specs[1].snapshot_id)
+
+    self.assertRaises(KeyError, volume_spec_manager.volume_specs_for_role,
+                      "no-such-role")
+
+  def test_get_mappings_string_for_role(self):
+
+    spec_input = StringIO(json.dumps(spec))
+
+    volume_spec_manager = JsonVolumeSpecManager(spec_input)
+
+    master_mappings = volume_spec_manager.get_mappings_string_for_role("master")
+    self.assertEqual("/,/dev/sdj", master_mappings)
+
+    slave_mappings = volume_spec_manager.get_mappings_string_for_role("slave")
+    self.assertEqual("/,/dev/sdj;/data1,/dev/sdk", slave_mappings)
+
+    self.assertRaises(KeyError,
+                      volume_spec_manager.get_mappings_string_for_role,
+                      "no-such-role")
+
+class TestJsonVolumeManager(unittest.TestCase):
+
+  def setUp(self):
+    try:
+      os.remove("volumemanagertest.json")
+    except OSError:
+      pass
+
+  def test_add_instance_storage_for_role(self):
+    volume_manager = JsonVolumeManager("volumemanagertest.json")
+    self.assertEqual(0,
+      len(volume_manager.get_instance_storage_for_role("master")))
+
+    volume_manager.add_instance_storage_for_role("master",
+                                                 [MountableVolume("vol_1", "/",
+                                                                  "/dev/sdj")])
+    master_storage = volume_manager.get_instance_storage_for_role("master")
+    self.assertEqual(1, len(master_storage))
+    master_storage_instance0 = master_storage[0]
+    self.assertEqual(1, len(master_storage_instance0))
+    master_storage_instance0_vol0 = master_storage_instance0[0]
+    self.assertEqual("vol_1", master_storage_instance0_vol0.volume_id)
+    self.assertEqual("/", master_storage_instance0_vol0.mount_point)
+    self.assertEqual("/dev/sdj", master_storage_instance0_vol0.device)
+
+    volume_manager.add_instance_storage_for_role("slave",
+                                                 [MountableVolume("vol_2", "/",
+                                                                  "/dev/sdj")])
+    self.assertEqual(1,
+      len(volume_manager.get_instance_storage_for_role("master")))
+    slave_storage = volume_manager.get_instance_storage_for_role("slave")
+    self.assertEqual(1, len(slave_storage))
+    slave_storage_instance0 = slave_storage[0]
+    self.assertEqual(1, len(slave_storage_instance0))
+    slave_storage_instance0_vol0 = slave_storage_instance0[0]
+    self.assertEqual("vol_2", slave_storage_instance0_vol0.volume_id)
+    self.assertEqual("/", slave_storage_instance0_vol0.mount_point)
+    self.assertEqual("/dev/sdj", slave_storage_instance0_vol0.device)
+
+    volume_manager.add_instance_storage_for_role("slave",
+      [MountableVolume("vol_3", "/", "/dev/sdj"),
+       MountableVolume("vol_4", "/data1", "/dev/sdk")])
+    self.assertEqual(1,
+      len(volume_manager.get_instance_storage_for_role("master")))
+    slave_storage = volume_manager.get_instance_storage_for_role("slave")
+    self.assertEqual(2, len(slave_storage))
+    slave_storage_instance0 = slave_storage[0]
+    slave_storage_instance1 = slave_storage[1]
+    self.assertEqual(1, len(slave_storage_instance0))
+    self.assertEqual(2, len(slave_storage_instance1))
+    slave_storage_instance1_vol0 = slave_storage_instance1[0]
+    slave_storage_instance1_vol1 = slave_storage_instance1[1]
+    self.assertEqual("vol_3", slave_storage_instance1_vol0.volume_id)
+    self.assertEqual("/", slave_storage_instance1_vol0.mount_point)
+    self.assertEqual("/dev/sdj", slave_storage_instance1_vol0.device)
+    self.assertEqual("vol_4", slave_storage_instance1_vol1.volume_id)
+    self.assertEqual("/data1", slave_storage_instance1_vol1.mount_point)
+    self.assertEqual("/dev/sdk", slave_storage_instance1_vol1.device)
+
+
+if __name__ == '__main__':
+  unittest.main()

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import tempfile
+import unittest
+
+from hadoop.cloud.cluster import InstanceUserData
+
+class TestInstanceUserData(unittest.TestCase):
+
+  def test_replacement(self):
+    tmp = tempfile.NamedTemporaryFile()
+    tmp.write("Contents go here")
+    tmp.flush()
+    self.assertEqual("Contents go here",
+                     InstanceUserData(tmp.name, {}).read())
+    self.assertEqual("Contents were here",
+                     InstanceUserData(tmp.name, {"go": "were"}).read())
+    self.assertEqual("Contents  here",
+                     InstanceUserData(tmp.name, {"go": None}).read())
+    tmp.close()
+
+  def test_read_file_url(self):
+    tmp = tempfile.NamedTemporaryFile()
+    tmp.write("Contents go here")
+    tmp.flush()
+    self.assertEqual("Contents go here",
+                     InstanceUserData("file://%s" % tmp.name, {}).read())
+    tmp.close()
+
+if __name__ == '__main__':
+  unittest.main()

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py Tue Dec  1 19:33:10 2009
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ConfigParser
+import StringIO
+import unittest
+
+from hadoop.cloud.util import bash_quote
+from hadoop.cloud.util import bash_quote_env
+from hadoop.cloud.util import build_env_string
+from hadoop.cloud.util import merge_config_with_options
+from hadoop.cloud.util import xstr
+
+class TestUtilFunctions(unittest.TestCase):
+
+  def test_bash_quote(self):
+    self.assertEqual("", bash_quote(None))
+    self.assertEqual("''", bash_quote(""))
+    self.assertEqual("'a'", bash_quote("a"))
+    self.assertEqual("'a b'", bash_quote("a b"))
+    self.assertEqual("'a\b'", bash_quote("a\b"))
+    self.assertEqual("'a '\\'' b'", bash_quote("a ' b"))
+
+  def test_bash_quote_env(self):
+    self.assertEqual("", bash_quote_env(""))
+    self.assertEqual("a", bash_quote_env("a"))
+    self.assertEqual("a='b'", bash_quote_env("a=b"))
+    self.assertEqual("a='b c'", bash_quote_env("a=b c"))
+    self.assertEqual("a='b\c'", bash_quote_env("a=b\c"))
+    self.assertEqual("a='b '\\'' c'", bash_quote_env("a=b ' c"))
+
+  def test_build_env_string(self):
+    self.assertEqual("", build_env_string())
+    self.assertEqual("a='b' c='d'",
+                     build_env_string(env_strings=["a=b", "c=d"]))
+    self.assertEqual("a='b' c='d'",
+                     build_env_string(pairs={"a": "b", "c": "d"}))
+
+  def test_merge_config_with_options(self):
+    options = { "a": "b" }
+    config = ConfigParser.ConfigParser()
+    self.assertEqual({ "a": "b" },
+                     merge_config_with_options("section", config, options))
+    config.add_section("section")
+    self.assertEqual({ "a": "b" },
+                     merge_config_with_options("section", config, options))
+    config.set("section", "a", "z")
+    config.set("section", "c", "d")
+    self.assertEqual({ "a": "z", "c": "d" },
+                     merge_config_with_options("section", config, {}))
+    self.assertEqual({ "a": "b", "c": "d" },
+                     merge_config_with_options("section", config, options))
+
+  def test_merge_config_with_options_list(self):
+    config = ConfigParser.ConfigParser()
+    config.readfp(StringIO.StringIO("""[section]
+env1=a=b
+ c=d
+env2=e=f
+ g=h"""))
+    self.assertEqual({ "env1": ["a=b", "c=d"], "env2": ["e=f", "g=h"] },
+                     merge_config_with_options("section", config, {}))
+
+  def test_xstr(self):
+    self.assertEqual("", xstr(None))
+    self.assertEqual("a", xstr("a"))
+
+if __name__ == '__main__':
+  unittest.main()


