spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: Merge pull request #153 from ankurdave/stop-spot-cluster
Date Mon, 18 Nov 2013 02:49:47 GMT
Updated Branches:
  refs/heads/branch-0.8 6c607682c -> a64397b1e


Merge pull request #153 from ankurdave/stop-spot-cluster

Enable stopping and starting a spot cluster

Clusters launched using `--spot-price` contain an on-demand master and spot slaves. Because
EC2 does not support stopping spot instances, the spark-ec2 script previously could only destroy
such clusters.

This pull request makes it possible to stop and restart a spot cluster.
* The `stop` command works as expected for a spot cluster: the master is stopped and the slaves
are terminated.
* To start a stopped spot cluster, the user must invoke `launch --use-existing-master`. This
launches fresh spot slaves but resumes the existing master.

(cherry picked from commit 87f2f4e5c2812351cdd1b2e35e2b12f62eeb3fdc)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a64397b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a64397b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a64397b1

Branch: refs/heads/branch-0.8
Commit: a64397b1ea3c920a53fdfe7a39c389753694866e
Parents: 6c60768
Author: Matei Zaharia <matei@eecs.berkeley.edu>
Authored: Tue Nov 12 16:26:09 2013 -0800
Committer: Reynold Xin <rxin@apache.org>
Committed: Sun Nov 17 18:49:40 2013 -0800

----------------------------------------------------------------------
 ec2/spark_ec2.py | 51 ++++++++++++++++++++++++++++++++-------------------
 1 file changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a64397b1/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index d652c90..267c8ba 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -97,6 +97,8 @@ def parse_args():
       help="The SSH user you want to connect as (default: root)")
   parser.add_option("--delete-groups", action="store_true", default=False,
       help="When destroying a cluster, delete the security groups that were created")
+  parser.add_option("--use-existing-master", action="store_true", default=False,
+      help="Launch fresh slaves, but use an existing stopped master if possible")
 
   (opts, args) = parser.parse_args()
   if len(args) != 2:
@@ -232,9 +234,9 @@ def launch_cluster(conn, opts, cluster_name):
     slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
 
   # Check if instances are already running in our groups
-  active_nodes = get_existing_cluster(conn, opts, cluster_name,
-                                      die_on_error=False)
-  if any(active_nodes):
+  existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
+                                                           die_on_error=False)
+  if existing_slaves or (existing_masters and not opts.use_existing_master):
     print >> stderr, ("ERROR: There are already instances running in " +
         "group %s or %s" % (master_group.name, slave_group.name))
     sys.exit(1)
@@ -335,21 +337,28 @@ def launch_cluster(conn, opts, cluster_name):
                                                         zone, slave_res.id)
       i += 1
 
-  # Launch masters
-  master_type = opts.master_instance_type
-  if master_type == "":
-    master_type = opts.instance_type
-  if opts.zone == 'all':
-    opts.zone = random.choice(conn.get_all_zones()).name
-  master_res = image.run(key_name = opts.key_pair,
-                         security_groups = [master_group],
-                         instance_type = master_type,
-                         placement = opts.zone,
-                         min_count = 1,
-                         max_count = 1,
-                         block_device_map = block_map)
-  master_nodes = master_res.instances
-  print "Launched master in %s, regid = %s" % (zone, master_res.id)
+  # Launch or resume masters
+  if existing_masters:
+    print "Starting master..."
+    for inst in existing_masters:
+      if inst.state not in ["shutting-down", "terminated"]:
+        inst.start()
+    master_nodes = existing_masters
+  else:
+    master_type = opts.master_instance_type
+    if master_type == "":
+      master_type = opts.instance_type
+    if opts.zone == 'all':
+      opts.zone = random.choice(conn.get_all_zones()).name
+    master_res = image.run(key_name = opts.key_pair,
+                           security_groups = [master_group],
+                           instance_type = master_type,
+                           placement = opts.zone,
+                           min_count = 1,
+                           max_count = 1,
+                           block_device_map = block_map)
+    master_nodes = master_res.instances
+    print "Launched master in %s, regid = %s" % (zone, master_res.id)
 
   # Return all the instances
   return (master_nodes, slave_nodes)
@@ -684,6 +693,7 @@ def main():
         cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
         "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
         "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
+        "All data on spot-instance slaves will be lost.\n" +
         "Stop cluster " + cluster_name + " (y/N): ")
     if response == "y":
       (master_nodes, slave_nodes) = get_existing_cluster(
@@ -695,7 +705,10 @@ def main():
       print "Stopping slaves..."
       for inst in slave_nodes:
         if inst.state not in ["shutting-down", "terminated"]:
-          inst.stop()
+          if inst.spot_instance_request_id:
+            inst.terminate()
+          else:
+            inst.stop()
 
   elif action == "start":
     (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)


Mime
View raw message