spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joshro...@apache.org
Subject git commit: Spark-3213 Fixes issue with spark-ec2 not detecting slaves created with "Launch More like this"
Date Wed, 27 Aug 2014 21:26:21 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 90f8f3eed -> 3cb4e1718


Spark-3213 Fixes issue with spark-ec2 not detecting slaves created with "Launch More like
this"

... copy the spark_cluster_tag from spot instance requests over to the instances.

Author: Vida Ha <vida@databricks.com>

Closes #2163 from vidaha/vida/spark-3213 and squashes the following commits:

5070a70 [Vida Ha] Spark-3214 Fix issue with spark-ec2 not detecting slaves created with 'Launch
More Like This' and using Spot Requests

(cherry picked from commit 7faf755ae4f0cf510048e432340260a6e609066d)
Signed-off-by: Josh Rosen <joshrosen@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cb4e171
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cb4e171
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cb4e171

Branch: refs/heads/branch-1.1
Commit: 3cb4e1718f40a18e3d19a33fd627960687bbcb6c
Parents: 90f8f3e
Author: Vida Ha <vida@databricks.com>
Authored: Wed Aug 27 14:26:06 2014 -0700
Committer: Josh Rosen <joshrosen@apache.org>
Committed: Wed Aug 27 14:26:16 2014 -0700

----------------------------------------------------------------------
 ec2/spark_ec2.py | 45 +++++++++++++++++++++++++--------------------
 1 file changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3cb4e171/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 58261e2..afef4ef 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -40,7 +40,6 @@ from boto import ec2
 # A URL prefix from which to fetch AMI information
 AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
 
-
 class UsageError(Exception):
     pass
 
@@ -450,38 +449,45 @@ def launch_cluster(conn, opts, cluster_name):
         print "Launched master in %s, regid = %s" % (zone, master_res.id)
 
     # Give the instances descriptive names
-    # TODO: Add retry logic for tagging with name since it's used to identify a cluster.
     for master in master_nodes:
         name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
-        for i in range(0, 5):
-            try:
-                master.add_tag(key='Name', value=name)
-            except:
-                print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
-                if (i == 5):
-                    raise "Error - failed max attempts to add name tag"
-                time.sleep(5)
-
+        tag_instance(master, name)
 
     for slave in slave_nodes:
         name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
-        for i in range(0, 5):
-            try:
-                slave.add_tag(key='Name', value=name)
-            except:
-                print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
-                if (i == 5):
-                    raise "Error - failed max attempts to add name tag"
-                time.sleep(5)
+        tag_instance(slave, name)
 
     # Return all the instances
     return (master_nodes, slave_nodes)
 
+def tag_instance(instance, name):
+    for i in range(0, 5):
+        try:
+            instance.add_tag(key='Name', value=name)
+        except:
+            print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
+            if (i == 5):
+                raise "Error - failed max attempts to add name tag"
+            time.sleep(5)
 
 # Get the EC2 instances in an existing cluster if available.
 # Returns a tuple of lists of EC2 instance objects for the masters and slaves
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
     print "Searching for existing cluster " + cluster_name + "..."
+    # Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
+    spot_instance_requests = conn.get_all_spot_instance_requests()
+    for req in spot_instance_requests:
+        if req.state != u'active':
+            continue
+        name = req.tags.get(u'Name', "")
+        if name.startswith(cluster_name):
+            reservations = conn.get_all_instances(instance_ids=[req.instance_id])
+            for res in reservations:
+                active = [i for i in res.instances if is_active(i)]
+                for instance in active:
+                    if (instance.tags.get(u'Name') == None):
+                        tag_instance(instance, name)
+    # Now proceed to detect master and slaves instances.
     reservations = conn.get_all_instances()
     master_nodes = []
     slave_nodes = []
@@ -504,7 +510,6 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
             print >> sys.stderr, "ERROR: Could not find any existing cluster"
         sys.exit(1)
 
-
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
 def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message