From: tomwhite@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Tue, 01 Dec 2009 19:33:12 -0000
Message-Id: <20091201193313.07D4F23889E1@eris.apache.org>
Subject: svn commit: r885888 [2/2] - in /hadoop/common/trunk: ./
 src/contrib/cloud/ src/contrib/cloud/src/
 src/contrib/cloud/src/integration-test/ src/contrib/cloud/src/py/
 src/contrib/cloud/src/py/hadoop/ src/contrib/cloud/src/py/hadoop/cloud/
 src/contrib/clou...

Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from boto.ec2.connection import EC2Connection
+from boto.exception import EC2ResponseError
+import logging
+from hadoop.cloud.cluster import Cluster
+from hadoop.cloud.cluster import Instance
+from hadoop.cloud.cluster import RoleSyntaxException
+from hadoop.cloud.cluster import TimeoutException
+from hadoop.cloud.storage import JsonVolumeManager
+from hadoop.cloud.storage import JsonVolumeSpecManager
+from hadoop.cloud.storage import MountableVolume
+from hadoop.cloud.storage import Storage
+from hadoop.cloud.util import xstr
+import os
+import re
+import subprocess
+import sys
+import time
+
+logger = logging.getLogger(__name__)
+
+def _run_command_on_instance(instance, ssh_options, command):
+  print "Running ssh %s root@%s '%s'" % \
+    (ssh_options, instance.public_dns_name, command)
+  retcode = subprocess.call("ssh %s root@%s '%s'" %
+                            (ssh_options, instance.public_dns_name, command),
+                            shell=True)
+  print "Command running on %s returned with value %s" % \
+    (instance.public_dns_name, retcode)
+
+def _wait_for_volume(ec2_connection, volume_id):
+  """
+  Waits until a volume becomes available.
+  """
+  while True:
+    volumes = ec2_connection.get_all_volumes([volume_id,])
+    if volumes[0].status == 'available':
+      break
+    sys.stdout.write(".")
+    sys.stdout.flush()
+    time.sleep(1)
+
+class Ec2Cluster(Cluster):
+  """
+  A cluster of EC2 instances. A cluster has a unique name.
+
+  Instances running in the cluster run in a security group with the cluster's
+  name, and also a name indicating the instance's role, e.g. <name>-foo
+  to show a "foo" instance.
+  """
+
+  @staticmethod
+  def get_clusters_with_role(role, state="running"):
+    all_instances = EC2Connection().get_all_instances()
+    clusters = []
+    for res in all_instances:
+      instance = res.instances[0]
+      for group in res.groups:
+        if group.id.endswith("-" + role) and instance.state == state:
+          clusters.append(re.sub("-%s$" % re.escape(role), "", group.id))
+    return clusters
+
+  def __init__(self, name, config_dir):
+    super(Ec2Cluster, self).__init__(name, config_dir)
+    self.ec2Connection = EC2Connection()
+
+  def get_provider_code(self):
+    return "ec2"
+
+  def _get_cluster_group_name(self):
+    return self.name
+
+  def _check_role_name(self, role):
+    if not re.match("^[a-zA-Z0-9_]+$", role):
+      raise RoleSyntaxException("Invalid role name '%s'" % role)
+
+  def _group_name_for_role(self, role):
+    """
+    Return the security group name for an instance in a given role.
+    """
+    self._check_role_name(role)
+    return "%s-%s" % (self.name, role)
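
Aside (not part of the patch): the naming scheme above means a cluster named
"test-cluster" with a "foo" role lives in the security groups "test-cluster"
and "test-cluster-foo", which is exactly what testcluster.py later in this
commit asserts:

    cluster = Ec2Cluster("test-cluster", None)
    cluster._group_name_for_role("foo")   # -> "test-cluster-foo"
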
+ """ + self._check_role_name(role) + security_group_names = self._get_all_group_names() + + cluster_group_name = self._get_cluster_group_name() + if not cluster_group_name in security_group_names: + self.ec2Connection.create_security_group(cluster_group_name, + "Cluster (%s)" % (self.name)) + self.ec2Connection.authorize_security_group(cluster_group_name, + cluster_group_name) + # Allow SSH from anywhere + self.ec2Connection.authorize_security_group(cluster_group_name, + ip_protocol="tcp", + from_port=22, to_port=22, + cidr_ip="0.0.0.0/0") + + role_group_name = self._group_name_for_role(role) + if not role_group_name in security_group_names: + self.ec2Connection.create_security_group(role_group_name, + "Role %s (%s)" % (role, self.name)) + + def authorize_role(self, role, from_port, to_port, cidr_ip): + """ + Authorize access to machines in a given role from a given network. + """ + self._check_role_name(role) + role_group_name = self._group_name_for_role(role) + # Revoke first to avoid InvalidPermission.Duplicate error + self.ec2Connection.revoke_security_group(role_group_name, + ip_protocol="tcp", + from_port=from_port, + to_port=to_port, cidr_ip=cidr_ip) + self.ec2Connection.authorize_security_group(role_group_name, + ip_protocol="tcp", + from_port=from_port, + to_port=to_port, + cidr_ip=cidr_ip) + + def _get_instances(self, group_name, state_filter=None): + """ + Get all the instances in a group, filtered by state. + + @param group_name: the name of the group + @param state_filter: the state that the instance should be in + (e.g. "running"), or None for all states + """ + all_instances = self.ec2Connection.get_all_instances() + instances = [] + for res in all_instances: + for group in res.groups: + if group.id == group_name: + for instance in res.instances: + if state_filter == None or instance.state == state_filter: + instances.append(instance) + return instances + + def get_instances_in_role(self, role, state_filter=None): + """ + Get all the instances in a role, filtered by state. + + @param role: the name of the role + @param state_filter: the state that the instance should be in + (e.g. "running"), or None for all states + """ + self._check_role_name(role) + instances = [] + for instance in self._get_instances(self._group_name_for_role(role), + state_filter): + instances.append(Instance(instance.id, instance.dns_name, + instance.private_dns_name)) + return instances + + def _print_instance(self, role, instance): + print "\t".join((role, instance.id, + instance.image_id, + instance.dns_name, instance.private_dns_name, + instance.state, xstr(instance.key_name), instance.instance_type, + str(instance.launch_time), instance.placement)) + + def print_status(self, roles, state_filter="running"): + """ + Print the status of instances in the given roles, filtered by state. 
+ """ + for role in roles: + for instance in self._get_instances(self._group_name_for_role(role), + state_filter): + self._print_instance(role, instance) + + def launch_instances(self, role, number, image_id, size_id, + instance_user_data, **kwargs): + self._check_role_name(role) + + self._create_groups(role) + user_data = instance_user_data.read_as_gzip_stream() + + reservation = self.ec2Connection.run_instances(image_id, min_count=number, + max_count=number, key_name=kwargs.get('key_name', None), + security_groups=self._get_group_names(role), user_data=user_data, + instance_type=size_id, placement=kwargs.get('placement', None)) + return [instance.id for instance in reservation.instances] + + def wait_for_instances(self, instance_ids, timeout=600): + start_time = time.time() + while True: + if (time.time() - start_time >= timeout): + raise TimeoutException() + try: + if self._all_started(self.ec2Connection.get_all_instances(instance_ids)): + break + # don't timeout for race condition where instance is not yet registered + except EC2ResponseError: + pass + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(1) + + def _all_started(self, reservations): + for res in reservations: + for instance in res.instances: + if instance.state != "running": + return False + return True + + def terminate(self): + instances = self._get_instances(self._get_cluster_group_name(), "running") + if instances: + self.ec2Connection.terminate_instances([i.id for i in instances]) + + def delete(self): + """ + Delete the security groups for each role in the cluster, and the group for + the cluster. + """ + group_names = self._get_all_group_names_for_cluster() + for group in group_names: + self.ec2Connection.delete_security_group(group) + + def get_storage(self): + """ + Return the external storage for the cluster. + """ + return Ec2Storage(self) + + +class Ec2Storage(Storage): + """ + Storage volumes for an EC2 cluster. The storage is associated with a named + cluster. Metadata for the storage volumes is kept in a JSON file on the client + machine (in a file called "ec2-storage-.json" in the + configuration directory). + """ + + @staticmethod + def create_formatted_snapshot(cluster, size, availability_zone, image_id, + key_name, ssh_options): + """ + Creates a formatted snapshot of a given size. This saves having to format + volumes when they are first attached. 
+ """ + conn = cluster.ec2Connection + print "Starting instance" + reservation = conn.run_instances(image_id, key_name=key_name, + placement=availability_zone) + instance = reservation.instances[0] + try: + cluster.wait_for_instances([instance.id,]) + print "Started instance %s" % instance.id + except TimeoutException: + print "Timeout" + return + print + print "Waiting 60 seconds before attaching storage" + time.sleep(60) + # Re-populate instance object since it has more details filled in + instance.update() + + print "Creating volume of size %s in %s" % (size, availability_zone) + volume = conn.create_volume(size, availability_zone) + print "Created volume %s" % volume + print "Attaching volume to %s" % instance.id + volume.attach(instance.id, '/dev/sdj') + + _run_command_on_instance(instance, ssh_options, """ + while true ; do + echo 'Waiting for /dev/sdj...'; + if [ -e /dev/sdj ]; then break; fi; + sleep 1; + done; + mkfs.ext3 -F -m 0.5 /dev/sdj + """) + + print "Detaching volume" + conn.detach_volume(volume.id, instance.id) + print "Creating snapshot" + snapshot = volume.create_snapshot() + print "Created snapshot %s" % snapshot.id + _wait_for_volume(conn, volume.id) + print + print "Deleting volume" + volume.delete() + print "Deleted volume" + print "Stopping instance" + terminated = conn.terminate_instances([instance.id,]) + print "Stopped instance %s" % terminated + + def __init__(self, cluster): + super(Ec2Storage, self).__init__(cluster) + self.config_dir = cluster.config_dir + + def _get_storage_filename(self): + return os.path.join(self.config_dir, + "ec2-storage-%s.json" % (self.cluster.name)) + + def create(self, role, number_of_instances, availability_zone, spec_filename): + spec_file = open(spec_filename, 'r') + volume_spec_manager = JsonVolumeSpecManager(spec_file) + volume_manager = JsonVolumeManager(self._get_storage_filename()) + for dummy in range(number_of_instances): + mountable_volumes = [] + volume_specs = volume_spec_manager.volume_specs_for_role(role) + for spec in volume_specs: + logger.info("Creating volume of size %s in %s from snapshot %s" % \ + (spec.size, availability_zone, spec.snapshot_id)) + volume = self.cluster.ec2Connection.create_volume(spec.size, + availability_zone, + spec.snapshot_id) + mountable_volumes.append(MountableVolume(volume.id, spec.mount_point, + spec.device)) + volume_manager.add_instance_storage_for_role(role, mountable_volumes) + + def _get_mountable_volumes(self, role): + storage_filename = self._get_storage_filename() + volume_manager = JsonVolumeManager(storage_filename) + return volume_manager.get_instance_storage_for_role(role) + + def get_mappings_string_for_role(self, role): + mappings = {} + mountable_volumes_list = self._get_mountable_volumes(role) + for mountable_volumes in mountable_volumes_list: + for mountable_volume in mountable_volumes: + mappings[mountable_volume.mount_point] = mountable_volume.device + return ";".join(["%s,%s" % (mount_point, device) for (mount_point, device) + in mappings.items()]) + + def _has_storage(self, role): + return self._get_mountable_volumes(role) + + def has_any_storage(self, roles): + for role in roles: + if self._has_storage(role): + return True + return False + + def _get_ec2_volumes_dict(self, mountable_volumes): + volume_ids = [mv.volume_id for mv in sum(mountable_volumes, [])] + volumes = self.cluster.ec2Connection.get_all_volumes(volume_ids) + volumes_dict = {} + for volume in volumes: + volumes_dict[volume.id] = volume + return volumes_dict + + def _print_volume(self, role, 
+
+  def _has_storage(self, role):
+    return self._get_mountable_volumes(role)
+
+  def has_any_storage(self, roles):
+    for role in roles:
+      if self._has_storage(role):
+        return True
+    return False
+
+  def _get_ec2_volumes_dict(self, mountable_volumes):
+    volume_ids = [mv.volume_id for mv in sum(mountable_volumes, [])]
+    volumes = self.cluster.ec2Connection.get_all_volumes(volume_ids)
+    volumes_dict = {}
+    for volume in volumes:
+      volumes_dict[volume.id] = volume
+    return volumes_dict
+
+  def _print_volume(self, role, volume):
+    print "\t".join((role, volume.id, str(volume.size),
+                     volume.snapshot_id, volume.availabilityZone,
+                     volume.status, str(volume.create_time),
+                     str(volume.attach_time)))
+
+  def print_status(self, roles):
+    for role in roles:
+      mountable_volumes_list = self._get_mountable_volumes(role)
+      ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+      for mountable_volumes in mountable_volumes_list:
+        for mountable_volume in mountable_volumes:
+          self._print_volume(role, ec2_volumes[mountable_volume.volume_id])
+
+  def _replace(self, string, replacements):
+    for (match, replacement) in replacements.iteritems():
+      string = string.replace(match, replacement)
+    return string
+
+  def attach(self, role, instances):
+    mountable_volumes_list = self._get_mountable_volumes(role)
+    if not mountable_volumes_list:
+      return
+    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+
+    available_mountable_volumes_list = []
+
+    available_instances_dict = {}
+    for instance in instances:
+      available_instances_dict[instance.id] = instance
+
+    # Iterate over mountable_volumes and retain those that are not attached
+    # Also maintain a list of instances that have no attached storage
+    # Note that we do not fill in "holes" (instances that only have some of
+    # their storage attached)
+    for mountable_volumes in mountable_volumes_list:
+      available = True
+      for mountable_volume in mountable_volumes:
+        if ec2_volumes[mountable_volume.volume_id].status != 'available':
+          available = False
+          attach_data = ec2_volumes[mountable_volume.volume_id].attach_data
+          instance_id = attach_data.instance_id
+          if available_instances_dict.has_key(instance_id):
+            del available_instances_dict[instance_id]
+      if available:
+        available_mountable_volumes_list.append(mountable_volumes)
+
+    if len(available_instances_dict) != len(available_mountable_volumes_list):
+      logger.warning("Number of available instances (%s) and volumes (%s) "
+                     "do not match."
+                     % (len(available_instances_dict),
+                        len(available_mountable_volumes_list)))
+
+    for (instance, mountable_volumes) in zip(available_instances_dict.values(),
+                                             available_mountable_volumes_list):
+      print "Attaching storage to %s" % instance.id
+      for mountable_volume in mountable_volumes:
+        volume = ec2_volumes[mountable_volume.volume_id]
+        print "Attaching %s to %s" % (volume.id, instance.id)
+        volume.attach(instance.id, mountable_volume.device)
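
Aside (not part of the patch): the pairing above is deliberately coarse.
Worked through with hypothetical IDs: if the "slave" role has volume set 1
attached to instance i-111 and volume set 2 fully 'available', and attach()
is called with instances i-111 and i-222, then i-111 is dropped from the
available-instances dict, set 2 is the only retained volume set, and it is
attached to i-222. Partially attached sets are never completed.
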
+
+  def delete(self, role):
+    storage_filename = self._get_storage_filename()
+    volume_manager = JsonVolumeManager(storage_filename)
+    mountable_volumes_list = volume_manager.get_instance_storage_for_role(role)
+    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+    all_available = True
+    for volume in ec2_volumes.itervalues():
+      if volume.status != 'available':
+        all_available = False
+        logger.warning("Volume %s is not available.", volume)
+    if not all_available:
+      logger.warning("Some volumes are still in use for role %s. "
+                     "Aborting delete.", role)
+      return
+    for volume in ec2_volumes.itervalues():
+      volume.delete()
+    volume_manager.remove_instance_storage_for_role(role)

Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/storage.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Classes for controlling external cluster storage.
+"""
+
+import logging
+import simplejson as json
+
+logger = logging.getLogger(__name__)
+
+class VolumeSpec(object):
+  """
+  The specification for a storage volume, encapsulating all the information
+  needed to create a volume and ultimately mount it on an instance.
+  """
+  def __init__(self, size, mount_point, device, snapshot_id):
+    self.size = size
+    self.mount_point = mount_point
+    self.device = device
+    self.snapshot_id = snapshot_id
+
+
+class JsonVolumeSpecManager(object):
+  """
+  A container for VolumeSpecs. This object can read VolumeSpecs specified in
+  JSON.
+  """
+  def __init__(self, spec_file):
+    self.spec = json.load(spec_file)
+
+  def volume_specs_for_role(self, role):
+    return [VolumeSpec(d["size_gb"], d["mount_point"], d["device"],
+                       d["snapshot_id"]) for d in self.spec[role]]
+
+  def get_mappings_string_for_role(self, role):
+    """
+    Returns a short string of the form
+    "mount_point1,device1;mount_point2,device2;..."
+    which is useful for passing as an environment variable.
+    """
+    return ";".join(["%s,%s" % (d["mount_point"], d["device"])
+                     for d in self.spec[role]])
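
Aside (not part of the patch): the spec file this manager reads maps a role
name to a list of volume descriptions; teststorage.py later in this commit
builds exactly this shape:

    {"master": [{"size_gb": "8", "mount_point": "/", "device": "/dev/sdj",
                 "snapshot_id": "snap_1"}],
     "slave":  [{"size_gb": "8", "mount_point": "/", "device": "/dev/sdj",
                 "snapshot_id": "snap_2"},
                {"size_gb": "10", "mount_point": "/data1", "device": "/dev/sdk",
                 "snapshot_id": "snap_3"}]}
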
+ """ + return ";".join(["%s,%s" % (d["mount_point"], d["device"]) + for d in self.spec[role]]) + + +class MountableVolume(object): + """ + A storage volume that has been created. It may or may not have been attached + or mounted to an instance. + """ + def __init__(self, volume_id, mount_point, device): + self.volume_id = volume_id + self.mount_point = mount_point + self.device = device + + +class JsonVolumeManager(object): + + def __init__(self, filename): + self.filename = filename + + def _load(self): + try: + return json.load(open(self.filename, "r")) + except IOError: + logger.debug("File %s does not exist.", self.filename) + return {} + + def _store(self, obj): + return json.dump(obj, open(self.filename, "w"), sort_keys=True, indent=2) + + def add_instance_storage_for_role(self, role, mountable_volumes): + json_dict = self._load() + mv_dicts = [mv.__dict__ for mv in mountable_volumes] + json_dict.setdefault(role, []).append(mv_dicts) + self._store(json_dict) + + def remove_instance_storage_for_role(self, role): + json_dict = self._load() + del json_dict[role] + self._store(json_dict) + + def get_instance_storage_for_role(self, role): + """ + Returns a list of lists of MountableVolume objects. Each nested list is + the storage for one instance. + """ + try: + json_dict = self._load() + instance_storage = [] + for instance in json_dict[role]: + vols = [] + for vol in instance: + vols.append(MountableVolume(vol["volume_id"], vol["mount_point"], + vol["device"])) + instance_storage.append(vols) + return instance_storage + except KeyError: + return [] + +class Storage(object): + """ + Storage volumes for a cluster. The storage is associated with a named + cluster. Many clusters just have local storage, in which case this is + not used. + """ + + def __init__(self, cluster): + self.cluster = cluster + + def create(self, role, number_of_instances, availability_zone, spec_filename): + """ + Create new storage volumes for instances with the given role, according to + the mapping defined in the spec file. + """ + pass + + def get_mappings_string_for_role(self, role): + """ + Returns a short string of the form + "mount_point1,device1;mount_point2,device2;..." + which is useful for passing as an environment variable. + """ + raise Exception("Unimplemented") + + def has_any_storage(self, roles): + """ + Return True if any of the given roles has associated storage + """ + return False + + def print_status(self, roles): + """ + Print the status of storage volumes for the given roles. + """ + pass + + def attach(self, role, instances): + """ + Attach volumes for a role to instances. Some volumes may already be + attached, in which case they are ignored, and we take care not to attach + multiple volumes to an instance. + """ + pass + + def delete(self, role): + """ + Permanently delete all the storage for a role. + """ + pass Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py?rev=885888&view=auto ============================================================================== --- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py (added) +++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py Tue Dec 1 19:33:10 2009 @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+
+class Storage(object):
+  """
+  Storage volumes for a cluster. The storage is associated with a named
+  cluster. Many clusters just have local storage, in which case this is
+  not used.
+  """
+
+  def __init__(self, cluster):
+    self.cluster = cluster
+
+  def create(self, role, number_of_instances, availability_zone, spec_filename):
+    """
+    Create new storage volumes for instances with the given role, according to
+    the mapping defined in the spec file.
+    """
+    pass
+
+  def get_mappings_string_for_role(self, role):
+    """
+    Returns a short string of the form
+    "mount_point1,device1;mount_point2,device2;..."
+    which is useful for passing as an environment variable.
+    """
+    raise Exception("Unimplemented")
+
+  def has_any_storage(self, roles):
+    """
+    Return True if any of the given roles has associated storage.
+    """
+    return False
+
+  def print_status(self, roles):
+    """
+    Print the status of storage volumes for the given roles.
+    """
+    pass
+
+  def attach(self, role, instances):
+    """
+    Attach volumes for a role to instances. Some volumes may already be
+    attached, in which case they are ignored, and we take care not to attach
+    multiple volumes to an instance.
+    """
+    pass
+
+  def delete(self, role):
+    """
+    Permanently delete all the storage for a role.
+    """
+    pass

Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/util.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Utility functions.
+"""
+
+import ConfigParser
+import socket
+import urllib2
+
+def bash_quote(text):
+  """Quotes a string for bash, by using single quotes."""
+  if text == None:
+    return ""
+  return "'%s'" % text.replace("'", "'\\''")
+
+def bash_quote_env(env):
+  """Quotes the value in an environment variable assignment."""
+  if env.find("=") == -1:
+    return env
+  (var, value) = env.split("=")
+  return "%s=%s" % (var, bash_quote(value))
+
+def build_env_string(env_strings=[], pairs={}):
+  """Build a bash environment variable assignment"""
+  env = ''
+  if env_strings:
+    for env_string in env_strings:
+      env += "%s " % bash_quote_env(env_string)
+  if pairs:
+    for key, val in pairs.items():
+      env += "%s=%s " % (key, bash_quote(val))
+  return env[:-1]
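
Aside (not part of the patch): the quoting helpers compose as follows
(behavior mirrored by testutil.py at the end of this commit):

    bash_quote("a ' b")                       # -> "'a '\'' b'"
    bash_quote_env("a=b c")                   # -> "a='b c'"
    build_env_string(["a=b"], {"c": "d"})     # -> "a='b' c='d'"
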
+
+def merge_config_with_options(section_name, config, options):
+  """
+  Merge configuration options with a dictionary of options.
+  Keys in the options dictionary take precedence.
+  """
+  res = {}
+  try:
+    for (key, value) in config.items(section_name):
+      if value.find("\n") != -1:
+        res[key] = value.split("\n")
+      else:
+        res[key] = value
+  except ConfigParser.NoSectionError:
+    pass
+  for key in options:
+    if options[key] != None:
+      res[key] = options[key]
+  return res
+
+def url_get(url, timeout=10, retries=0):
+  """
+  Retrieve content from the given URL.
+  """
+  # in Python 2.6 we can pass timeout to urllib2.urlopen
+  socket.setdefaulttimeout(timeout)
+  attempts = 0
+  while True:
+    try:
+      return urllib2.urlopen(url).read()
+    except urllib2.URLError:
+      attempts = attempts + 1
+      if attempts > retries:
+        raise
+
+def xstr(string):
+  """Sane string conversion: return an empty string if string is None."""
+  return '' if string is None else str(string)

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/__init__.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/__init__.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/alltests.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import unittest
+from hadoop.cloud.testcluster import TestCluster
+from hadoop.cloud.teststorage import TestJsonVolumeSpecManager
+from hadoop.cloud.teststorage import TestJsonVolumeManager
+from hadoop.cloud.testuserdata import TestInstanceUserData
+from hadoop.cloud.testutil import TestUtilFunctions
+
+def testSuite():
+  alltests = unittest.TestSuite([
+    unittest.makeSuite(TestCluster, 'test'),
+    unittest.makeSuite(TestJsonVolumeSpecManager, 'test'),
+    unittest.makeSuite(TestJsonVolumeManager, 'test'),
+    unittest.makeSuite(TestInstanceUserData, 'test'),
+    unittest.makeSuite(TestUtilFunctions, 'test'),
+  ])
+  return alltests
+
+if __name__ == "__main__":
+  runner = unittest.TextTestRunner()
+  sys.exit(not runner.run(testSuite()).wasSuccessful())
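
Aside (not part of the patch): one plausible way to run this suite from a
checkout — the exact invocation is not specified in the commit — is to put
both source trees on PYTHONPATH (boto and simplejson must be installed):

    cd src/contrib/cloud
    PYTHONPATH=src/py:src/test python src/test/hadoop/cloud/alltests.py
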
Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from hadoop.cloud.cluster import RoleSyntaxException
+from hadoop.cloud.providers.ec2 import Ec2Cluster
+
+class TestCluster(unittest.TestCase):
+
+  def test_group_name_for_role(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    self.assertEqual("test-cluster-foo", cluster._group_name_for_role("foo"))
+
+  def test_check_role_name_valid(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    cluster._check_role_name(
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_")
+
+  def test_check_role_name_dash_is_invalid(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    self.assertRaises(RoleSyntaxException, cluster._check_role_name, "a-b")
+
+if __name__ == '__main__':
+  unittest.main()

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+
+import simplejson as json
+from StringIO import StringIO
+
+from hadoop.cloud.storage import MountableVolume
+from hadoop.cloud.storage import JsonVolumeManager
+from hadoop.cloud.storage import JsonVolumeSpecManager
+
+spec = {
+  "master": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj",
+              "snapshot_id": "snap_1"},
+             ),
+  "slave": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj",
+             "snapshot_id": "snap_2"},
+            {"size_gb":"10", "mount_point":"/data1", "device":"/dev/sdk",
+             "snapshot_id": "snap_3"},
+            )
+  }
+
+class TestJsonVolumeSpecManager(unittest.TestCase):
+
+  def test_volume_specs_for_role(self):
+
+    input = StringIO(json.dumps(spec))
+
+    volume_spec_manager = JsonVolumeSpecManager(input)
+
+    master_specs = volume_spec_manager.volume_specs_for_role("master")
+    self.assertEqual(1, len(master_specs))
+    self.assertEqual("/", master_specs[0].mount_point)
+    self.assertEqual("8", master_specs[0].size)
+    self.assertEqual("/dev/sdj", master_specs[0].device)
+    self.assertEqual("snap_1", master_specs[0].snapshot_id)
+
+    slave_specs = volume_spec_manager.volume_specs_for_role("slave")
+    self.assertEqual(2, len(slave_specs))
+    self.assertEqual("snap_2", slave_specs[0].snapshot_id)
+    self.assertEqual("snap_3", slave_specs[1].snapshot_id)
+
+    self.assertRaises(KeyError, volume_spec_manager.volume_specs_for_role,
+                      "no-such-role")
+
+  def test_get_mappings_string_for_role(self):
+
+    input = StringIO(json.dumps(spec))
+
+    volume_spec_manager = JsonVolumeSpecManager(input)
+
+    master_mappings = volume_spec_manager.get_mappings_string_for_role("master")
+    self.assertEqual("/,/dev/sdj", master_mappings)
+
+    slave_mappings = volume_spec_manager.get_mappings_string_for_role("slave")
+    self.assertEqual("/,/dev/sdj;/data1,/dev/sdk", slave_mappings)
+
+    self.assertRaises(KeyError,
+                      volume_spec_manager.get_mappings_string_for_role,
+                      "no-such-role")
+
+class TestJsonVolumeManager(unittest.TestCase):
+
+  def setUp(self):
+    try:
+      os.remove("volumemanagertest.json")
+    except OSError:
+      pass
+
+  def test_add_instance_storage_for_role(self):
+    volume_manager = JsonVolumeManager("volumemanagertest.json")
+    self.assertEqual(0,
+      len(volume_manager.get_instance_storage_for_role("master")))
+
+    volume_manager.add_instance_storage_for_role("master",
+                                                 [MountableVolume("vol_1", "/",
+                                                                  "/dev/sdj")])
+    master_storage = volume_manager.get_instance_storage_for_role("master")
+    self.assertEqual(1, len(master_storage))
+    master_storage_instance0 = master_storage[0]
+    self.assertEqual(1, len(master_storage_instance0))
+    master_storage_instance0_vol0 = master_storage_instance0[0]
+    self.assertEqual("vol_1", master_storage_instance0_vol0.volume_id)
+    self.assertEqual("/", master_storage_instance0_vol0.mount_point)
+    self.assertEqual("/dev/sdj", master_storage_instance0_vol0.device)
+
+    volume_manager.add_instance_storage_for_role("slave",
+                                                 [MountableVolume("vol_2", "/",
+                                                                  "/dev/sdj")])
+    self.assertEqual(1,
+      len(volume_manager.get_instance_storage_for_role("master")))
+    slave_storage = volume_manager.get_instance_storage_for_role("slave")
+    self.assertEqual(1, len(slave_storage))
+    slave_storage_instance0 = slave_storage[0]
+    self.assertEqual(1, len(slave_storage_instance0))
+    slave_storage_instance0_vol0 = slave_storage_instance0[0]
+    self.assertEqual("vol_2", slave_storage_instance0_vol0.volume_id)
+    self.assertEqual("/", slave_storage_instance0_vol0.mount_point)
+    self.assertEqual("/dev/sdj", slave_storage_instance0_vol0.device)
+
+    volume_manager.add_instance_storage_for_role("slave",
+      [MountableVolume("vol_3", "/", "/dev/sdj"),
+       MountableVolume("vol_4", "/data1", "/dev/sdk")])
+    self.assertEqual(1,
+      len(volume_manager.get_instance_storage_for_role("master")))
+    slave_storage = volume_manager.get_instance_storage_for_role("slave")
+    self.assertEqual(2, len(slave_storage))
+    slave_storage_instance0 = slave_storage[0]
+    slave_storage_instance1 = slave_storage[1]
+    self.assertEqual(1, len(slave_storage_instance0))
+    self.assertEqual(2, len(slave_storage_instance1))
+    slave_storage_instance1_vol0 = slave_storage_instance1[0]
+    slave_storage_instance1_vol1 = slave_storage_instance1[1]
+    self.assertEqual("vol_3", slave_storage_instance1_vol0.volume_id)
+    self.assertEqual("/", slave_storage_instance1_vol0.mount_point)
+    self.assertEqual("/dev/sdj", slave_storage_instance1_vol0.device)
+    self.assertEqual("vol_4", slave_storage_instance1_vol1.volume_id)
+    self.assertEqual("/data1", slave_storage_instance1_vol1.mount_point)
+    self.assertEqual("/dev/sdk", slave_storage_instance1_vol1.device)
+
+
+if __name__ == '__main__':
+  unittest.main()

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import tempfile
+import unittest
+
+from hadoop.cloud.cluster import InstanceUserData
+
+class TestInstanceUserData(unittest.TestCase):
+
+  def test_replacement(self):
+    file = tempfile.NamedTemporaryFile()
+    file.write("Contents go here")
+    file.flush()
+    self.assertEqual("Contents go here",
+                     InstanceUserData(file.name, {}).read())
+    self.assertEqual("Contents were here",
+                     InstanceUserData(file.name, { "go": "were"}).read())
+    self.assertEqual("Contents here",
+                     InstanceUserData(file.name, { "go": None}).read())
+    file.close()
+
+  def test_read_file_url(self):
+    file = tempfile.NamedTemporaryFile()
+    file.write("Contents go here")
+    file.flush()
+    self.assertEqual("Contents go here",
+                     InstanceUserData("file://%s" % file.name, {}).read())
+    file.close()
+
+if __name__ == '__main__':
+  unittest.main()

Added: hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py?rev=885888&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/test/hadoop/cloud/testutil.py Tue Dec 1 19:33:10 2009
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ConfigParser
+import StringIO
+import unittest
+
+from hadoop.cloud.util import bash_quote
+from hadoop.cloud.util import bash_quote_env
+from hadoop.cloud.util import build_env_string
+from hadoop.cloud.util import merge_config_with_options
+from hadoop.cloud.util import xstr
+
+class TestUtilFunctions(unittest.TestCase):
+
+  def test_bash_quote(self):
+    self.assertEqual("", bash_quote(None))
+    self.assertEqual("''", bash_quote(""))
+    self.assertEqual("'a'", bash_quote("a"))
+    self.assertEqual("'a b'", bash_quote("a b"))
+    self.assertEqual("'a\b'", bash_quote("a\b"))
+    self.assertEqual("'a '\\'' b'", bash_quote("a ' b"))
+
+  def test_bash_quote_env(self):
+    self.assertEqual("", bash_quote_env(""))
+    self.assertEqual("a", bash_quote_env("a"))
+    self.assertEqual("a='b'", bash_quote_env("a=b"))
+    self.assertEqual("a='b c'", bash_quote_env("a=b c"))
+    self.assertEqual("a='b\c'", bash_quote_env("a=b\c"))
+    self.assertEqual("a='b '\\'' c'", bash_quote_env("a=b ' c"))
+
+  def test_build_env_string(self):
+    self.assertEqual("", build_env_string())
+    self.assertEqual("a='b' c='d'",
+                     build_env_string(env_strings=["a=b", "c=d"]))
+    self.assertEqual("a='b' c='d'",
+                     build_env_string(pairs={"a": "b", "c": "d"}))
+
+  def test_merge_config_with_options(self):
+    options = { "a": "b" }
+    config = ConfigParser.ConfigParser()
+    self.assertEqual({ "a": "b" },
+                     merge_config_with_options("section", config, options))
+    config.add_section("section")
+    self.assertEqual({ "a": "b" },
+                     merge_config_with_options("section", config, options))
+    config.set("section", "a", "z")
+    config.set("section", "c", "d")
+    self.assertEqual({ "a": "z", "c": "d" },
+                     merge_config_with_options("section", config, {}))
+    self.assertEqual({ "a": "b", "c": "d" },
+                     merge_config_with_options("section", config, options))
+
+  def test_merge_config_with_options_list(self):
+    config = ConfigParser.ConfigParser()
+    config.readfp(StringIO.StringIO("""[section]
+env1=a=b
+ c=d
+env2=e=f
+ g=h"""))
+    self.assertEqual({ "env1": ["a=b", "c=d"], "env2": ["e=f", "g=h"] },
+                     merge_config_with_options("section", config, {}))
+
+  def test_xstr(self):
+    self.assertEqual("", xstr(None))
+    self.assertEqual("a", xstr("a"))
+
+if __name__ == '__main__':
+  unittest.main()
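
Aside (not part of the patch): taken together, the classes added in this half
of the commit compose roughly as follows. Everything here is a sketch — the
cluster name, AMI, zone, and spec file are hypothetical, and InstanceUserData
comes from hadoop.cloud.cluster in part 1/2:

    from hadoop.cloud.providers.ec2 import Ec2Cluster

    cluster = Ec2Cluster("mycluster", "/path/to/config")
    storage = cluster.get_storage()
    storage.create("slave", 2, "us-east-1a", "spec.json")
    ids = cluster.launch_instances("slave", 2, "ami-12345678", "m1.small",
                                   user_data)
    cluster.wait_for_instances(ids)
    storage.attach("slave", cluster.get_instances_in_role("slave", "running"))
    cluster.print_status(["slave"])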