aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject git commit: combine finalization_wait when combining tasks
Date Mon, 22 Sep 2014 17:18:00 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 54f7bede4 -> 741f9dba5


combine finalization_wait when combining tasks

Problem: combine_task and concat_task helpers lose the finalization_wait
information from all but the first task Resolution: use sum/max of
finialization_wait values in concat_task/combine_task

Testing Done:
./pants src/test/python:all -vxs

Reviewed at https://reviews.apache.org/r/24752/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/741f9dba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/741f9dba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/741f9dba

Branch: refs/heads/master
Commit: 741f9dba593006306a1eab52a3b661faf123467c
Parents: 54f7bed
Author: Matthew Jeffryes <mjeffryes@twitter.com>
Authored: Mon Sep 22 10:17:53 2014 -0700
Committer: Brian Wickman <wickman@apache.org>
Committed: Mon Sep 22 10:17:53 2014 -0700

----------------------------------------------------------------------
 .../apache/thermos/config/schema_helpers.py     | 26 +++++++++++++++++---
 .../python/apache/thermos/config/test_schema.py |  6 +++--
 2 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/741f9dba/src/main/python/apache/thermos/config/schema_helpers.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema_helpers.py b/src/main/python/apache/thermos/config/schema_helpers.py
index 3feef02..d88792a 100644
--- a/src/main/python/apache/thermos/config/schema_helpers.py
+++ b/src/main/python/apache/thermos/config/schema_helpers.py
@@ -53,6 +53,10 @@ class Units(object):
   """Helpers for base units of Tasks and Processes."""
 
   @classmethod
+  def safe_get(cls, unit):
+    return 0 if unit is Empty else unit.get()
+
+  @classmethod
   def optional_resources(cls, resources):
     return Resources() if resources is Empty else resources
 
@@ -60,7 +64,7 @@ class Units(object):
   def resources_sum(cls, *resources):
     """Add two Resources objects together."""
     def add_unit(f1, f2):
-      return (0 if f1 is Empty else f1.get()) + (0 if f2 is Empty else f2.get())
+      return cls.safe_get(f1) + cls.safe_get(f2)
 
     def add(r1, r2):
       return Resources(cpu=add_unit(r1.cpu(), r2.cpu()),
@@ -70,11 +74,16 @@ class Units(object):
     return reduce(add, map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))
 
   @classmethod
+  def finalization_wait_sum(cls, waits):
+    """Return a finalization_wait that is the sum of the inputs"""
+    return sum(map(cls.safe_get, waits))
+
+  @classmethod
   def resources_max(cls, resources):
     """Return a Resource object that is the maximum of the inputs along each
       resource dimension."""
     def max_unit(f1, f2):
-      return max(0 if f1 is Empty else f1.get(), 0 if f2 is Empty else f2.get())
+      return max(cls.safe_get(f1), cls.safe_get(f2))
 
     def resource_max(r1, r2):
       return Resources(cpu=max_unit(r1.cpu(), r2.cpu()),
@@ -85,6 +94,11 @@ class Units(object):
         map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))
 
   @classmethod
+  def finalization_wait_max(cls, waits):
+    """Return a finalization_wait that is the maximum of the inputs"""
+    return max([0] + map(cls.safe_get, waits))
+
+  @classmethod
   def processes_merge(cls, tasks):
     """Return a deduped list of the processes from all tasks."""
     return list(set(itertools.chain.from_iterable(task.processes() for task in tasks)))
@@ -142,6 +156,7 @@ class Tasks(object):
     return base(
       resources=Units.resources_sum(*(task.resources() for task in tasks)),
       constraints=Units.constraints_merge(tasks),
+      finalization_wait=Units.finalization_wait_max(task.finalization_wait() for task in
tasks),
       **kw
     )
 
@@ -152,7 +167,6 @@ class Tasks(object):
     if len(tasks) == 0:
       return Task()
     base = cls._combine_processes(*tasks)
-    base = base(resources=Units.resources_max(task.resources() for task in tasks))
     base_constraints = Units.constraints_merge(tasks)
     # TODO(wickman) be smarter about this in light of existing constraints
     for (t1, t2) in zip(tasks[0:-1], tasks[1:]):
@@ -160,7 +174,11 @@ class Tasks(object):
         for p2 in t2.processes():
           if p1 != p2:
             base_constraints.extend(Processes.order(p1, p2))
-    return base(constraints=base_constraints, **kw)
+    return base(
+        resources=Units.resources_max(task.resources() for task in tasks),
+        constraints=base_constraints,
+        finalization_wait=Units.finalization_wait_sum(task.finalization_wait() for task in
tasks),
+        **kw)
 
   @classmethod
   def simple(cls, name, command):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/741f9dba/src/test/python/apache/thermos/config/test_schema.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/config/test_schema.py b/src/test/python/apache/thermos/config/test_schema.py
index 85b04c0..7e18674 100644
--- a/src/test/python/apache/thermos/config/test_schema.py
+++ b/src/test/python/apache/thermos/config/test_schema.py
@@ -75,9 +75,9 @@ def test_combine_tasks():
   r111 = Units.resources_sum(r100, r010, r001)
 
   t1 = Task(name="p1p2", processes=[p1, p2], constraints=order(p1, p2),
-            resources=Units.resources_sum(r100, r010))
+            resources=Units.resources_sum(r100, r010), finalization_wait=60)
   t2 = Task(name="p3p4", processes=[p3, p4], constraints=order(p3, p4),
-            resources=r001)
+            resources=r001, finalization_wait=45)
 
   assert combine_tasks() == Task()
   assert combine_tasks(t1) == t1
@@ -88,6 +88,7 @@ def test_combine_tasks():
   assert t3.resources() == r111
   assert set(t3.processes()) == set([p1, p2, p3, p4])
   assert set(t3.constraints()) == set(order(p1, p2) + order(p3, p4))
+  assert t3.finalization_wait().get() == t1.finalization_wait().get()
 
   t4 = concat_tasks(t1, t2)
   assert t4.name() == t2.name()
@@ -96,6 +97,7 @@ def test_combine_tasks():
   assert set(t4.constraints()) == set(
       order(p1, p2) + order(p3, p4) + order(p1, p3) + order(p1, p4) +
       order(p2, p3) + order(p2, p4))
+  assert t4.finalization_wait().get() == t1.finalization_wait().get() + t2.finalization_wait().get()
 
 
 def test_simple_task():


Mime
View raw message