apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Isha Arkatkar <i...@datatorrent.com>
Subject Support for Anti-Affinity in Apex
Date Tue, 19 Jan 2016 21:21:41 GMT
Hi all,

   We want add support for Anti-affinity in Apex to allow applications to
launch specific physical operators on different nodes(APEXCORE-10
<https://issues.apache.org/jira/browse/APEXCORE-10>). Want to request your
suggestions/ideas for the same!

  The reasons for using anti-affinity in operators could be: to ensure
reliability, for performance reasons (such as application may not want 2
i/o intensive operators to land on the same node to improve performance) or
for some application specific constraints(for example,  2 partitions cannot
be run on the same node since they use same port number). This is the
general rationale for adding Anti-affinity support.

Since, Yarn does not support anti-affinity yet (YARN-1042
<https://issues.apache.org/jira/browse/YARN-1042>), we need to implement
the logic in AM. Wanted to get your views on following aspects for this
implementation:

*1. How to specify anti-affinity for physical operators/partitions in
application:*
    One way for this is to have an attribute for setting anti-affinity at
the logical operator context. And an operator can set this attribute with
list of operator names which should not be collocated.
     Consider dag with 3 operators:
     TestOperator o1 = dag.addOperator("O1", new TestOperator());
     TestOperator o2 = dag.addOperator("O2", new TestOperator());
     TestOperator o3 = dag.addOperator("O3", new TestOperator());

 To set anti-affinity for O1 operator:
    dag.setAttribute(o1, OperatorContext.ANTI_AFFINITY, new
ArrayList<String>(Arrays.asList("O2", "O3")));
     This would mean O1 should not be allocated on nodes containing
operators O2 and O3. This applies to all allocated partitions of O1, O2, O3.

   Also, if same operator name is part of anti-affinity list, it means
partitions of the operator should not be allocated on the same node.
example:
    dag.setAttribute(o2, OperatorContext.ANTI_AFFINITY, new
ArrayList<String>(Arrays.asList("O2")));
    This indicates anti-affinity between all partitions of O2. i.e. all
partitions of O2 should be launched on different nodes.

   Based on the anti-affinity attribute specified for logical operator,
during physical plan creation, we can add this list to each PTContainer.
This in turn will be available for Stram for sending container requests
accordingly.

   Please suggest if there is a better way to express this intent.

*2. How to implement anti-affinity in AM*
   There are 2 ways we can implement this:
  * a. Blacklisting of nodes: *We can group the physical container requests
based on anti-affinity requirements and send allocation requests for
containers in groups. After first group is done, blacklist the nodes before
sending second group of container requests. This will ensure that the
containers with anti-affinity requirements  will be allocated on different
nodes.
*   b. Node specific container request: *Explore and create a map of nodes
present in the cluster and send allocation request for container on a
specific node, honoring anti-affinity. There are couple of open Yarn Jiras
for node specific container requests: YARN-1412
<https://issues.apache.org/jira/browse/YARN-1412>, YARN-2027
<https://issues.apache.org/jira/browse/YARN-2027>. So, need to check if
this is a plausible approach.

*3. Strict Vs Relaxed anti-affinity*
  Depending on cluster resources availability, it may not be possible to
honor all anti-affinity requirements specified.
*Strict Anti-affinity:* AM will keep trying to allocate containers as per
anti-affinity requirements indefinitely. This behavior will be similar to
how an application shows in ACCEPTED state, till resources are available to
launch in cluster.
*Relaxed Anti-affinity:* AM will drop the anti-affinity constraint after a
certain timeout.

We need a way to set this attribute through application. (Either in
operator context or in DAGContext for application wide setting.)

*4. How do we unit test this feature*
  We could use Mockito for mocking Yarn behaviors and test only AM
implementation, since it may not be easy to simulate some scenarios
manually in cluster. Please suggest if there are better ways to test this.

Please suggest improvements or any other ideas on all of the above.

Thanks!
Isha

P.S. Sorry for long email. Please let me know if I should start separate
threads for any of the above points.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message