tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject git commit: TAJO-743: Change the default resource allocation policy of leaf tasks. (jinho)
Date Thu, 10 Apr 2014 06:06:02 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.8.0 70aca839c -> 6824aab67


TAJO-743: Change the default resource allocation policy of leaf tasks. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6824aab6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6824aab6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6824aab6

Branch: refs/heads/branch-0.8.0
Commit: 6824aab67f1266983810eb3ed4d12102d7aa0b87
Parents: 70aca83
Author: jinossy <jinossy@gmail.com>
Authored: Thu Apr 10 15:05:11 2014 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Thu Apr 10 15:05:11 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/tajo/conf/TajoConf.java     |  3 +-
 .../tajo/worker/TajoResourceAllocator.java      |  4 ++-
 .../java/org/apache/tajo/worker/TajoWorker.java |  9 ++++++
 .../org/apache/tajo/storage/v2/DiskUtil.java    | 32 ++++++++++++--------
 5 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6824aab6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea82b7d..f2ff384 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -143,6 +143,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-743: Change the default resource allocation policy of leaf tasks. (jinho)
+
     TAJO-717: Improve file splitting for large number of splits. (jinho)
 
     TAJO-356: Improve TajoClient to directly get query results in the first request.

http://git-wip-us.apache.org/repos/asf/tajo/blob/6824aab6/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index e0f62f7..ce671ef 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -101,6 +101,7 @@ public class TajoConf extends Configuration {
     WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024),
     WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
     WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
+    WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false),
 
     // Tajo Worker Dedicated Resources
     WORKER_RESOURCE_DEDICATED("tajo.worker.resource.dedicated", false),
@@ -227,7 +228,7 @@ public class TajoConf extends Configuration {
     //////////////////////////////////
     // Task Configuration
     TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
-    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f),
+    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 0.5f),
     TASK_DEFAULT_SIZE("tajo.task.size-mb", 128),
     //////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6824aab6/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index c082de1..81a9ed5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -92,6 +92,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
                                            int numTasks,
                                            int memoryMBPerTask) {
     //TODO consider disk slot
+
     TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
     int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
     clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
@@ -225,7 +226,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               .setMinMemoryMBPerContainer(requiredMemoryMB)
               .setMaxMemoryMBPerContainer(requiredMemoryMB)
               .setNumContainers(event.getRequiredNum())
-              .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
+              .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
+                  : TajoMasterProtocol.ResourceRequestPriority.DISK)
               .setMinDiskSlotPerContainer(requiredDiskSlots)
               .setMaxDiskSlotPerContainer(requiredDiskSlots)
               .setExecutionBlockId(event.getExecutionBlockId().getProto())

http://git-wip-us.apache.org/repos/asf/tajo/blob/6824aab6/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 2f763e3..65f9866 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -126,6 +126,11 @@ public class TajoWorker extends CompositeService {
 
   private TajoSystemMetrics workerSystemMetrics;
 
+  private static final float HDFS_DATANODE_STORAGE_SIZE;
+
+  static {
+    HDFS_DATANODE_STORAGE_SIZE = DiskUtil.getDataNodeStorageSize();
+  }
   public TajoWorker() throws Exception {
     super(TajoWorker.class.getName());
   }
@@ -525,6 +530,10 @@ public class TajoWorker extends CompositeService {
         workerMemoryMB = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
         workerCpuCoreNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
         workerDiskSlots = systemConf.getFloatVar(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+
+        if(systemConf.getBoolVar(ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && HDFS_DATANODE_STORAGE_SIZE > 0){
+          workerDiskSlots = HDFS_DATANODE_STORAGE_SIZE;
+        }
       }
 
       systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()

http://git-wip-us.apache.org/repos/asf/tajo/blob/6824aab6/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
index bb90c39..d5873bb 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
@@ -18,17 +18,15 @@
 
 package org.apache.tajo.storage.v2;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.Util;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 public class DiskUtil {
 
@@ -189,7 +187,17 @@ public class DiskUtil {
 			}
 		}
 	}
-	
+
+  public static int getDataNodeStorageSize(){
+    return getStorageDirs().size();
+  }
+
+  public static List<URI> getStorageDirs(){
+    Configuration conf = new HdfsConfiguration();
+    Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    return Util.stringCollectionAsURIs(dirNames);
+  }
+
 	public static void main(String[] args) throws Exception {
 		System.out.println("/dev/sde1".split("/").length);
 		for(String eachToken: "/dev/sde1".split("/")) {


Mime
View raw message