spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [1/2] git commit: Enable stopping and starting a spot cluster
Date Wed, 13 Nov 2013 00:26:19 GMT
Updated Branches:
  refs/heads/master b8bf04a08 -> 87f2f4e5c


Enable stopping and starting a spot cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bc9f7eac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bc9f7eac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bc9f7eac

Branch: refs/heads/master
Commit: bc9f7eacb94e05ec089ee7a2d130a2e8a9e54c64
Parents: 23b53ef
Author: Ankur Dave <ankurdave@gmail.com>
Authored: Sat Nov 9 05:12:51 2013 -0800
Committer: Ankur Dave <ankurdave@gmail.com>
Committed: Mon Nov 11 17:50:31 2013 -0800

----------------------------------------------------------------------
 ec2/spark_ec2.py | 51 ++++++++++++++++++++++++++++++++-------------------
 1 file changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bc9f7eac/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index f85342a..1189232 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -101,6 +101,8 @@ def parse_args():
       help="The SSH user you want to connect as (default: root)")
   parser.add_option("--delete-groups", action="store_true", default=False,
       help="When destroying a cluster, delete the security groups that were created")
+  parser.add_option("--use-existing-master", action="store_true", default=False,
+      help="Launch fresh slaves, but use an existing stopped master if possible")
 
   (opts, args) = parser.parse_args()
   if len(args) != 2:
@@ -233,9 +235,9 @@ def launch_cluster(conn, opts, cluster_name):
     slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
 
   # Check if instances are already running in our groups
-  active_nodes = get_existing_cluster(conn, opts, cluster_name,
-                                      die_on_error=False)
-  if any(active_nodes):
+  existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
+                                                           die_on_error=False)
+  if existing_slaves or (existing_masters and not opts.use_existing_master):
     print >> stderr, ("ERROR: There are already instances running in " +
         "group %s or %s" % (master_group.name, slave_group.name))
     sys.exit(1)
@@ -336,21 +338,28 @@ def launch_cluster(conn, opts, cluster_name):
                                                         zone, slave_res.id)
       i += 1
 
-  # Launch masters
-  master_type = opts.master_instance_type
-  if master_type == "":
-    master_type = opts.instance_type
-  if opts.zone == 'all':
-    opts.zone = random.choice(conn.get_all_zones()).name
-  master_res = image.run(key_name = opts.key_pair,
-                         security_groups = [master_group],
-                         instance_type = master_type,
-                         placement = opts.zone,
-                         min_count = 1,
-                         max_count = 1,
-                         block_device_map = block_map)
-  master_nodes = master_res.instances
-  print "Launched master in %s, regid = %s" % (zone, master_res.id)
+  # Launch or resume masters
+  if existing_masters:
+    print "Starting master..."
+    for inst in existing_masters:
+      if inst.state not in ["shutting-down", "terminated"]:
+        inst.start()
+    master_nodes = existing_masters
+  else:
+    master_type = opts.master_instance_type
+    if master_type == "":
+      master_type = opts.instance_type
+    if opts.zone == 'all':
+      opts.zone = random.choice(conn.get_all_zones()).name
+    master_res = image.run(key_name = opts.key_pair,
+                           security_groups = [master_group],
+                           instance_type = master_type,
+                           placement = opts.zone,
+                           min_count = 1,
+                           max_count = 1,
+                           block_device_map = block_map)
+    master_nodes = master_res.instances
+    print "Launched master in %s, regid = %s" % (zone, master_res.id)
 
   # Return all the instances
   return (master_nodes, slave_nodes)
@@ -732,6 +741,7 @@ def real_main():
         cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
         "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
         "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
+        "All data on spot-instance slaves will be lost.\n" +
         "Stop cluster " + cluster_name + " (y/N): ")
     if response == "y":
       (master_nodes, slave_nodes) = get_existing_cluster(
@@ -743,7 +753,10 @@ def real_main():
       print "Stopping slaves..."
       for inst in slave_nodes:
         if inst.state not in ["shutting-down", "terminated"]:
-          inst.stop()
+          if inst.spot_instance_request_id:
+            inst.terminate()
+          else:
+            inst.stop()
 
   elif action == "start":
     (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)


Mime
View raw message