tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-2112: Improve disk load, when queries run simultaneously.
Date Tue, 19 Apr 2016 05:02:33 GMT
Repository: tajo
Updated Branches:
  refs/heads/master fe3663411 -> 71d2825cd


TAJO-2112: Improve disk load, when queries run simultaneously.

Closes #995


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/71d2825c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/71d2825c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/71d2825c

Branch: refs/heads/master
Commit: 71d2825cd66591cd7f15f15fa74b0f000a5c2590
Parents: fe36634
Author: Jinho Kim <jhkim@apache.org>
Authored: Tue Apr 19 14:01:45 2016 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Tue Apr 19 14:01:45 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/cli/tools/TajoAdmin.java    |  13 +--
 .../java/org/apache/tajo/conf/TajoConf.java     |   6 +-
 tajo-common/src/main/proto/tajo_protos.proto    |   1 -
 .../master/scheduler/TestSimpleScheduler.java   |   6 +-
 .../org/apache/tajo/resource/TestResources.java |  10 +-
 .../tajo/worker/MockNodeResourceManager.java    |   2 +-
 .../tajo/worker/TestNodeResourceManager.java    |   1 -
 .../org/apache/tajo/worker/TestTaskManager.java |   9 +-
 .../NonForwardQueryResultSystemScanner.java     |   4 -
 .../org/apache/tajo/master/rm/NodeStatus.java   |  14 +++
 .../tajo/master/scheduler/SimpleScheduler.java  |   6 +-
 .../tajo/querymaster/DefaultTaskScheduler.java  |  37 ++++---
 .../tajo/querymaster/QueryMasterTask.java       |   4 +-
 .../java/org/apache/tajo/querymaster/Task.java  |   2 +-
 .../resource/DefaultResourceCalculator.java     |   3 -
 .../org/apache/tajo/resource/NodeResource.java  |  56 +++-------
 .../org/apache/tajo/resource/NodeResources.java |  27 ++---
 .../apache/tajo/worker/NodeResourceManager.java | 102 +++++++++++++++----
 .../apache/tajo/worker/NodeStatusUpdater.java   |   2 +-
 .../org/apache/tajo/worker/TaskExecutor.java    |  24 ++---
 .../org/apache/tajo/worker/TaskManager.java     |   2 +-
 .../event/NodeResourceDeallocateEvent.java      |  17 ++--
 .../tajo/worker/event/TaskStartEvent.java       |  12 +--
 .../tajo/ws/rs/responses/WorkerResponse.java    |   2 -
 tajo-core/src/main/proto/ResourceProtos.proto   |   1 +
 .../resources/webapps/worker/querytasks.jsp     |   4 +-
 tajo-dist/src/main/conf/tajo-site.xml.template  |   4 +-
 .../configuration/worker_configuration.rst      |  39 +++----
 .../org/apache/tajo/storage/DataLocation.java   |  13 +++
 30 files changed, 234 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ac1a8aa..ce8994d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -10,6 +10,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-2112: Improve disk load, when queries run simultaneously. (jinho)
+
     TAJO-2104: Implement Identifier which supports quotation information. 
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
index 79d96e1..84e4ef5 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
@@ -344,16 +344,13 @@ public class TajoAdmin {
 
   private void writeWorkerInfo(Writer writer, List<WorkerResourceInfo> workers) throws ParseException,
       IOException, ServiceException, SQLException {
-    String fmtWorkerLine = "%1$-25s %2$-5s %3$-5s %4$-10s %5$-10s %6$-12s %7$-10s%n";
+    String fmtWorkerLine = "%1$-25s %2$-5s %3$-5s %4$-10s %5$-12s %6$-10s%n";
     String line = String.format(fmtWorkerLine,
-        "Worker", "Port", "Tasks",
-        "Mem", "Disk",
-        "Cpu", "Status");
+        "Worker", "Port", "Tasks", "Mem", "Cpu", "Status");
     writer.write(line);
     line = String.format(fmtWorkerLine,
         DASHLINE_LEN25, DASHLINE_LEN5, DASHLINE_LEN5,
-        DASHLINE_LEN10, DASHLINE_LEN10,
-        DASHLINE_LEN12, DASHLINE_LEN10);
+        DASHLINE_LEN10, DASHLINE_LEN12, DASHLINE_LEN10);
     writer.write(line);
 
     for (WorkerResourceInfo worker : workers) {
@@ -361,14 +358,12 @@ public class TajoAdmin {
       String workerHost = String.format("%s:%d", connInfo.getHost(), connInfo.getPeerRpcPort());
       String mem = String.format("%d/%d", worker.getAvailableResource().getMemory(),
           worker.getTotalResource().getMemory());
-      String disk = String.format("%d/%d", worker.getAvailableResource().getDisks(),
-          worker.getTotalResource().getDisks());
       String cpu = String.format("%d/%d", worker.getAvailableResource().getVirtualCores(),
           worker.getTotalResource().getVirtualCores());
       line = String.format(fmtWorkerLine, workerHost,
           connInfo.getPullServerPort(),
           worker.getNumRunningTasks(),
-          mem, disk, cpu, worker.getWorkerStatus());
+          mem, cpu, worker.getWorkerStatus());
       writer.write(line);
     }
     writer.write("\n\n");

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 24a5520..2e2fb18 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -186,14 +186,10 @@ public class TajoConf extends Configuration {
         Runtime.getRuntime().availableProcessors(), Validators.min("2")), // 1qm + 1task
     WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1500, Validators.min("64")),
 
-    WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 2, Validators.min("1")),
-
     WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2,
         Validators.min("1")),
 
