Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 05A7E10C81 for ; Tue, 3 Dec 2013 11:51:51 +0000 (UTC) Received: (qmail 84207 invoked by uid 500); 3 Dec 2013 11:51:37 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 84125 invoked by uid 500); 3 Dec 2013 11:51:35 -0000 Mailing-List: contact commits-help@tajo.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.incubator.apache.org Delivered-To: mailing list commits@tajo.incubator.apache.org Received: (qmail 84097 invoked by uid 99); 3 Dec 2013 11:51:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Dec 2013 11:51:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 03 Dec 2013 11:51:29 +0000 Received: (qmail 79617 invoked by uid 99); 3 Dec 2013 11:50:57 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Dec 2013 11:50:57 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9C50991B679; Tue, 3 Dec 2013 11:50:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.incubator.apache.org Date: Tue, 03 Dec 2013 11:51:02 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/18] TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon) X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java new file mode 100644 index 0000000..428bf46 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master; + +import com.google.protobuf.RpcCallback; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoMasterProtocol.*; +import org.apache.tajo.master.rm.TajoWorkerResourceManager; +import org.apache.tajo.master.rm.WorkerResource; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestTajoResourceManager { + private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build(); + private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build(); + + TajoConf tajoConf; + TajoWorkerResourceManager tajoWorkerResourceManager; + long queryIdTime = System.currentTimeMillis(); + int numWorkers = 5; + float workerDiskSlots = 5.0f; + int workerMemoryMB = 512 * 10; + WorkerResourceAllocationResponse response; + + private void initResourceManager(boolean queryMasterMode) throws Exception { + tajoConf = new org.apache.tajo.conf.TajoConf(); + + tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f); + tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512); + + tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf); + + for(int i = 0; i < numWorkers; i++) { + ServerStatusProto.System system = ServerStatusProto.System.newBuilder() + .setAvailableProcessors(1) + .setFreeMemoryMB(workerMemoryMB) + .setMaxMemoryMB(workerMemoryMB) + .setTotalMemoryMB(workerMemoryMB) + .build(); + + ServerStatusProto.JvmHeap jvmHeap = ServerStatusProto.JvmHeap.newBuilder() + .setFreeHeap(workerMemoryMB) + .setMaxHeap(workerMemoryMB) + .setTotalHeap(workerMemoryMB) + .build(); + + ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder() + .setAbsolutePath("/") + .setFreeSpace(0) + .setTotalSpace(0) + .setUsableSpace(0) + .build(); + + List disks = new ArrayList(); + + disks.add(disk); + + ServerStatusProto serverStatus = ServerStatusProto.newBuilder() + .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE) + .setTaskRunnerMode(BOOL_TRUE) + .setDiskSlots(workerDiskSlots) + .setMemoryResourceMB(workerMemoryMB) + .setJvmHeap(jvmHeap) + .setSystem(system) + .addAllDisk(disks) + .setRunningTaskNum(0) + .build(); + + TajoHeartbeat tajoHeartbeat = TajoHeartbeat.newBuilder() + .setTajoWorkerHost("host" + (i + 1)) + .setQueryId(QueryIdFactory.newQueryId(queryIdTime, i + 1).getProto()) + .setTajoQueryMasterPort(21000) + .setPeerRpcPort(29000 + i) + .setTajoWorkerHttpPort(28080 + i) + .setServerStatus(serverStatus) + .build(); + + tajoWorkerResourceManager.workerHeartbeat(tajoHeartbeat); + } + } + + + @Test + public void testHeartbeat() throws Exception { + initResourceManager(false); + assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size()); + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB()); + assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0); + } + } + + @Test + public void testMemoryResource() throws Exception { + initResourceManager(false); + + final int minMemory = 256; + final int maxMemory = 512; + float diskSlots = 1.0f; + + QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1); + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId); + + WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() + .setResourceRequestPriority(ResourceRequestPriority.MEMORY) + .setNumContainers(60) + .setExecutionBlockId(ebId.getProto()) + .setMaxDiskSlotPerContainer(diskSlots) + .setMinDiskSlotPerContainer(diskSlots) + .setMinMemoryMBPerContainer(minMemory) + .setMaxMemoryMBPerContainer(maxMemory) + .build(); + + final Object monitor = new Object(); + final List containerIds = new ArrayList(); + + + RpcCallback callBack = new RpcCallback() { + + @Override + public void run(WorkerResourceAllocationResponse response) { + TestTajoResourceManager.this.response = response; + synchronized(monitor) { + monitor.notifyAll(); + } + } + }; + + tajoWorkerResourceManager.allocateWorkerResources(request, callBack); + synchronized(monitor) { + monitor.wait(); + } + + + // assert after callback + int totalUsedMemory = 0; + int totalUsedDisks = 0; + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + assertEquals(0, eachWorker.getAvailableMemoryMB()); + assertEquals(0, eachWorker.getAvailableDiskSlots(), 0); + assertEquals(5.0f, eachWorker.getUsedDiskSlots(), 0); + + totalUsedMemory += eachWorker.getUsedMemoryMB(); + totalUsedDisks += eachWorker.getUsedDiskSlots(); + } + + assertEquals(workerMemoryMB * numWorkers, totalUsedMemory); + assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0); + + assertEquals(numWorkers * 10, response.getWorkerAllocatedResourceList().size()); + + for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) { + assertTrue( + eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory); + containerIds.add(eachResource.getContainerId()); + } + + for(YarnProtos.ContainerIdProto eachContainerId: containerIds) { + tajoWorkerResourceManager.releaseWorkerResource(ebId, eachContainerId); + } + + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB()); + assertEquals(0, eachWorker.getUsedMemoryMB()); + + assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0); + assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0); + } + } + + @Test + public void testMemoryNotCommensurable() throws Exception { + initResourceManager(false); + + final int minMemory = 200; + final int maxMemory = 500; + float diskSlots = 1.0f; + + QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2); + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId); + + int requiredContainers = 60; + + int numAllocatedContainers = 0; + + int loopCount = 0; + while(true) { + WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() + .setResourceRequestPriority(ResourceRequestPriority.MEMORY) + .setNumContainers(requiredContainers - numAllocatedContainers) + .setExecutionBlockId(ebId.getProto()) + .setMaxDiskSlotPerContainer(diskSlots) + .setMinDiskSlotPerContainer(diskSlots) + .setMinMemoryMBPerContainer(minMemory) + .setMaxMemoryMBPerContainer(maxMemory) + .build(); + + final Object monitor = new Object(); + + RpcCallback callBack = new RpcCallback() { + @Override + public void run(WorkerResourceAllocationResponse response) { + TestTajoResourceManager.this.response = response; + synchronized(monitor) { + monitor.notifyAll(); + } + } + }; + + tajoWorkerResourceManager.allocateWorkerResources(request, callBack); + synchronized(monitor) { + monitor.wait(); + } + + numAllocatedContainers += TestTajoResourceManager.this.response.getWorkerAllocatedResourceList().size(); + + //release resource + for(WorkerAllocatedResource eachResource: TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) { + assertTrue( + eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory); + tajoWorkerResourceManager.releaseWorkerResource(ebId, eachResource.getContainerId()); + } + + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + assertEquals(0, eachWorker.getUsedMemoryMB()); + assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB()); + + assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0); + assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0); + } + + loopCount++; + + if(loopCount == 2) { + assertEquals(requiredContainers, numAllocatedContainers); + break; + } + } + + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + assertEquals(0, eachWorker.getUsedMemoryMB()); + assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB()); + + assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0); + assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0); + } + } + + @Test + public void testDiskResource() throws Exception { + initResourceManager(false); + final float minDiskSlots = 1.0f; + final float maxDiskSlots = 2.0f; + int memoryMB = 256; + + QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3); + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId); + + WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() + .setResourceRequestPriority(ResourceRequestPriority.DISK) + .setNumContainers(60) + .setExecutionBlockId(ebId.getProto()) + .setMaxDiskSlotPerContainer(maxDiskSlots) + .setMinDiskSlotPerContainer(minDiskSlots) + .setMinMemoryMBPerContainer(memoryMB) + .setMaxMemoryMBPerContainer(memoryMB) + .build(); + + final Object monitor = new Object(); + final List containerIds = new ArrayList(); + + + RpcCallback callBack = new RpcCallback() { + + @Override + public void run(WorkerResourceAllocationResponse response) { + TestTajoResourceManager.this.response = response; + synchronized(monitor) { + monitor.notifyAll(); + } + } + }; + + tajoWorkerResourceManager.allocateWorkerResources(request, callBack); + synchronized(monitor) { + monitor.wait(); + } + for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) { + assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(), + eachResource.getAllocatedDiskSlots() >= minDiskSlots && + eachResource.getAllocatedDiskSlots() <= maxDiskSlots); + containerIds.add(eachResource.getContainerId()); + } + + // assert after callback + int totalUsedDisks = 0; + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1) + assertEquals(0, eachWorker.getAvailableDiskSlots(), 0); + assertEquals(5.0f, eachWorker.getUsedDiskSlots(), 0); + assertEquals(256 * 3, eachWorker.getUsedMemoryMB()); + + totalUsedDisks += eachWorker.getUsedDiskSlots(); + } + + assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0); + + assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size()); + + for(YarnProtos.ContainerIdProto eachContainerId: containerIds) { + tajoWorkerResourceManager.releaseWorkerResource(ebId, eachContainerId); + } + + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB()); + assertEquals(0, eachWorker.getUsedMemoryMB()); + + assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0); + assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0); + } + } + + @Test + public void testQueryMasterResource() throws Exception { + initResourceManager(true); + + int qmDefaultMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB); + float qmDefaultDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT); + + QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 4); + + tajoWorkerResourceManager.allocateQueryMaster(queryId); + + // assert after callback + int totalUsedMemory = 0; + int totalUsedDisks = 0; + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + if(eachWorker.getUsedMemoryMB() > 0) { + //worker which allocated querymaster + assertEquals(qmDefaultMemoryMB, eachWorker.getUsedMemoryMB()); + assertEquals(qmDefaultDiskSlots, eachWorker.getUsedDiskSlots(), 0); + } else { + assertEquals(0, eachWorker.getUsedMemoryMB()); + assertEquals(0, eachWorker.getUsedDiskSlots(), 0); + } + + totalUsedMemory += eachWorker.getUsedMemoryMB(); + totalUsedDisks += eachWorker.getUsedDiskSlots(); + } + + assertEquals(qmDefaultMemoryMB, totalUsedMemory); + assertEquals(qmDefaultDiskSlots, totalUsedDisks, 0); + + //release + tajoWorkerResourceManager.stopQueryMaster(queryId); + for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) { + assertEquals(0, eachWorker.getUsedMemoryMB()); + assertEquals(0, eachWorker.getUsedDiskSlots(), 0); + totalUsedMemory += eachWorker.getUsedMemoryMB(); + totalUsedDisks += eachWorker.getUsedDiskSlots(); + } + } +}