hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r896259 - in /hadoop/common/trunk: CHANGES.txt src/contrib/cloud/README.txt src/contrib/cloud/src/py/hadoop/cloud/cli.py src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh src/contrib/cloud/src/py/hadoop/cloud/service.py
Date Tue, 05 Jan 2010 22:54:56 GMT
Author: tomwhite
Date: Tue Jan  5 22:54:51 2010
New Revision: 896259

URL: http://svn.apache.org/viewvc?rev=896259&view=rev
Log:
HADOOP-6466. Add a ZooKeeper service to the cloud scripts.

Added:
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh
Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/contrib/cloud/README.txt
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py
    hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Jan  5 22:54:51 2010
@@ -33,6 +33,8 @@
     HADOOP-6415. Adds a common token interface for both job token and 
     delegation token. (Kan Zhang via ddas)
 
+    HADOOP-6466. Add a ZooKeeper service to the cloud scripts. (tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-6283. Improve the exception messages thrown by

Modified: hadoop/common/trunk/src/contrib/cloud/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/README.txt?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/README.txt (original)
+++ hadoop/common/trunk/src/contrib/cloud/README.txt Tue Jan  5 22:54:51 2010
@@ -316,3 +316,24 @@
 It's possible to use any image, as long as it i) runs (gzip compressed) user
 data on boot, and ii) has Java installed.
 
+OTHER SERVICES
+==============
+
+ZooKeeper
+=========
+
+You can run ZooKeeper by setting the "service" parameter to "zookeeper". For
+example:
+
+[my-zookeeper-cluster]
+service=zookeeper
+ami=ami-ed59bf84
+instance_type=m1.small
+key_name=tom
+availability_zone=us-east-1c
+public_key=PATH_TO_PUBLIC_KEY
+private_key=PATH_TO_PRIVATE_KEY
+
+Then to launch a three-node ZooKeeper ensemble, run:
+
+% ./hadoop-ec2 launch-cluster my-zookeeper-cluster 3 zk

Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py Tue Jan  5 22:54:51 2010
@@ -18,8 +18,8 @@
 import ConfigParser
 from hadoop.cloud import VERSION
 from hadoop.cloud.cluster import get_cluster
+from hadoop.cloud.service import get_service
 from hadoop.cloud.service import InstanceTemplate
-from hadoop.cloud.service import HadoopService
 from hadoop.cloud.service import NAMENODE
 from hadoop.cloud.service import SECONDARY_NAMENODE
 from hadoop.cloud.service import JOBTRACKER
@@ -33,6 +33,7 @@
 import os
 import sys
 
+DEFAULT_SERVICE_NAME = 'hadoop'
 DEFAULT_CLOUD_PROVIDER = 'ec2'
 
 DEFAULT_CONFIG_DIR_NAME = '.hadoop-cloud'
@@ -183,8 +184,10 @@
   cluster_name = args[0]
   opt = merge_config_with_options(cluster_name, config, options_dict)
   logging.debug("Options: %s", str(opt))
