apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-core git commit: APEXCORE-439 Added re-deploying of dependencies of new operators generated as a result of dynamic re-partitioning
Date Mon, 25 Apr 2016 22:06:06 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 80b9ddcd7 -> 64a344b09


APEXCORE-439 Added re-deploying of dependencies of new operators generated as a result of
dynamic re-partitioning


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/64a344b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/64a344b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/64a344b0

Branch: refs/heads/release-3.3
Commit: 64a344b091b284d17b65b47cdc20614bece547d6
Parents: 80b9ddc
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Sat Apr 23 10:52:52 2016 -0700
Committer: Thomas Weise <thomas@datatorrent.com>
Committed: Mon Apr 25 15:05:16 2016 -0700

----------------------------------------------------------------------
 .../stram/plan/physical/PhysicalPlan.java       | 15 +++-
 .../stram/plan/physical/PhysicalPlanTest.java   | 92 ++++++++++++++++++++
 2 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/64a344b0/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index c696224..6b4378b 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -976,11 +976,20 @@ public class PhysicalPlan implements Serializable
     Set<PTContainer> releaseContainers = Sets.newHashSet();
     assignContainers(newContainers, releaseContainers);
     updatePartitionsInfoForPersistOperator(this.dag);
-    this.undeployOpers.removeAll(newOpers.keySet());
-    //make sure all the new operators are included in deploy operator list
-    this.deployOpers.addAll(this.newOpers.keySet());
+
+    // redeploy dependencies of the new operators excluding the new operators themselves
+    Set<PTOperator> ndeps = getDependents(this.newOpers.keySet());
+    this.undeployOpers.addAll(ndeps);
+    this.undeployOpers.removeAll(this.newOpers.keySet());
+
     // include downstream dependencies of affected operators into redeploy
     Set<PTOperator> deployOperators = this.getDependents(this.deployOpers);
+    // include new operators and their dependencies for deployment
+    deployOperators.addAll(ndeps);
+
+    //make sure all the new operators are included in deploy operator list
+    this.deployOpers.addAll(this.newOpers.keySet());
+
     ctx.deploy(releaseContainers, this.undeployOpers, newContainers, deployOperators);
     this.newOpers.clear();
     this.deployOpers.clear();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/64a344b0/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 6731a12..2fd8798 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -1484,6 +1484,98 @@ public class PhysicalPlanTest
 
   }
 
+  /**
+   * Test covering scenario when only new partitions are added during dynamic partitioning and there
+   * are no changes to existing partitions and partition mapping
+   */
+  @Test
+  public void testAugmentedDynamicPartitioning()
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+    dag.setAttribute(o1, OperatorContext.PARTITIONER, new TestAugmentingPartitioner<TestGeneratorInputOperator>(3));
+    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
+    OperatorMeta o1Meta = dag.getMeta(o1);
+
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    OperatorMeta o2Meta = dag.getMeta(o2);
+
+    dag.addStream("o1.outport1", o1.outport, o2.inport1);
+
+    int maxContainers = 10;
+    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
+
+    TestPlanContext ctx = new TestPlanContext();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
+
+    PhysicalPlan plan = new PhysicalPlan(dag, ctx);
+    Assert.assertEquals("number of containers", 5, plan.getContainers().size());
+
+    List<PTOperator> o1ops = plan.getOperators(o1Meta);
+    Assert.assertEquals("number of o1 operators", 3, o1ops.size());
+
+    List<PTOperator> o2ops = plan.getOperators(o2Meta);
+    Assert.assertEquals("number of o2 operators", 1, o2ops.size());
+
+    List<PTOperator> uops = plan.getMergeOperators(o1Meta);
+
+    Set<PTOperator> expUndeploy = Sets.newLinkedHashSet();
+    expUndeploy.addAll(plan.getOperators(o2Meta));
+    expUndeploy.addAll(uops);
+
+    for (int i = 0; i < 2; ++i) {
+      PartitioningTest.PartitionLoadWatch.put(o1ops.get(i), 1);
+      plan.onStatusUpdate(o1ops.get(i));
+    }
+
+    ctx.backupRequests = 0;
+    ctx.events.remove(0).run();
+
+    Assert.assertEquals("number of containers", 7, plan.getContainers().size());
+
+    Assert.assertEquals("undeployed opertors", expUndeploy, ctx.undeploy);
+  }
+
+  private class TestAugmentingPartitioner<T> implements Partitioner<T>
+  {
+
+    int initalPartitionCount = 1;
+
+    private TestAugmentingPartitioner(int initialPartitionCount)
+    {
+      this.initalPartitionCount = initialPartitionCount;
+    }
+
+    @Override
+    public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
+    {
+      Collection<Partition<T>> newPartitions = Lists.newArrayList(partitions);
+      int numTotal = partitions.size();
+      Partition<T> first = partitions.iterator().next();
+      if (first.getStats() == null) {
+        // Initial partition
+        numTotal = initalPartitionCount;
+      } else {
+        for (Partition<T> p : partitions) {
+          // Assumption load is non-negative
+          numTotal += p.getLoad();
+        }
+      }
+      T paritionable = first.getPartitionedInstance();
+      for (int i = partitions.size(); i < numTotal; ++i) {
+        newPartitions.add(new DefaultPartition<T>(paritionable));
+      }
+      return newPartitions;
+    }
+
+    @Override
+    public void partitioned(Map<Integer, Partition<T>> partitions)
+    {
+
+    }
+  }
+
   @Test
   public void testCascadingUnifier() {
 


Mime
View raw message