storm-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [storm] srdo commented on a change in pull request #3096: STORM-3480 Implement One Worker Per Executor RAS Option
Date Sat, 03 Aug 2019 08:46:56 GMT
srdo commented on a change in pull request #3096: STORM-3480 Implement One Worker Per Executor RAS Option
URL: https://github.com/apache/storm/pull/3096#discussion_r310344488
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
 ##########
 @@ -104,79 +107,214 @@ public void cleanup() {
         }
     }
 
-    /**
-     * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
+    /*
+     * test assigned memory with shared memory types and oneWorkerPerExecutor
      */
     @Test
-    public void testDefaultResourceAwareStrategySharedMemory() {
-        int spoutParallelism = 2;
-        int boltParallelism = 2;
-        int numBolts = 3;
+    public void testMultipleSharedMemoryWithOneWorkerPerExecutor() {
+        int spoutParallelism = 4;
         double cpuPercent = 10;
         double memoryOnHeap = 10;
         double memoryOffHeap = 10;
-        double sharedOnHeap = 500;
-        double sharedOffHeapNode = 700;
-        double sharedOffHeapWorker = 500;
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new TestSpout(),
-                spoutParallelism);
-        builder.setBolt("bolt-1", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker,
"bolt-1 shared off heap worker")).shuffleGrouping("spout");
-        builder.setBolt("bolt-2", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode,
"bolt-2 shared node")).shuffleGrouping("bolt-1");
-        builder.setBolt("bolt-3", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeap, "bolt-3 shared
worker")).shuffleGrouping("bolt-2");
-
-        StormTopology stormToplogy = builder.createTopology();
-
-        INimbus iNimbus = new INimbusTest();
-        Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 2000);
-        Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
-        
-        conf.put(Config.TOPOLOGY_PRIORITY, 0);
-        conf.put(Config.TOPOLOGY_NAME, "testTopology");
-        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
-        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy,
0,
-                genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
-
-        Topologies topologies = new Topologies(topo);
-        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()),
supMap, new HashMap<>(), topologies, conf);
+        double sharedOnHeap = 450;
+        double sharedOffHeapNode = 600;
+        double sharedOffHeapWorker = 400;
+
+        for (sharedMemoryTypes memoryType : sharedMemoryTypes.values()) {
+            TopologyBuilder builder = new TopologyBuilder();
+            switch (memoryType) {
+                case sharedOffHeapNodeType:
+                    builder.setSpout("spout", new TestSpout(), spoutParallelism).
+                            addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode,
"spout shared node"));
+                    break;
+                case sharedOffHeapWorkerType:
+                    builder.setSpout("spout", new TestSpout(), spoutParallelism).
+                            addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker,
"spout shared off heap worker"));
+                    break;
+                case sharedOnHeapType:
+                    builder.setSpout("spout", new TestSpout(), spoutParallelism).
+                            addSharedMemory(new SharedOnHeap(sharedOnHeap, "spout shared
worker"));
+                    break;
+            }
+            StormTopology stormToplogy = builder.createTopology();
+            INimbus iNimbus = new INimbusTest();
+            Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 1000);
+            Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
+
+            conf.put(Config.TOPOLOGY_PRIORITY, 0);
+            conf.put(Config.TOPOLOGY_NAME, "testTopology");
+            conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+            conf.put(Config.TOPOLOGY_RAS_ONE_WORKER_PER_EXECUTOR, true);
+            TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy,
0,
+                    genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+            Topologies topologies = new Topologies(topo);
+            Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()),
supMap, new HashMap<>(), topologies, conf);
+
+            scheduler = new ResourceAwareScheduler();
+            scheduler.prepare(conf);
+            scheduler.schedule(topologies, cluster);
+
+            TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
+            SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+            long numNodes = assignment.getSlotToExecutors().keySet().stream().map(ws ->
ws.getNodeId()).distinct().count();
+
+            switch (memoryType) {
+                case sharedOffHeapNodeType:
+                    // 4 workers on single node. OffHeapNode memory is shared
+                    assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism
* memoryOnHeap, 0.01));
+                    assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism
* memoryOffHeap + sharedOffHeapNode, 0.01));
+                    assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0,
0.01));
+                    assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(sharedOffHeapNode,
0.01));
+                    assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism
* memoryOnHeap, 0.01));
+                    assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism
* memoryOffHeap, 0.01));
+                    assertThat(numNodes, is(1L));
+                    assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+                    break;
+                case sharedOffHeapWorkerType:
+                    // 4 workers on 2 nodes. OffHeapWorker memory not shared -- consumed
4x, once for each worker
+                    assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism
* memoryOnHeap, 0.01));
+                    assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism
* (memoryOffHeap + sharedOffHeapWorker), 0.01));
+                    assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0,
0.01));
+                    assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(spoutParallelism
* sharedOffHeapWorker, 0.01));
+                    assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism
* memoryOnHeap, 0.01));
+                    assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism
* memoryOffHeap, 0.01));
+                    assertThat(numNodes, is(2L));
+                    assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+                    break;
+                case sharedOnHeapType:
+                    // 4 workers on 2 nodes. onHeap memory not shared -- consumed 4x, once
for each worker
+                    assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism
* (memoryOnHeap + sharedOnHeap), 0.01));
+                    assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism
* memoryOffHeap, 0.01));
+                    assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(spoutParallelism
* sharedOnHeap, 0.01));
+                    assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(0,
0.01));
+                    assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism
* memoryOnHeap, 0.01));
+                    assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism
* memoryOffHeap, 0.01));
+                    assertThat(numNodes, is(2L));
+                    assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+                    break;
+            }
+        }
+    }
 
-        scheduler = new ResourceAwareScheduler();
+    /**
+     * test if the scheduling shared memory is correct with/without oneWorkerPerExecutor
enabled
+     */
+    @Test
+    public void testDefaultResourceAwareStrategySharedMemory() {
+        boolean oneWorkerPerExecutorValues[] = {false, true};
+        for (boolean oneWorkerPerExecutor : oneWorkerPerExecutorValues) {
 
 Review comment:
   Same here: this test should also use `@ParameterizedTest` rather than looping over the `oneWorkerPerExecutor` values inside a single test method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message