-  cluster = get_cluster(get_cloud_provider(opt))(cluster_name, config_dir)
-  service = get_service(cluster)
+  service_name = get_service_name(opt)
+  cloud_provider = get_cloud_provider(opt)
+  cluster = get_cluster(cloud_provider)(cluster_name, config_dir)
+  service = get_service(service_name, cloud_provider)(cluster)
   return (opt, args, service)
 
 def parse_options(command, option_list=[], expected_arguments=(),
@@ -223,15 +226,18 @@
     config_dir = DEFAULT_CONFIG_DIR
   return config_dir
 
+def get_service_name(options_dict):
+  service_name = options_dict.get("service", None)
+  if service_name is None:
+    service_name = DEFAULT_SERVICE_NAME
+  return service_name
+
 def get_cloud_provider(options_dict):
   provider = options_dict.get("cloud_provider", None)
   if provider is None:
     provider = DEFAULT_CLOUD_PROVIDER
   return provider
 
-def get_service(cluster):
-  return HadoopService(cluster)
-
 def check_options_set(options, option_names):
   for option_name in option_names:
     if options.get(option_name) is None:
@@ -268,8 +274,10 @@
   if command == 'list':
     (opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True)
     if len(args) == 0:
-      service = get_service(None)
-      service.list_all(get_cloud_provider(opt))
+      service_name = get_service_name(opt)
+      cloud_provider = get_cloud_provider(opt)
+      service = get_service(service_name, cloud_provider)(None)
+      service.list_all(cloud_provider)
     else:
       (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
       service.list()

Added: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh?rev=896259&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh Tue Jan  5 22:54:51 2010
@@ -0,0 +1,112 @@
+#!/bin/bash -x
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+################################################################################
+# Script that is run on each EC2 instance on boot. It is passed in the EC2 user
+# data, so should not exceed 16K in size after gzip compression.
+#
+# This script is executed by /etc/init.d/ec2-run-user-data, and output is
+# logged to /var/log/messages.
+################################################################################
+
+################################################################################
+# Initialize variables
+################################################################################
+
+# Substitute environment variables passed by the client
+export %ENV%
+
+ZK_VERSION=${ZK_VERSION:-3.2.2}
+ZOOKEEPER_HOME=/usr/local/zookeeper-$ZK_VERSION
+ZK_CONF_DIR=/etc/zookeeper/conf
+
+function register_auto_shutdown() {
+  if [ ! -z "$AUTO_SHUTDOWN" ]; then
+    shutdown -h +$AUTO_SHUTDOWN >/dev/null &
+  fi
+}
+
+# Install a list of packages on debian or redhat as appropriate
+function install_packages() {
+  if which dpkg &> /dev/null; then
+    apt-get update
+    apt-get -y install $@
+  elif which rpm &> /dev/null; then
+    yum install -y $@
+  else
+    echo "No package manager found."
+  fi
+}
+
+# Install any user packages specified in the USER_PACKAGES environment variable
+function install_user_packages() {
+  if [ ! -z "$USER_PACKAGES" ]; then
+    install_packages $USER_PACKAGES
+  fi
+}
+
+function install_zookeeper() {
+  zk_tar_url=http://www.apache.org/dist/hadoop/zookeeper/zookeeper-$ZK_VERSION/zookeeper-$ZK_VERSION.tar.gz
+  zk_tar_file=`basename $zk_tar_url`
+  zk_tar_md5_file=`basename $zk_tar_url.md5`
+
+  curl="curl --retry 3 --silent --show-error --fail"
+  for i in `seq 1 3`;
+  do
+    $curl -O $zk_tar_url
+    $curl -O $zk_tar_url.md5
+    if md5sum -c $zk_tar_md5_file; then
+      break;
+    else
+      rm -f $zk_tar_file $zk_tar_md5_file
+    fi
+  done
+
+  if [ ! -e $zk_tar_file ]; then
+    echo "Failed to download $zk_tar_url. Aborting."
+    exit 1
+  fi
+
+  tar zxf $zk_tar_file -C /usr/local
+  rm -f $zk_tar_file $zk_tar_md5_file
+
+  echo "export ZOOKEEPER_HOME=$ZOOKEEPER_HOME" >> ~root/.bashrc
+  echo 'export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH' >> ~root/.bashrc
+}
+
+function configure_zookeeper() {
+  mkdir -p /mnt/zookeeper/logs
+  ln -s /mnt/zookeeper/logs /var/log/zookeeper
+  mkdir -p /var/log/zookeeper/txlog
+  mkdir -p $ZK_CONF_DIR
+  cp $ZOOKEEPER_HOME/conf/log4j.properties $ZK_CONF_DIR
+
+  sed -i -e "s|log4j.rootLogger=INFO, CONSOLE|log4j.rootLogger=INFO, ROLLINGFILE|" \
+         -e "s|log4j.appender.ROLLINGFILE.File=zookeeper.log|log4j.appender.ROLLINGFILE.File=/var/log/zookeeper/zookeeper.log|" \
+      $ZK_CONF_DIR/log4j.properties
+      
+  # Ensure ZooKeeper starts on boot
+  cat > /etc/rc.local <<EOF
+ZOOCFGDIR=$ZK_CONF_DIR $ZOOKEEPER_HOME/bin/zkServer.sh start > /dev/null 2>&1 &
+EOF
+
+}
+
+register_auto_shutdown
+install_user_packages
+install_zookeeper
+configure_zookeeper

Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py Tue Jan  5 22:54:51 2010
@@ -71,14 +71,174 @@
     new_env_strings.extend(env_strings)
     self.env_strings = new_env_strings
 
-class HadoopService(object):
+
+class Service(object):
   """
-  A HDFS and MapReduce service.
+  A general service that runs on a cluster.
   """
   
   def __init__(self, cluster):
     self.cluster = cluster
     
+  def get_service_code(self):
+    """
+    The code that uniquely identifies the service.
+    """
+    raise Exception("Unimplemented")
+    
+  def list_all(self, provider):
+    """
+    Find and print all clusters running this type of service.
+    """
+    raise Exception("Unimplemented")
+
+  def list(self):
+    """
+    Find and print all the instances running in this cluster.
+    """
+    raise Exception("Unimplemented")
+  
+  def launch_master(self, instance_template, config_dir, client_cidr):
+    """
+    Launch a "master" instance.
+    """
+    raise Exception("Unimplemented")
+  
+  def launch_slaves(self, instance_template):
+    """
+    Launch "slave" instance.
+    """
+    raise Exception("Unimplemented")
+  
+  def launch_cluster(self, instance_templates, config_dir, client_cidr):
+    """
+    Launch a cluster of instances.
+    """
+    raise Exception("Unimplemented")
+  
+  def terminate_cluster(self,  force=False):
+    self.cluster.print_status()
+    if not force and not self._prompt("Terminate all instances?"):
+      print "Not terminating cluster."
+    else:
+      print "Terminating cluster"
+      self.cluster.terminate()
+      
+  def delete_cluster(self):
+    self.cluster.delete()
+    
+  def create_formatted_snapshot(self, size, availability_zone,
+                                image_id, key_name, ssh_options):
+    Ec2Storage.create_formatted_snapshot(self.cluster, size,
+                                         availability_zone,
+                                         image_id,
+                                         key_name,
+                                         ssh_options)
+
+  def list_storage(self):
+    storage = self.cluster.get_storage()
+    storage.print_status()
+
+  def create_storage(self, role, number_of_instances,
+                     availability_zone, spec_file):
+    storage = self.cluster.get_storage()
+    storage.create(role, number_of_instances, availability_zone, spec_file)
+    storage.print_status()
+    
+  def attach_storage(self, role):
+    storage = self.cluster.get_storage()
+    storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
+    storage.print_status()
+    
+  def delete_storage(self, force=False):
+    storage = self.cluster.get_storage()
+    storage.print_status()
+    if not force and not self._prompt("Delete all storage volumes? THIS WILL \
+      PERMANENTLY DELETE ALL DATA"):
+      print "Not deleting storage volumes."
+    else:
+      print "Deleting storage"
+      for role in storage.get_roles():
+        storage.delete(role)
+  
+  def login(self, ssh_options):
+    raise Exception("Unimplemented")
+    
+  def proxy(self, ssh_options):
+    raise Exception("Unimplemented")
+    
+  def push(self, ssh_options, file):
+    raise Exception("Unimplemented")
+    
+  def execute(self, ssh_options, args):
+    raise Exception("Unimplemented")
+  
+  def update_slaves_file(self, config_dir, ssh_options, private_key):
+    raise Exception("Unimplemented")
+  
+  def _prompt(self, prompt):
+    """ Returns true if user responds "yes" to prompt. """
+    return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
+
+  def _call(self, command):
+    print command
+    try:
+      subprocess.call(command, shell=True)
+    except Exception, e:
+      print e
+
+  def _get_default_user_data_file_template(self):
+    data_path = os.path.join(os.path.dirname(__file__), 'data')
+    return os.path.join(data_path, '%s-%s-init-remote.sh' %
+                 (self.get_service_code(), self.cluster.get_provider_code()))
+
+  def _launch_instances(self, instance_template):
+    it = instance_template
+    user_data_file_template = it.user_data_file_template
+    if it.user_data_file_template == None:
+      user_data_file_template = self._get_default_user_data_file_template()
+    ebs_mappings = ''
+    storage = self.cluster.get_storage()
+    for role in it.roles:
+      if storage.has_any_storage((role,)):
+        ebs_mappings = storage.get_mappings_string_for_role(role)
+    replacements = { "%ENV%": build_env_string(it.env_strings, {
+      "ROLES": ",".join(it.roles),
+      "USER_PACKAGES": it.user_packages,
+      "AUTO_SHUTDOWN": it.auto_shutdown,
+      "EBS_MAPPINGS": ebs_mappings,
+    }) }
+    instance_user_data = InstanceUserData(user_data_file_template, replacements)
+    instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
+                                            it.size_id,
+                                            instance_user_data,
+                                            key_name=it.key_name,
+                                            public_key=it.public_key,
+                                            placement=it.placement)
+    print "Waiting for %s instances in role %s to start" % \
+      (it.number, ",".join(it.roles))
+    try:
+      self.cluster.wait_for_instances(instance_ids)
+      print "%s instances started" % ",".join(it.roles)
+    except TimeoutException:
+      print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
+      return
+    print
+    self.cluster.print_status(it.roles[0])
+    return self.cluster.get_instances_in_role(it.roles[0], "running")
+
+  
+class HadoopService(Service):
+  """
+  A HDFS and MapReduce service.
+  """
+  
+  def __init__(self, cluster):
+    super(HadoopService, self).__init__(cluster)
+    
+  def get_service_code(self):
+    return "hadoop"
+    
   def list_all(self, provider):
     """
     Find and print clusters that have a running namenode instances
@@ -160,14 +320,6 @@
                                                file, master.public_ip),
                                                shell=True)
     
-  def push(self, ssh_options, file):
-    master = self._get_master()
-    if not master:
-      sys.exit(1)
-    subprocess.call('scp %s -r %s root@%s:' % (xstr(ssh_options),
-                                               file, master.public_ip),
-                                               shell=True)
-    
   def execute(self, ssh_options, args):
     master = self._get_master()
     if not master:
@@ -176,51 +328,6 @@
                                              master.public_ip,
                                              " ".join(args)), shell=True)
   
-  def terminate_cluster(self,  force=False):
-    self.cluster.print_status()
-    if not force and not self._prompt("Terminate all instances?"):
-      print "Not terminating cluster."
-    else:
-      print "Terminating cluster"
-      self.cluster.terminate()
-      
-  def delete_cluster(self):
-    self.cluster.delete()
-    
-  def create_formatted_snapshot(self, size, availability_zone,
-                                image_id, key_name, ssh_options):
-    Ec2Storage.create_formatted_snapshot(self.cluster, size,
-                                         availability_zone,
-                                         image_id,
-                                         key_name,
-                                         ssh_options)
-
-  def list_storage(self):
-    storage = self.cluster.get_storage()
-    storage.print_status()
-
-  def create_storage(self, role, number_of_instances,
-                     availability_zone, spec_file):
-    storage = self.cluster.get_storage()
-    storage.create(role, number_of_instances, availability_zone, spec_file)
-    storage.print_status()
-    
-  def attach_storage(self, role):
-    storage = self.cluster.get_storage()
-    storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
-    storage.print_status()
-    
-  def delete_storage(self, force=False):
-    storage = self.cluster.get_storage()
-    storage.print_status()
-    if not force and not self._prompt("Delete all storage volumes? THIS WILL \
-      PERMANENTLY DELETE ALL DATA"):
-      print "Not deleting storage volumes."
-    else:
-      print "Deleting storage"
-      for role in storage.get_roles():
-        storage.delete(role)
-        
   def update_slaves_file(self, config_dir, ssh_options, private_key):
     instances = self.cluster.check_running(NAMENODE, 1)
     if not instances:
@@ -241,16 +348,6 @@
       subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
                       (ssh_options, private_key, slave.public_ip), shell=True)
         
-  def _prompt(self, prompt):
-    """ Returns true if user responds "yes" to prompt. """
-    return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
-  
-
-  def _get_default_user_data_file_template(self):
-    data_path = os.path.join(os.path.dirname(__file__), 'data')
-    return os.path.join(data_path, 'hadoop-%s-init-remote.sh' %
-                 self.cluster.get_provider_code())
-  
   def _get_master(self):
     # For split namenode/jobtracker, designate the namenode as the master
     return self._get_namenode()
@@ -288,42 +385,6 @@
     """Replace characters in role name with ones allowed in bash variable names"""
     return role.replace('+', '_').upper()
 
-  def _launch_instances(self, instance_template):
-    it = instance_template
-    user_data_file_template = it.user_data_file_template
-    if it.user_data_file_template == None:
-      user_data_file_template = self._get_default_user_data_file_template()
-    ebs_mappings = ''
-    storage = self.cluster.get_storage()
-    for role in it.roles:
-      if storage.has_any_storage((role,)):
-        ebs_mappings = storage.get_mappings_string_for_role(role)
-    replacements = { "%ENV%": build_env_string(it.env_strings, {
-      "ROLES": ",".join(it.roles),
-      "USER_PACKAGES": it.user_packages,
-      "AUTO_SHUTDOWN": it.auto_shutdown,
-      "EBS_MAPPINGS": ebs_mappings,
-    }) }
-    instance_user_data = InstanceUserData(user_data_file_template, replacements)
-    instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
-                                            it.size_id,
-                                            instance_user_data,
-                                            key_name=it.key_name,
-                                            public_key=it.public_key,
-                                            placement=it.placement,
-                                            security_groups=it.security_groups)
-    print "Waiting for %s instances in role %s to start" % \
-      (it.number, ",".join(it.roles))
-    try:
-      self.cluster.wait_for_instances(instance_ids)
-      print "%s instances started" % ",".join(it.roles)
-    except TimeoutException:
-      print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
-      return
-    print
-    self.cluster.print_status(it.roles[0])
-    return self.cluster.get_instances_in_role(it.roles[0], "running")
-
   def _authorize_client_ports(self, client_cidrs=[]):
     if not client_cidrs:
       logger.debug("No client CIDRs specified, using local address.")
@@ -463,4 +524,111 @@
       time.sleep(10)
       for role in roles:
         storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
-      storage.print_status(roles)
\ No newline at end of file
+      storage.print_status(roles)
+
+
+class ZooKeeperService(Service):
+  """
+  A ZooKeeper service.
+  """
+
+  ZOOKEEPER_ROLE = "zk"
+
+  def __init__(self, cluster):
+    super(ZooKeeperService, self).__init__(cluster)
+    
+  def get_service_code(self):
+    return "zookeeper"
+
+  def launch_cluster(self, instance_templates, config_dir, client_cidr):
+    self._launch_cluster_instances(instance_templates)
+    self._authorize_client_ports(client_cidr)
+    self._update_cluster_membership(instance_templates[0].public_key)
+    
+  def _launch_cluster_instances(self, instance_templates):
+    for instance_template in instance_templates:
+      instances = self._launch_instances(instance_template)
+
+  def _authorize_client_ports(self, client_cidrs=[]):
+    if not client_cidrs:
+      logger.debug("No client CIDRs specified, using local address.")
+      client_ip = url_get('http://checkip.amazonaws.com/').strip()
+      client_cidrs = ("%s/32" % client_ip,)
+    logger.debug("Client CIDRs: %s", client_cidrs)
+    for client_cidr in client_cidrs:
+      self.cluster.authorize_role(self.ZOOKEEPER_ROLE, 2181, 2181, client_cidr)
+  
+  def _update_cluster_membership(self, public_key):
+    time.sleep(30) # wait for SSH daemon to start
+    
+    ssh_options = '-o StrictHostKeyChecking=no'
+    private_key = public_key[:-4] # TODO: pass in private key explicitly
+
+    instances = self.cluster.get_instances_in_role(self.ZOOKEEPER_ROLE,
+                                                   'running')
+    config_file = 'zoo.cfg'
+    with open(config_file, 'w') as f:
+      f.write("""# The number of milliseconds of each tick
+tickTime=2000
+# The number of ticks that the initial
+# synchronization phase can take
+initLimit=10
+# The number of ticks that can pass between
+# sending a request and getting an acknowledgement
+syncLimit=5
+# The directory where the snapshot is stored.
+dataDir=/var/log/zookeeper/txlog
+# The port at which the clients will connect
+clientPort=2181
+# The servers in the ensemble
+""")
+      counter = 1
+      for i in instances:
+        f.write("server.%s=%s:2888:3888\n" % (counter, i.private_ip))
+        counter += 1
+    # copy to each node in the cluster
+    myid_file = 'myid'
+    counter = 1
+    for i in instances:
+      self._call('scp -i %s %s %s root@%s:/etc/zookeeper/conf/zoo.cfg' \
+                 % (private_key, ssh_options, config_file, i.public_ip))
+      with open(myid_file, 'w') as f:
+        f.write(str(counter) + "\n")
+      self._call('scp -i %s %s %s root@%s:/var/log/zookeeper/txlog/myid' \
+                 % (private_key, ssh_options, myid_file, i.public_ip))
+      counter += 1
+    os.remove(config_file)
+    os.remove(myid_file)
+
+    # start the zookeeper servers
+    for i in instances:
+      self._call('ssh -i %s %s root@%s nohup /etc/rc.local &' \
+                 % (private_key, ssh_options, i.public_ip))
+      
+    hosts_string = ",".join(["%s:2181" % i.public_ip for i in instances]) 
+    print "ZooKeeper cluster: %s" % hosts_string
+
+SERVICE_PROVIDER_MAP = {
+  "hadoop": {
+    # "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderHadoopService')
+  },
+  "zookeeper": {
+    # "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderZooKeeperService')
+  },
+}
+
+DEFAULT_SERVICE_PROVIDER_MAP = {
+  "hadoop": HadoopService,
+  "zookeeper": ZooKeeperService
+}
+
+def get_service(service, provider):
+  """
+  Retrieve the Service class for a service and provider.
+  """
+  try:
+    mod_name, service_classname = SERVICE_PROVIDER_MAP[service][provider]
+    _mod = __import__(mod_name, globals(), locals(), [service_classname])
+    return getattr(_mod, service_classname)
+  except KeyError:
+    return DEFAULT_SERVICE_PROVIDER_MAP[service]



Mime
View raw message