-    WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()),
-
-    WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE("tajo.worker.heartbeat.queue.threshold-rate", 0.3f, Validators.min("0")),//30%
+    WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE("tajo.worker.heartbeat.queue.threshold-rate", 0.1f, Validators.min("0")),//10%
     WORKER_HEARTBEAT_IDLE_INTERVAL("tajo.worker.heartbeat.idle.interval", 10 * 1000),  // 10 sec
     WORKER_HEARTBEAT_ACTIVE_INTERVAL("tajo.worker.heartbeat.active.interval", 1000),  // 1 sec
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 4d7925d..1795107 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -67,7 +67,6 @@ message WorkerConnectionInfoProto {
 message NodeResourceProto {
   optional int32 memory = 1;
   optional int32 virtual_cores = 2;
-  optional int32 disks = 3;
 }
 
 enum CodecType {

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
index ade7023..41ebd67 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
@@ -75,7 +75,7 @@ public class TestSimpleScheduler {
   @Before
   public void setup() {
     conf = new TajoConf();
-    nodeResource = NodeResource.createResource(2000, 2, 3);
+    nodeResource = NodeResource.createResource(2000, 3);
     service = new CompositeService(TestSimpleScheduler.class.getSimpleName()) {
 
       @Override
@@ -206,7 +206,7 @@ public class TestSimpleScheduler {
     targetWorkers.add(workerEntry.getKey());
 
     NodeResource expectResource = NodeResources.multiply(scheduler.getMinimumResourceCapability(), requestNum);
-    assertTrue(NodeResources.fitsIn(expectResource, workerEntry.getValue().getAvailableResource()));
+    assertTrue(NodeResources.fitsIn(expectResource, workerEntry.getValue().getReservedResource()));
 
     QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0);
     NodeResourceRequest requestProto = createResourceRequest(queryId, requestNum, targetWorkers);
@@ -293,7 +293,7 @@ public class TestSimpleScheduler {
     public void stopQuery(QueryId queryId) {
       queryInfoMap.remove(queryId);
       AllocationResourceProto allocationResourceProto = qmAllocationMap.remove(queryId);
-      NodeResources.addTo(rmContext.getNodes().get(allocationResourceProto.getWorkerId()).getAvailableResource(),
+      NodeResources.addTo(rmContext.getNodes().get(allocationResourceProto.getWorkerId()).getReservedResource(),
           new NodeResource(allocationResourceProto.getResource()));
       super.stopQuery(queryId);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java b/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java
index eb0d732..7be0b26 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java
@@ -28,12 +28,10 @@ import static org.junit.Assert.*;
 public class TestResources {
   @Test
   public void testFitsIn() {
-    assertTrue(fitsIn(createResource(512, 1, 1), createResource(1024, 2, 1)));
-    assertTrue(fitsIn(createResource(1024, 2, 1), createResource(1024, 2, 1)));
-    assertFalse(fitsIn(createResource(1024, 2, 1), createResource(512, 1, 1)));
-    assertFalse(fitsIn(createResource(512, 2, 1), createResource(1024, 1, 1)));
-    assertFalse(fitsIn(createResource(1024, 1, 1), createResource(512, 2, 1)));
-    assertFalse(fitsIn(createResource(512, 1, 2), createResource(512, 1, 1)));
+    assertTrue(fitsIn(createResource(512), createResource(1024)));
+    assertTrue(fitsIn(createResource(1024), createResource(1024)));
+    assertFalse(fitsIn(createResource(1024), createResource(512)));
+    assertFalse(fitsIn(createResource(512, 2), createResource(512)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
index 786498e..71dbd2d 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
@@ -50,7 +50,7 @@ public class MockNodeResourceManager extends NodeResourceManager {
   }
 
   @Override
-  protected void startTask(TaskRequestProto request, NodeResource resource) {
+  protected void startTask(TaskRequestProto request, Allocation resource) {
     if(enableTaskHandlerEvent) {
       super.startTask(request, resource);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
index 3ddae7e..3901cdb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -77,7 +77,6 @@ public class TestNodeResourceManager {
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
         taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
-    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
     conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskManager.java
index e908e46..5a05789 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskManager.java
@@ -21,11 +21,15 @@ package org.apache.tajo.worker;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.tajo.*;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.worker.NodeResourceManager.Allocation;
 import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
 import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
 import org.apache.tajo.worker.event.TaskStartEvent;
@@ -64,7 +68,6 @@ public class TestTaskManager {
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
         taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
-    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
     conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
 
@@ -166,7 +169,7 @@ public class TestTaskManager {
         MockNodeResourceManager.createTaskRequests(ebId, taskMemory, 1).poll();
 
     taskDispatcher.getEventHandler().handle(new TaskStartEvent(requestProto.getTaskRequest(),
-        new NodeResource(requestProto.getResource())));
+        new Allocation(new NodeResource(requestProto.getResource()))));
 
     assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
     assertNotNull(taskManager.getExecutionBlockContext(ebId));

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 4d93017..c7b9754 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -495,10 +495,6 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
           aTuple.put(fieldId, DatumFactory.createInt8(used.getMemory() * 1048576l));
         } else if ("total_mem".equalsIgnoreCase(column.getSimpleName())) {
           aTuple.put(fieldId, DatumFactory.createInt8(total.getMemory() * 1048576l));
-        } else if ("used_disk".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(used.getDisks()));
-        } else if ("total_disk".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(total.getDisks()));
         } else if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) {
           aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getNumRunningTasks()));
         } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
index 63e4d52..f976d2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
@@ -67,6 +67,9 @@ public class NodeStatus implements EventHandler<NodeEvent>, Comparable<NodeStatu
     }
   }
 
+  /** Reserved resources for scheduler calculation. */
+  private final NodeResource reservedResource;
+
   /** Available resources on the node. */
   private final NodeResource availableResource;
 
@@ -125,6 +128,7 @@ public class NodeStatus implements EventHandler<NodeEvent>, Comparable<NodeStatu
     this.lastHeartbeatTime = System.currentTimeMillis();
     this.totalResourceCapability = totalResourceCapability;
     this.availableResource = NodeResources.clone(totalResourceCapability);
+    this.reservedResource = NodeResources.clone(availableResource);
   }
 
   public int getWorkerId() {
@@ -177,6 +181,15 @@ public class NodeStatus implements EventHandler<NodeEvent>, Comparable<NodeStatu
   }
 
   /**
+   * Get current reserved resources on the node.
+   *
+   * @return current reserved resources on the node.
+   */
+  public NodeResource getReservedResource() {
+    return this.reservedResource;
+  }
+
+  /**
    * Get total resources on the node.
    *
    * @return total resources on the node.
@@ -239,6 +252,7 @@ public class NodeStatus implements EventHandler<NodeEvent>, Comparable<NodeStatu
     setNumRunningTasks(statusEvent.getRunningTaskNum());
     setNumRunningQueryMaster(statusEvent.getRunningQMNum());
     NodeResources.update(availableResource, statusEvent.getAvailableResource());
+    NodeResources.update(reservedResource, availableResource);
 
     if(statusEvent.getTotalResource() != null) {
       NodeResources.update(totalResourceCapability, statusEvent.getTotalResource());

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java
index 699461f..98c61d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java
@@ -94,7 +94,7 @@ public class SimpleScheduler extends AbstractQueryScheduler {
     NodeResource resource = NodeResources.createResource(0);
     NodeResource totalResource = NodeResources.createResource(0);
     for (NodeStatus nodeStatus : getRMContext().getNodes().values()) {
-      NodeResources.addTo(resource, nodeStatus.getAvailableResource());
+      NodeResources.addTo(resource, nodeStatus.getReservedResource());
       NodeResources.addTo(totalResource, nodeStatus.getTotalResourceCapability());
 
     }
@@ -229,9 +229,9 @@ public class SimpleScheduler extends AbstractQueryScheduler {
           LOG.warn("Can't find the node. id :" + workerId);
           continue;
         } else {
-          if (NodeResources.fitsIn(capacity, nodeStatus.getAvailableResource())) {
+          if (NodeResources.fitsIn(capacity, nodeStatus.getReservedResource())) {
             NodeResources.subtractFrom(getClusterResource(), capacity);
-            NodeResources.subtractFrom(nodeStatus.getAvailableResource(), capacity);
+            NodeResources.subtractFrom(nodeStatus.getReservedResource(), capacity);
             allocatedResources++;
             resourceBuilder.setResource(capacity.getProto());
             resourceBuilder.setWorkerId(workerId);

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index c1dd0a8..cdf7c5e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -42,6 +42,7 @@ import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.plan.serder.LogicalNodeSerializer;
+import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.CallFuture;
@@ -314,7 +315,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
     CallFuture<NodeResourceResponse> callBack = new CallFuture<>();
     NodeResourceRequest.Builder request = NodeResourceRequest.newBuilder();
-    request.setCapacity(NodeResources.createResource(minTaskMemory, isLeaf ? 1 : 0).getProto())
+    request.setCapacity(NodeResources.createResource(minTaskMemory).getProto())
         .setNumContainers(requestContainerNum)
         .setPriority(stage.getPriority())
         .setQueryId(context.getMasterContext().getQueryId().getProto())
@@ -394,8 +395,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<>();
     /** The total number of remain tasks in this host */
     private AtomicInteger remainTasksNum = new AtomicInteger(0);
-    public static final int REMOTE = -2;
-
 
     public HostVolumeMapping(String host, String rack){
       this.host = host;
@@ -471,7 +470,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     private synchronized TaskAttemptId getAndRemove(int volumeId){
       TaskAttemptId taskAttemptId = null;
       if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
-        if (volumeId > REMOTE) {
+        if (volumeId > DataLocation.REMOTE_VOLUME_ID) {
           diskVolumeLoads.remove(volumeId);
         }
         return taskAttemptId;
@@ -513,7 +512,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
       if(tasks.isEmpty()){
         unassignedTaskForEachVolume.remove(volumeId);
-        if (volumeId > REMOTE) {
+        if (volumeId > DataLocation.REMOTE_VOLUME_ID) {
           diskVolumeLoads.remove(volumeId);
         }
       }
@@ -532,12 +531,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         concurrency = diskVolumeLoads.get(volumeId) + 1;
       }
 
-      if (volumeId > -1) {
+      if (volumeId > DataLocation.UNKNOWN_VOLUME_ID) {
         info(LOG, "Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
-      } else if (volumeId == -1) {
+      } else if (volumeId == DataLocation.UNKNOWN_VOLUME_ID) {
         // this case is disabled namenode block meta or compressed text file or amazon s3
         info(LOG, "Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
-      } else if (volumeId == REMOTE) {
+      } else if (volumeId == DataLocation.REMOTE_VOLUME_ID) {
         // this case has processed all block on host and it will be assigned to remote
         info(LOG, "Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
             + ", Remote Concurrency : " + concurrency + ", Unassigned volumes: " + unassignedTaskForEachVolume.size());
@@ -569,7 +568,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
         if(volumeEntry == null) volumeEntry = entry;
 
-        if (entry.getKey() != REMOTE && volumeEntry.getValue() >= entry.getValue()) {
+        if (entry.getKey() != DataLocation.REMOTE_VOLUME_ID && volumeEntry.getValue() >= entry.getValue()) {
           volumeEntry = entry;
         }
       }
@@ -577,12 +576,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       if(volumeEntry != null){
         return volumeEntry.getKey();
       } else {
-        return REMOTE;
+        return DataLocation.REMOTE_VOLUME_ID;
       }
     }
 
     public int getRemoteConcurrency(){
-      return getVolumeConcurrency(REMOTE);
+      return getVolumeConcurrency(DataLocation.REMOTE_VOLUME_ID);
     }
 
     public int getVolumeConcurrency(int volumeId){
@@ -825,9 +824,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         // disk or host-local allocation
         //////////////////////////////////////////////////////////////////////
         TaskAttemptId attemptId = allocateLocalTask(host);
+        int assignedVolume = DataLocation.REMOTE_VOLUME_ID;
+        HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
         if (attemptId == null) { // if a local task cannot be found
-          HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
           if(!taskRequests.isEmpty()) { //if other requests remains, move to remote list for better locality
             remoteTaskRequests.add(taskRequest);
@@ -848,7 +848,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
                 continue;
               } else {
                 // assign to remote volume
-                hostVolumeMapping.increaseConcurrency(HostVolumeMapping.REMOTE);
+                hostVolumeMapping.increaseConcurrency(assignedVolume);
               }
             }
           }
@@ -869,10 +869,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           }
 
           if (attemptId != null && hostVolumeMapping != null) {
-            hostVolumeMapping.lastAssignedVolumeId.put(attemptId, HostVolumeMapping.REMOTE);
+            hostVolumeMapping.lastAssignedVolumeId.put(attemptId, assignedVolume);
           }
           rackAssign++;
         } else {
+          if(hostVolumeMapping != null){
+            //Set to real volume id
+            assignedVolume = hostVolumeMapping.lastAssignedVolumeId.get(attemptId);
+          }
+
           localAssign++;
         }
 
@@ -888,6 +893,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               stage.getDataChannel(), stage.getBlock().getEnforcer(),
               queryMasterHostAndPort);
 
+          NodeResource resource = new NodeResource(taskRequest.getResponseProto().getResource());
+
           if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
             taskAssign.setInterQuery();
           }
@@ -895,7 +902,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           //TODO send batch request
           BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
           requestProto.addTaskRequest(TaskAllocationProto.newBuilder()
-              .setResource(taskRequest.getResponseProto().getResource())
+              .setResource(resource.getProto()).setVolumeId(assignedVolume)
               .setTaskRequest(taskAssign.getProto()).build());
 
           requestProto.setExecutionBlockId(attemptId.getTaskId().getExecutionBlockId().getProto());

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 6030d90..8eaab3f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -58,6 +58,7 @@ import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.RpcParameterFactory;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.NodeResourceManager.Allocation;
 import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
 import org.apache.tajo.worker.event.NodeResourceEvent;
 import org.apache.tajo.worker.event.NodeStatusEvent;
@@ -176,7 +177,8 @@ public class QueryMasterTask extends CompositeService {
     EventHandler handler = getQueryTaskContext().getQueryMasterContext().getWorkerContext().
         getNodeResourceManager().getDispatcher().getEventHandler();
 
-    handler.handle(new NodeResourceDeallocateEvent(allocation, NodeResourceEvent.ResourceType.QUERY_MASTER));
+    handler.handle(new NodeResourceDeallocateEvent(new Allocation(allocation),
+        NodeResourceEvent.ResourceType.QUERY_MASTER));
 
     //flush current node resource
     handler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 466f6c9..f8b89f1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -336,7 +336,7 @@ public class Task implements EventHandler<TaskEvent> {
       diskIds = ((FileFragment)fragment).getDiskIds();
     }
     for (int i = 0; i < hosts.length; i++) {
-      dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i]));
+      dataLocations.add(new DataLocation(hosts[i], diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i]));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
index 63a5965..028a0ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
@@ -29,9 +29,6 @@ public class DefaultResourceCalculator extends ResourceCalculator {
   public int computeAvailableContainers(NodeResource available, NodeResource required) {
     int availableContainer = Math.min(available.getMemory() / required.getMemory(),
         available.getVirtualCores() / required.getVirtualCores());
-    if (required.getDisks() > 0) {
-      availableContainer = Math.min(availableContainer, available.getDisks() / required.getDisks());
-    }
     return availableContainer;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
index c7fe55b..2bbaf61 100644
--- a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
@@ -29,9 +29,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
  * <p><code>NodeResource</code> models a set of computer resources in the
  * cluster.</p>
  * <p/>
- * <p>Currently it models  <em>memory</em> and <em>disk</em> and <em>CPU</em>.</p>
+ * <p>Currently it models  <em>memory</em> and <em>CPU</em>.</p>
  * <p/>
- * <p>The unit for memory is megabytes. The unit for disks is the number of disk.
+ * <p>The unit for memory is megabytes.
  * CPU is modeled with virtual cores (vcores), a unit for expressing parallelism.
  * A node's capacity should be configured with virtual cores equal to its number of physical cores.
  * A task should be requested with the number of cores it can saturate.</p>
@@ -41,28 +41,23 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>, Comparable<NodeResource> {
 
   private volatile int memory;
-  private volatile int disks;
   private volatile int vCores;
 
   private static AtomicIntegerFieldUpdater MEMORY_UPDATER;
-  private static AtomicIntegerFieldUpdater DISKS_UPDATER;
   private static AtomicIntegerFieldUpdater VCORES_UPDATER;
 
   static {
     MEMORY_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "memory");
     if (MEMORY_UPDATER == null) {
       MEMORY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "memory");
-      DISKS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "disks");
       VCORES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "vCores");
     } else {
-      DISKS_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "disks");
       VCORES_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "vCores");
     }
   }
 
   public NodeResource(TajoProtos.NodeResourceProto proto) {
     setMemory(proto.getMemory());
-    setDisks(proto.getDisks());
     setVirtualCores(proto.getVirtualCores());
   }
 
@@ -70,8 +65,8 @@ public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>,
 
   }
 
-  public static NodeResource createResource(int memory,  int disks, int vCores) {
-    return new NodeResource().setMemory(memory).setDisks(disks).setVirtualCores(vCores);
+  public static NodeResource createResource(int memory,  int vCores) {
+    return new NodeResource().setMemory(memory).setVirtualCores(vCores);
   }
 
   /**
@@ -94,27 +89,6 @@ public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>,
     return this;
   }
 
-
-  /**
-   * Get <em>number of disks</em> of the resource.
-   *
-   * @return <em>number of disks</em> of the resource
-   */
-  public int getDisks() {
-    return disks;
-  }
-
-  /**
-   * Set <em>number of disks </em> of the resource.
-   *
-   * @param disks <em>number of disks</em> of the resource
-   */
-  @SuppressWarnings("unchecked")
-  public NodeResource setDisks(int disks) {
-    DISKS_UPDATER.lazySet(this, disks);
-    return this;
-  }
-
   /**
    * Get <em>number of virtual cpu cores</em> of the resource.
    * Virtual cores are a unit for expressing CPU parallelism. A node's capacity
@@ -141,15 +115,12 @@ public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>,
   @Override
   public TajoProtos.NodeResourceProto getProto() {
     TajoProtos.NodeResourceProto.Builder builder = TajoProtos.NodeResourceProto.newBuilder();
-    builder.setMemory(memory)
-        .setDisks(disks)
-        .setVirtualCores(vCores);
-    return builder.build();
+    return builder.setMemory(memory).setVirtualCores(vCores).build();
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(getMemory(), getDisks(), getVirtualCores());
+    return Objects.hashCode(getMemory(), getVirtualCores());
   }
 
   @Override
@@ -161,20 +132,17 @@ public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>,
     if (!(obj instanceof NodeResource))
       return false;
     NodeResource other = (NodeResource) obj;
-    if (getMemory() != other.getMemory() ||
-        getDisks() != other.getDisks() ||
-        getVirtualCores() != other.getVirtualCores()) {
-      return false;
+    if (getMemory() == other.getMemory() &&
+        getVirtualCores() == other.getVirtualCores()) {
+      return true;
     }
-    return true;
+    return false;
   }
 
   @Override
   public int compareTo(NodeResource other) {
     int diff = this.getMemory() - other.getMemory();
-    if (diff == 0) {
-      diff = this.getDisks() - other.getDisks();
-    }
+
     if (diff == 0) {
       diff = this.getVirtualCores() - other.getVirtualCores();
     }
@@ -183,6 +151,6 @@ public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>,
 
   @Override
   public String toString() {
-    return "(Memory:" + getMemory() + ", Disks:" + getDisks() + ", vCores:" + getVirtualCores() + ")";
+    return "(Memory:" + getMemory() + ", vCores:" + getVirtualCores() + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
index 01e9dcf..c30c755 100644
--- a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
@@ -22,29 +22,24 @@ package org.apache.tajo.resource;
 public class NodeResources {
 
   public static NodeResource createResource(int memory) {
-    return createResource(memory, 0);
+    return NodeResource.createResource(memory, (memory > 0) ? 1 : 0);
   }
 
-  public static NodeResource createResource(int memory, int disks) {
-    return NodeResource.createResource(memory, disks, (memory > 0) ? 1 : 0);
-  }
-
-  public static NodeResource createResource(int memory, int disks, int vCores) {
-    return NodeResource.createResource(memory, disks, vCores);
+  public static NodeResource createResource(int memory, int vCores) {
+    return NodeResource.createResource(memory, vCores);
   }
 
   public static NodeResource clone(NodeResource res) {
-    return NodeResource.createResource(res.getMemory(), res.getDisks(), res.getVirtualCores());
+    return NodeResource.createResource(res.getMemory(), res.getVirtualCores());
   }
 
   public static NodeResource update(NodeResource lhs, NodeResource rhs) {
-    return lhs.setMemory(rhs.getMemory()).setDisks(rhs.getDisks()).setVirtualCores(rhs.getVirtualCores());
+    return lhs.setMemory(rhs.getMemory()).setVirtualCores(rhs.getVirtualCores());
   }
 
   public static NodeResource addTo(NodeResource lhs, NodeResource rhs) {
     lhs.setMemory(lhs.getMemory() + rhs.getMemory())
-        .setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores())
-        .setDisks(lhs.getDisks() + rhs.getDisks());
+        .setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
     return lhs;
   }
 
@@ -54,8 +49,7 @@ public class NodeResources {
 
   public static NodeResource subtractFrom(NodeResource lhs, NodeResource rhs) {
     lhs.setMemory(lhs.getMemory() - rhs.getMemory())
-        .setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores())
-        .setDisks(lhs.getDisks() - rhs.getDisks());
+        .setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
     return lhs;
   }
 
@@ -65,8 +59,7 @@ public class NodeResources {
 
   public static NodeResource multiplyTo(NodeResource lhs, double by) {
     lhs.setMemory((int) (lhs.getMemory() * by))
-        .setVirtualCores((int) (lhs.getVirtualCores() * by))
-        .setDisks((int) (lhs.getDisks() * by));
+        .setVirtualCores((int) (lhs.getVirtualCores() * by));
     return lhs;
   }
 
@@ -87,7 +80,6 @@ public class NodeResources {
   public static NodeResource multiplyAndRoundDown(NodeResource lhs, double by) {
     NodeResource out = clone(lhs);
     out.setMemory((int)(lhs.getMemory() * by));
-    out.setDisks((int)(lhs.getDisks() * by));
     out.setVirtualCores((int)(lhs.getVirtualCores() * by));
     return out;
   }
@@ -177,19 +169,16 @@ public class NodeResources {
 
   public static boolean fitsIn(NodeResource smaller, NodeResource bigger) {
     return smaller.getMemory() <= bigger.getMemory() &&
-        smaller.getDisks() <= bigger.getDisks() &&
         smaller.getVirtualCores() <= bigger.getVirtualCores();
   }
 
   public static NodeResource componentwiseMin(NodeResource lhs, NodeResource rhs) {
     return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
-        Math.min(lhs.getDisks(), rhs.getDisks()),
         Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
   }
 
   public static NodeResource componentwiseMax(NodeResource lhs, NodeResource rhs) {
     return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
-        Math.max(lhs.getDisks(), rhs.getDisks()),
         Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
index 4f87dac..0125ff5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,11 +29,12 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.resource.NodeResources;
-import org.apache.tajo.storage.DiskUtil;
+import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.event.*;
 
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.ResourceProtos.*;
@@ -43,10 +45,12 @@ public class NodeResourceManager extends AbstractService implements EventHandler
   private final Dispatcher dispatcher;
   private final TajoWorker.WorkerContext workerContext;
   private final AtomicInteger runningQueryMasters = new AtomicInteger(0);
+  private final HashMap<Integer, AtomicInteger> volumeMap = Maps.newHashMap();
   private NodeResource totalResource;
   private NodeResource availableResource;
   private TajoConf tajoConf;
   private boolean enableTest;
+  private int diskParallels;
 
   public NodeResourceManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
     super(NodeResourceManager.class.getName());
@@ -63,6 +67,7 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     validateConf(tajoConf);
     this.enableTest = conf.get(TajoConstants.TEST_KEY, Boolean.FALSE.toString())
         .equalsIgnoreCase(Boolean.TRUE.toString());
+    this.diskParallels = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM);
     super.serviceInit(conf);
     LOG.info("Initialized NodeResourceManager for " + totalResource);
   }
@@ -77,10 +82,11 @@ public class NodeResourceManager extends AbstractService implements EventHandler
           NodeResourceAllocateEvent allocateEvent = TUtil.checkTypeAndGet(event, NodeResourceAllocateEvent.class);
           BatchAllocationResponse.Builder response = BatchAllocationResponse.newBuilder();
           for (TaskAllocationProto request : allocateEvent.getRequest().getTaskRequestList()) {
-            NodeResource resource = new NodeResource(request.getResource());
-            if (allocate(resource)) {
+            Allocation allocation = new Allocation(request);
+
+            if (allocate(allocation)) {
               //send task start event to TaskExecutor
-              startTask(request.getTaskRequest(), resource);
+              startTask(request.getTaskRequest(), allocation);
             } else {
               // reject the exceeded requests
               response.addCancellationTask(request);
@@ -92,8 +98,8 @@ public class NodeResourceManager extends AbstractService implements EventHandler
           QMResourceAllocateEvent allocateEvent = TUtil.checkTypeAndGet(event, QMResourceAllocateEvent.class);
           // allocate query master resource
 
-          NodeResource resource = new NodeResource(allocateEvent.getRequest().getResource());
-          if (allocate(resource)) {
+          Allocation allocation = new Allocation(new NodeResource(allocateEvent.getRequest().getResource()));
+          if (allocate(allocation)) {
             allocateEvent.getCallback().run(TajoWorker.TRUE_PROTO);
             runningQueryMasters.incrementAndGet();
           } else {
@@ -104,7 +110,7 @@ public class NodeResourceManager extends AbstractService implements EventHandler
       }
       case DEALLOCATE: {
         NodeResourceDeallocateEvent deallocateEvent = TUtil.checkTypeAndGet(event, NodeResourceDeallocateEvent.class);
-        release(deallocateEvent.getResource());
+        release(deallocateEvent.getAllocation());
 
         if (deallocateEvent.getResourceType() == NodeResourceEvent.ResourceType.QUERY_MASTER) {
           runningQueryMasters.decrementAndGet();
@@ -133,8 +139,29 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     return runningQueryMasters.get();
   }
 
-  private boolean allocate(NodeResource resource) {
+  private boolean allocate(Allocation allocation) {
+
+    if (allocation.hasVolumeId() && allocation.getVolumeId() > DataLocation.UNKNOWN_VOLUME_ID) {
+      int volumeId = allocation.getVolumeId();
+
+      if (!volumeMap.containsKey(volumeId)) {
+        AtomicInteger load = new AtomicInteger();
+        volumeMap.put(volumeId, load);
+      }
+
+      //This load is measured by counting the number of running tasks.
+      if (volumeMap.get(volumeId).get() < diskParallels && allocateResource(allocation.getResource())) {
+        volumeMap.get(volumeId).incrementAndGet();
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      return allocateResource(allocation.getResource());
+    }
+  }
 
+  private boolean allocateResource(NodeResource resource) {
     if (NodeResources.fitsIn(resource, availableResource) && checkFreeHeapMemory(resource)) {
       NodeResources.subtractFrom(availableResource, resource);
       return true;
@@ -147,12 +174,17 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     return true;
   }
 
-  protected void startTask(TaskRequestProto request, NodeResource resource) {
-    workerContext.getTaskManager().getDispatcher().getEventHandler().handle(new TaskStartEvent(request, resource));
+  @SuppressWarnings("unchecked")
+  protected void startTask(TaskRequestProto request, Allocation allocation) {
+    workerContext.getTaskManager().getDispatcher().getEventHandler().handle(new TaskStartEvent(request, allocation));
   }
 
-  private void release(NodeResource resource) {
-    NodeResources.addTo(availableResource, resource);
+  private void release(Allocation allocation) {
+    NodeResources.addTo(availableResource, allocation.getResource());
+
+    if (allocation.hasVolumeId() && allocation.getVolumeId() > DataLocation.UNKNOWN_VOLUME_ID) {
+      volumeMap.get(allocation.getVolumeId()).decrementAndGet();
+    }
   }
 
   private NodeResource createWorkerResource(TajoConf conf) {
@@ -168,15 +200,7 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     }
 
     int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
-    int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
-
-    int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize();
-    if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) {
-      disks = dataNodeStorageSize;
-    }
-
-    int diskParallels = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM);
-    return NodeResource.createResource(memoryMb, disks * diskParallels, vCores);
+    return NodeResource.createResource(memoryMb, vCores);
   }
 
   private void validateConf(TajoConf conf) {
@@ -197,4 +221,48 @@ public class NodeResourceManager extends AbstractService implements EventHandler
           + ", max should be no smaller than min.");
     }
   }
+
+  /**
+   * Pairs the {@link NodeResource} being reserved with an optional volume id.
+   * When no volume id is supplied it is {@link DataLocation#UNSET_VOLUME_ID}.
+   */
+  public static class Allocation {
+    private final NodeResource resource;
+    private final int volumeId;
+
+    /** Creates an allocation without a volume id ({@link DataLocation#UNSET_VOLUME_ID}). */
+    public Allocation(NodeResource resource) {
+      this(resource, DataLocation.UNSET_VOLUME_ID);
+    }
+
+    /** Creates an allocation from a task request, using its volume id only when the proto has one. */
+    public Allocation(TaskAllocationProto taskAllocation) {
+      this(new NodeResource(taskAllocation.getResource()),
+          taskAllocation.hasVolumeId() ? taskAllocation.getVolumeId() : DataLocation.UNSET_VOLUME_ID);
+    }
+
+    public Allocation(NodeResource resource, int volumeId) {
+      this.volumeId = volumeId;
+      this.resource = resource;
+    }
+
+    public NodeResource getResource() {
+      return resource;
+    }
+
+    public int getVolumeId() {
+      return volumeId;
+    }
+
+    /** Returns true when the volume id differs from {@link DataLocation#UNSET_VOLUME_ID}. */
+    public boolean hasVolumeId() {
+      return volumeId != DataLocation.UNSET_VOLUME_ID;
+    }
+
+    @Override
+    public String toString() {
+      // Separate the two parts; without ", " the volume label runs straight into the resource text.
+      return "Resource: " + resource + (hasVolumeId() ? ", VolumeId: " + volumeId : "");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
index 162e5e9..ab7e6ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -91,7 +91,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
   public void serviceStart() throws Exception {
     DefaultResourceCalculator calculator = new DefaultResourceCalculator();
     int maxContainer = calculator.computeAvailableContainers(workerContext.getNodeResourceManager().getTotalResource(),
-        NodeResources.createResource(systemConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY), 1));
+        NodeResources.createResource(systemConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY)));
 
     // if resource changed over than 30%, send reports
     float queueingRate = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE);

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
index 7476580..d1271f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -32,11 +32,11 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
 import org.apache.tajo.worker.event.NodeResourceEvent;
 import org.apache.tajo.worker.event.TaskStartEvent;
+import org.apache.tajo.worker.NodeResourceManager.Allocation;
 
 import java.io.IOException;
 import java.util.List;
@@ -54,7 +54,7 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskSt
   private static final Log LOG = LogFactory.getLog(TaskExecutor.class);
 
   private final TajoWorker.WorkerContext workerContext;
-  private final Map<TaskAttemptId, NodeResource> allocatedResourceMap;
+  private final Map<TaskAttemptId, Allocation> allocatedResourceMap;
   private final BlockingQueue<Task> taskQueue;
   private final AtomicInteger runningTasks;
   private List<ExecutorService> fetcherThreadPoolList;
@@ -136,19 +136,19 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskSt
 
   @SuppressWarnings("unchecked")
   protected void releaseResource(TaskAttemptId taskId) {
-    NodeResource resource =  allocatedResourceMap.remove(taskId);
+    Allocation allocation =  allocatedResourceMap.remove(taskId);
 
-    if(resource != null) {
-      releaseResource(resource);
+    if(allocation != null) {
+      releaseResource(allocation);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Task resource " + taskId + " is released. (" + resource + ")");
+        LOG.debug("Task resource " + taskId + " is released. (" + allocation + ")");
       }
     }
   }
 
-  protected void releaseResource(NodeResource resource) {
+  protected void releaseResource(Allocation allocation) {
     workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle(
-        new NodeResourceDeallocateEvent(resource, NodeResourceEvent.ResourceType.TASK));
+        new NodeResourceDeallocateEvent(allocation, NodeResourceEvent.ResourceType.TASK));
   }
 
   protected Task createTask(ExecutionBlockContext executionBlockContext,
@@ -169,22 +169,22 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskSt
   @Override
   public void handle(TaskStartEvent event) {
 
-    allocatedResourceMap.put(event.getTaskAttemptId(), event.getAllocatedResource());
+    allocatedResourceMap.put(event.getTaskAttemptId(), event.getAllocation());
 
     ExecutionBlockContext context = workerContext.getTaskManager().getExecutionBlockContext(
         event.getTaskAttemptId().getTaskId().getExecutionBlockId());
 
     try {
+      runningTasks.incrementAndGet();
       Task task = createTask(context, event.getTaskRequest());
       if (task != null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() +
-              ", allocated resource: " + event.getAllocatedResource());
+              ", allocated resource: " + event.getAllocation());
         }
         taskQueue.put(task);
-        runningTasks.incrementAndGet();
       } else {
-        LOG.warn("Release duplicate task resource: " + event.getAllocatedResource());
+        LOG.warn("Release duplicate task resource: " + event.getAllocation());
         stopTask(event.getTaskAttemptId());
       }
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index 9e2e9e8..61174b3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -184,7 +184,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
           }
         } catch (Throwable e) {
           LOG.fatal(e.getMessage(), e);
-          getTaskExecutor().releaseResource(taskStartEvent.getAllocatedResource());
+          getTaskExecutor().releaseResource(taskStartEvent.getAllocation());
           getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
               .handle(new ExecutionBlockErrorEvent(taskStartEvent.getExecutionBlockId(), e));
           break;

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
index d8841a2..fe6d978 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
@@ -18,23 +18,24 @@
 
 package org.apache.tajo.worker.event;
 
-import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.NodeResourceProto;
 import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.worker.NodeResourceManager.Allocation;
 
 public class NodeResourceDeallocateEvent extends NodeResourceEvent {
 
-  private NodeResource resource;
+  private Allocation allocation;
 
-  public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto, ResourceType resourceType) {
-    this(new NodeResource(proto), resourceType);
+  public NodeResourceDeallocateEvent(NodeResourceProto proto, ResourceType resourceType) {
+    this(new Allocation(new NodeResource(proto)), resourceType);
   }
 
-  public NodeResourceDeallocateEvent(NodeResource resource, ResourceType resourceType) {
+  public NodeResourceDeallocateEvent(Allocation resource, ResourceType resourceType) {
     super(EventType.DEALLOCATE, resourceType);
-    this.resource = resource;
+    this.allocation = resource;
   }
 
-  public NodeResource getResource() {
-    return resource;
+  public Allocation getAllocation() {
+    return allocation;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
index 1fb0c49..f4667bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
@@ -20,26 +20,26 @@ package org.apache.tajo.worker.event;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.worker.NodeResourceManager.Allocation;
 
 import static org.apache.tajo.ResourceProtos.TaskRequestProto;
 
 public class TaskStartEvent extends TaskManagerEvent {
 
-  private NodeResource allocatedResource;
+  private Allocation allocation;
   private TaskRequestProto taskRequest;
   private TaskAttemptId taskAttemptId;
 
   public TaskStartEvent(TaskRequestProto taskRequest,
-                        NodeResource allocatedResource) {
+                        Allocation allocation) {
     super(EventType.TASK_START);
     this.taskRequest = taskRequest;
-    this.allocatedResource = allocatedResource;
+    this.allocation = allocation;
     this.taskAttemptId = new TaskAttemptId(taskRequest.getId());
   }
 
-  public NodeResource getAllocatedResource() {
-    return allocatedResource;
+  public Allocation getAllocation() {
+    return allocation;
   }
 
   public TaskRequestProto getTaskRequest() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
index d4c62e4..3beb9db 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
@@ -52,8 +52,6 @@ public class WorkerResponse {
   private WorkerResponse(NodeResource total, NodeResource available, int numRunningTasks, int numQueryMasterTasks) {
     this.cpuCoreSlots = total.getVirtualCores();
     this.memoryMB = total.getMemory();
-    this.diskSlots = total.getDisks();
-    this.usedDiskSlots = available.getDisks();
     this.usedMemoryMB = available.getMemory();
     this.usedCpuCoreSlots = available.getVirtualCores();
     this.numRunningTasks = numRunningTasks;

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/proto/ResourceProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto
index 74a475e..a920c4f 100644
--- a/tajo-core/src/main/proto/ResourceProtos.proto
+++ b/tajo-core/src/main/proto/ResourceProtos.proto
@@ -58,6 +58,7 @@ message ExecutionBlockListProto {
 message TaskAllocationProto {
   required TaskRequestProto task_request = 1;
   required NodeResourceProto resource = 2;
+  optional int32 volumeId = 3;
 }
 
 message TaskRequestProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index d5e9a9f..6400cde 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -173,7 +173,9 @@
   <table border="1" width="100%" class="border_table">
     <tr><td align='right' width='180px'>Status:</td><td><%=stage.getState()%></td></tr>
     <tr><td align='right'>Started:</td><td><%=df.format(stage.getStartTime())%> ~ <%=stage.getFinishTime() == 0 ? "-" : df.format(stage.getFinishTime())%></td></tr>
-    <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=stage.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=stage.getTaskScheduler().getRackLocalAssigned()%>)</td></tr>
+    <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=stage.getTaskScheduler().getHostLocalAssigned()%>,
+      Rack Local Tasks: <%=stage.getTaskScheduler().getRackLocalAssigned()%>,
+      Canceled Attempt: <%=stage.getTaskScheduler().getCancellation() %>)</td></tr>
     <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float) (totalProgress / numTasks))%>%</td></tr>
     <tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr>
     <tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr>

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-dist/src/main/conf/tajo-site.xml.template
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/tajo-site.xml.template b/tajo-dist/src/main/conf/tajo-site.xml.template
index 4982cb2..026e6ff 100644
--- a/tajo-dist/src/main/conf/tajo-site.xml.template
+++ b/tajo-dist/src/main/conf/tajo-site.xml.template
@@ -71,9 +71,9 @@
 </property>
 
 <property>
-  <name>tajo.worker.resource.disks</name>
+  <name>tajo.worker.resource.disk.parallel-execution.num</name>
   <value>2</value>
-  <description>Available disk capacity (usually number of disks)</description>
+  <description>Number of concurrent scans per disk</description>
 </property>
 
 <property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst b/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
index 7a49e9c..36d248f 100644
--- a/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
+++ b/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
@@ -44,7 +44,7 @@ Worker Resources
 
 Each worker can execute multiple tasks simultaneously.
 
-In Tajo, users can specify the number of cpu cores, the total size of memory and the number of disks for each worker. Available resources affect how many tasks are executed simultaneously.
+In Tajo, users can specify the number of cpu cores and the total size of memory for each worker. Available resources affect how many tasks are executed simultaneously.
 CPU cores are a unit for expressing CPU parallelism, the unit for memory is megabytes and the unit for disks is the number of disk
 
 In order to specify the resource capacity of each worker, you should add the following configs to ``tajo-site.xml`` :
@@ -54,7 +54,6 @@ In order to specify the resource capacity of each worker, you should add the fol
 ===================================  =============   ======================   =================================
   tajo.worker.resource.cpu-cores       Integer         available cpu-cores      the number of cpu cores
   tajo.worker.resource.memory-mb       Integer         available jvm heap       memory size (MB)
-  tajo.worker.resource.disks           Integer         2                        the number of disks
   tajo.task.resource.min.memory-mb     Integer         1000                     minimum allocatable memory per task
   tajo.qm.resource.min.memory-mb       Integer         500                      minimum allocatable memory per query
 ===================================  =============   ======================   =================================
@@ -66,14 +65,14 @@ In order to specify the resource capacity of each worker, you should add the fol
 
 .. note::
 
-  If ``tajo.worker.resource.dfs-dir-aware`` is set to ``true`` in ``tajo-site.xml``, the worker will aware of and use the number of HDFS datanode's data dirs in the node.
-  In other words, ``tajo.worker.resource.disks`` is ignored.
+  If ``dfs.datanode.hdfs-blocks-metadata.enabled`` is set to ``true`` in ``hdfs-site.xml``, the Tajo worker will schedule tasks better by taking disk load into account.
+  The config ``tajo.worker.resource.disk.parallel-execution.num`` determines the number of concurrent scans per disk on an HDFS datanode. For SATA disks, we usually recommend ``2`` per disk.
 
 ------------
- Example
+ Examples
 ------------
 
-Assume that you want to give 15GB Jvm heap, 2GB memory per task, 4 disks, and 12 cores on each worker. The example configuration is as follows:
+Assume that you want to give 15GB Jvm heap, 2GB memory per task, 2 SSD disks, and 12 cores on each worker. The example configuration is as follows:
 
 ``tajo-env.sh``
 
@@ -95,20 +94,20 @@ Assume that you want to give 15GB Jvm heap, 2GB memory per task, 4 disks, and 12
     <name>tajo.task.resource.min.memory-mb</name>
     <value>2000</value>
   </property>
-  
+
   <property>
-    <name>tajo.worker.resource.disks</name>
-    <value>4</value>
+    <name>tajo.worker.resource.disk.parallel-execution.num</name>
+    <value>6</value>
   </property>
 
-
 * Example with HDFS
+Assume that you want to give 64GB Jvm heap, 4GB memory per task, 12 SAS disks, and 24 cores on each worker with HDFS. The example configuration is as follows:
 
 ``tajo-env.sh``
 
 .. code-block:: bash
 
-  export TAJO_WORKER_HEAPSIZE=15000
+  export TAJO_WORKER_HEAPSIZE=64000
 
 
 ``tajo-site.xml``
@@ -117,11 +116,20 @@ Assume that you want to give 15GB Jvm heap, 2GB memory per task, 4 disks, and 12
 
    <property>
     <name>tajo.task.resource.min.memory-mb</name>
-    <value>2000</value>
+    <value>4000</value>
   </property>
 
   <property>
-    <name>tajo.worker.resource.dfs-dir-aware</name>
+    <name>tajo.worker.resource.disk.parallel-execution.num</name>
+    <value>2</value>
+  </property>
+
+``hdfs-site.xml``
+
+.. code-block:: xml
+
+  <property>
+    <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
     <value>true</value>
   </property>
 
@@ -142,9 +150,4 @@ Assume that you want to give 15GB Jvm heap, 2GB memory per task, 4 disks, and 12
    <property>
     <name>tajo.task.resource.min.memory-mb</name>
     <value>2000</value>
-  </property>
-
-  <property>
-    <name>tajo.worker.resource.disk.parallel-execution.num</name>
-    <value>4</value>
   </property>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/71d2825c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
index 8841a31..263a480 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
@@ -19,6 +19,10 @@
 package org.apache.tajo.storage;
 
 public class DataLocation {
+  public static final int UNSET_VOLUME_ID = Integer.MIN_VALUE;
+  public static final int UNKNOWN_VOLUME_ID = -1;
+  public static final int REMOTE_VOLUME_ID = -2;
+
   private String host;
   private int volumeId;
 
@@ -31,6 +35,15 @@ public class DataLocation {
     return host;
   }
 
+  /**
+   * <h3>Volume id</h3>
+   * Volume id is an integer. Each volume id identifies each disk volume.
+   *
+   * This volume id can be obtained from {@code org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}.
+   * HDFS may be unable to give a volume id, either for an unknown reason
+   * or because the config 'dfs.client.file-block-locations.enabled' is disabled.
+   * In this case, the volume id will be -1 or another negative integer.
+   */
   public int getVolumeId() {
     return volumeId;
   }


Mime
View raw message