Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 94B2110194 for ; Wed, 13 Nov 2013 00:26:41 +0000 (UTC) Received: (qmail 12569 invoked by uid 500); 13 Nov 2013 00:26:41 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 12532 invoked by uid 500); 13 Nov 2013 00:26:41 -0000 Mailing-List: contact commits-help@spark.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@spark.incubator.apache.org Delivered-To: mailing list commits@spark.incubator.apache.org Received: (qmail 12525 invoked by uid 99); 13 Nov 2013 00:26:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Nov 2013 00:26:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 13 Nov 2013 00:26:40 +0000 Received: (qmail 12398 invoked by uid 99); 13 Nov 2013 00:26:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Nov 2013 00:26:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EEF99823DF6; Wed, 13 Nov 2013 00:26:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: matei@apache.org To: commits@spark.incubator.apache.org Date: Wed, 13 Nov 2013 00:26:19 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: Enable stopping and starting a spot cluster X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master b8bf04a08 -> 87f2f4e5c Enable stopping and starting a spot cluster Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bc9f7eac Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bc9f7eac Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bc9f7eac Branch: refs/heads/master Commit: bc9f7eacb94e05ec089ee7a2d130a2e8a9e54c64 Parents: 23b53ef Author: Ankur Dave Authored: Sat Nov 9 05:12:51 2013 -0800 Committer: Ankur Dave Committed: Mon Nov 11 17:50:31 2013 -0800 ---------------------------------------------------------------------- ec2/spark_ec2.py | 51 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bc9f7eac/ec2/spark_ec2.py ---------------------------------------------------------------------- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index f85342a..1189232 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -101,6 +101,8 @@ def parse_args(): help="The SSH user you want to connect as (default: root)") parser.add_option("--delete-groups", action="store_true", default=False, help="When destroying a cluster, delete the security groups that were created") + parser.add_option("--use-existing-master", action="store_true", default=False, + help="Launch fresh slaves, but use an existing stopped master if possible") (opts, args) = parser.parse_args() if len(args) != 2: @@ -233,9 +235,9 @@ def launch_cluster(conn, opts, cluster_name): slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') # Check if instances are already running in our groups - active_nodes = get_existing_cluster(conn, opts, cluster_name, - die_on_error=False) - if any(active_nodes): + existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, + die_on_error=False) + if existing_slaves or (existing_masters and not opts.use_existing_master): print >> stderr, ("ERROR: There are already instances running in " + "group %s or %s" % (master_group.name, slave_group.name)) sys.exit(1) @@ -336,21 +338,28 @@ def launch_cluster(conn, opts, cluster_name): zone, slave_res.id) i += 1 - # Launch masters - master_type = opts.master_instance_type - if master_type == "": - master_type = opts.instance_type - if opts.zone == 'all': - opts.zone = random.choice(conn.get_all_zones()).name - master_res = image.run(key_name = opts.key_pair, - security_groups = [master_group], - instance_type = master_type, - placement = opts.zone, - min_count = 1, - max_count = 1, - block_device_map = block_map) - master_nodes = master_res.instances - print "Launched master in %s, regid = %s" % (zone, master_res.id) + # Launch or resume masters + if existing_masters: + print "Starting master..." + for inst in existing_masters: + if inst.state not in ["shutting-down", "terminated"]: + inst.start() + master_nodes = existing_masters + else: + master_type = opts.master_instance_type + if master_type == "": + master_type = opts.instance_type + if opts.zone == 'all': + opts.zone = random.choice(conn.get_all_zones()).name + master_res = image.run(key_name = opts.key_pair, + security_groups = [master_group], + instance_type = master_type, + placement = opts.zone, + min_count = 1, + max_count = 1, + block_device_map = block_map) + master_nodes = master_res.instances + print "Launched master in %s, regid = %s" % (zone, master_res.id) # Return all the instances return (master_nodes, slave_nodes) @@ -732,6 +741,7 @@ def real_main(): cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + + "All data on spot-instance slaves will be lost.\n" + "Stop cluster " + cluster_name + " (y/N): ") if response == "y": (master_nodes, slave_nodes) = get_existing_cluster( @@ -743,7 +753,10 @@ def real_main(): print "Stopping slaves..." for inst in slave_nodes: if inst.state not in ["shutting-down", "terminated"]: - inst.stop() + if inst.spot_instance_request_id: + inst.terminate() + else: + inst.stop() elif action == "start": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)