storm-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [storm] Ethanlm commented on a change in pull request #3096: STORM-3480 Implement One Worker Per Executor RAS Option
Date Wed, 07 Aug 2019 20:29:38 GMT
Ethanlm commented on a change in pull request #3096: STORM-3480 Implement One Worker Per Executor
RAS Option
URL: https://github.com/apache/storm/pull/3096#discussion_r311748333
 
 

 ##########
 File path: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
 ##########
 @@ -133,50 +232,91 @@ public void testDefaultResourceAwareStrategySharedMemory() {
         INimbus iNimbus = new INimbusTest();
         Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 2000);
         Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
-        
+
         conf.put(Config.TOPOLOGY_PRIORITY, 0);
         conf.put(Config.TOPOLOGY_NAME, "testTopology");
         conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+        conf.put(Config.TOPOLOGY_RAS_ONE_WORKER_PER_EXECUTOR, oneWorkerPerExecutor);
         TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy,
0,
                 genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
 
         Topologies topologies = new Topologies(topo);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()),
supMap, new HashMap<>(), topologies, conf);
 
         scheduler = new ResourceAwareScheduler();
-
         scheduler.prepare(conf);
         scheduler.schedule(topologies, cluster);
-        
+
         for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet())
{
             String supervisorId = entry.getKey();
             SupervisorResources resources = entry.getValue();
             assertTrue(supervisorId, resources.getTotalCpu() >= resources.getUsedCpu());
             assertTrue(supervisorId, resources.getTotalMem() >= resources.getUsedMem());
         }
 
-        // Everything should fit in a single slot
-        int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
-        double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
-        double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
-        double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
-        
-        SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
-        assertEquals(1, assignment.getSlots().size());
-        WorkerSlot ws = assignment.getSlots().iterator().next();
-        String nodeId = ws.getNodeId();
-        assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
-        assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId),
0.01);
-        assertEquals(1, assignment.getScheduledResources().size());
-        WorkerResources resources = assignment.getScheduledResources().get(ws);
-        assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
-        assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
-        assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
-        assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
-        assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+        if (!oneWorkerPerExecutor) {
+            // Everything should fit in a single slot
+            int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
+            double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
+            double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
+            double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
+
+            SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+            assertThat(assignment.getSlots().size(), is(1));
+            WorkerSlot ws = assignment.getSlots().iterator().next();
+            String nodeId = ws.getNodeId();
+            assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().size(), is(1));
+            assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId),
closeTo(sharedOffHeapNode, 0.01));
+            assertThat(assignment.getScheduledResources().size(), is(1));
+            WorkerResources resources = assignment.getScheduledResources().get(ws);
+            assertThat(resources.get_cpu(), closeTo(totalExpectedCPU, 0.01));
+            assertThat(resources.get_mem_on_heap(), closeTo(totalExpectedOnHeap, 0.01));
+            assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap,
0.01));
+            assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeap, 0.01));
+            assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWorker,
0.01));
+        } else {
+            // one worker per executor
+            // [3,3] [7,7], [0,0] [2,2] [6,6] [1,1] [5,5] [4,4] sorted executor ordering
+            // spout  [0,0] [1,1]
+            // bolt-1 [2,2] [3,3]
+            // bolt-2 [6,6] [7,7]
+            // bolt-3 [4,4] [5,5]
+
+            // expect 8 workers over 2 nodes
+            // node r000s000 workers: bolt-1 bolt-2 spout bolt-1 (no memory sharing)
+            // node r000s001 workers: bolt-2 spout bolt-3 bolt-3 (no memory sharing)
+            int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
 
 Review comment:
   This line can be put before `if .. else. ..`. And don't need `( .. )`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message