tajo-dev mailing list archives

From jinho kim <jh...@gruter.com>
Subject Re: Question about disk-aware scheduling in tajo
Date Thu, 13 Feb 2014 14:46:49 GMT
Hi Min,

Here are my comments inline.
If you have any questions, please let me know.

Thanks.


> Hi Jihoon,
> 
> Thank you for your answer. However, it seems you didn't answer how Tajo uses
> the disk information to balance the IO overhead.
> 
> And I still can't understand the details; they are quite complex to me, especially
> the class TaskBlockLocation
> 
> 
> public static class TaskBlockLocation {
>    // This is a mapping from diskId to a list of pending task, right?
>    private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
> unAssignedTaskMap =
Yes, this is the list of pending tasks for each disk of a host (at initialization time).

hostA(TaskBlockLocation) ------- Disk1 -------- Task1 (HDFS replica 1), Task2
                                     |
                                 Disk2 -------- Task3, Task4


hostB -------------------------- Disk1 -------- Task1 (HDFS replica 2), Task3
                                     |
                                 Disk2 -------- Task2, Task4
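To make the diagram above concrete, here is a rough sketch (not actual Tajo code; task1..task4 stand for illustrative QueryUnitAttemptId objects and the disk ids are made up) of how hostA's unAssignedTaskMap gets filled through the addQueryUnitAttemptId() method quoted below:

    TaskBlockLocation hostA = new TaskBlockLocation("hostA");
    // task1..task4 are QueryUnitAttemptIds whose blocks sit on the given disks
    hostA.addQueryUnitAttemptId(1, task1);   // disk1 -> [task1]
    hostA.addQueryUnitAttemptId(1, task2);   // disk1 -> [task1, task2]
    hostA.addQueryUnitAttemptId(2, task3);   // disk2 -> [task3]
    hostA.addQueryUnitAttemptId(2, task4);   // disk2 -> [task3, task4]
    // at this point volumeUsageMap = {1: 0, 2: 0}: no container reads from either disk yet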


>        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>   // How can I return a Task to the container according to the diskId?
>    private HashMap<ContainerId, Integer> assignedContainerMap = new
> HashMap<ContainerId, Integer>();

When an available container (unique per host) requests a task, the container is assigned the disk with the lowest concurrency (at runtime).
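Roughly, the runtime behaviour looks like this (a hypothetical trace for hostA above; container1 and container2 are made-up ContainerIds, and the real selection is the assignVolumeId() method quoted further down):

    // initially volumeUsageMap = {1: 0, 2: 0}: both disks are idle
    LinkedList<QueryUnitAttemptId> first  = hostA.getQueryUnitAttemptIdList(container1);
    // container1 is pinned to one of the idle disks; that disk's counter becomes 1
    LinkedList<QueryUnitAttemptId> second = hostA.getQueryUnitAttemptIdList(container2);
    // container2 gets the other disk (now the one with the lowest count),
    // so the two containers read from different disks concurrently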

>    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer,
> Integer>();
>    private String host;
> 
>    public TaskBlockLocation(String host){
>      this.host = host;
>    }
> 
>    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId
> attemptId){
>      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>      if (list == null) {
>        list = new LinkedList<QueryUnitAttemptId>();
>        unAssignedTaskMap.put(volumeId, list);
>      }
>      list.add(attemptId);
> 
>      if(!volumeUsageMap.containsKey(volumeId))
> volumeUsageMap.put(volumeId, 0);
>    }
> 
>    public LinkedList<QueryUnitAttemptId>
> getQueryUnitAttemptIdList(ContainerId containerId){
>      Integer volumeId;
> 
>      if (!assignedContainerMap.containsKey(containerId)) {
>        // assign a new container to a volume with the lowest concurrency,
> right?
>        volumeId = assignVolumeId();

Yes, it is

>        assignedContainerMap.put(containerId, volumeId);
>      } else {
>        volumeId = assignedContainerMap.get(containerId);
>      }
> 
>      LinkedList<QueryUnitAttemptId> list = null;
>      if (unAssignedTaskMap.size() >  0) {
>        int retry = unAssignedTaskMap.size();
>        do {
>          list = unAssignedTaskMap.get(volumeId);
>          if (list == null || list.size() == 0) {
>            //clean and reassign remaining volume
>            unAssignedTaskMap.remove(volumeId);
>            volumeUsageMap.remove(volumeId);
>            if (volumeId < 0) break; //  processed all block on disk
> 
>            // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>            volumeId = assignVolumeId();
>            // WHY THIS LINE PUT AGAIN?
>            // if the container is a new container, does it put twice??
>            assignedContainerMap.put(containerId, volumeId);

Case 1:
    There are more disks than containers.
    ex)  host ------- container1 ---------- disk1 ---- tasks
                                                |
                                            disk2 ---- tasks

Case 2:
    The unknown disk (-1) is assigned for remote tasks, because all local blocks on the host's disks have already been processed.
    If the container were not assigned a remote task, the end of the query would deadlock on the remaining tasks.
    ex)
    hostA ------- container1 ---------- disk1 (newly assigned -1) ---- remote pending tasks (zero local pending tasks)
                      |
                  container2 ---------- disk2 ---- tasks


    hostB ------- container1 ---------- disk1 ---- tasks (pending tasks will decrease)
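To spell case 2 out, here is a hypothetical end-of-query walk-through of the getQueryUnitAttemptIdList() loop quoted above (comments only, following the code's own steps):

    // all local pending lists of hostA are empty, so inside the do/while loop:
    //   unAssignedTaskMap.remove(volumeId);   // drop the exhausted disk
    //   volumeUsageMap.remove(volumeId);
    //   volumeId = assignVolumeId();          // no local disk left -> returns -1
    //   assignedContainerMap.put(containerId, -1);
    // the container is now marked with the unknown disk (-1), so the scheduler
    // can hand it a remote pending task instead of leaving it idle; that is what
    // keeps the tail of the query from deadlocking.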

--Jinho

On Feb 13, 2014, at 5:29 PM, Min Zhou wrote:

> Hi Jihoon,
> 
> Thank you for your answer. However, it seems you didn't answer how Tajo uses
> the disk information to balance the IO overhead.
> 
> And I still can't understand the details; they are quite complex to me, especially
> the class TaskBlockLocation
> 
> 
> public static class TaskBlockLocation {
>    // This is a mapping from diskId to a list of pending task, right?
>    private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
> unAssignedTaskMap =
>        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>   // How can I return a Task to the container according to the diskId?
>    private HashMap<ContainerId, Integer> assignedContainerMap = new
> HashMap<ContainerId, Integer>();
>    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer,
> Integer>();
>    private String host;
> 
>    public TaskBlockLocation(String host){
>      this.host = host;
>    }
> 
>    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId
> attemptId){
>      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>      if (list == null) {
>        list = new LinkedList<QueryUnitAttemptId>();
>        unAssignedTaskMap.put(volumeId, list);
>      }
>      list.add(attemptId);
> 
>      if(!volumeUsageMap.containsKey(volumeId))
> volumeUsageMap.put(volumeId, 0);
>    }
> 
>    public LinkedList<QueryUnitAttemptId>
> getQueryUnitAttemptIdList(ContainerId containerId){
>      Integer volumeId;
> 
>      if (!assignedContainerMap.containsKey(containerId)) {
>        // assign a new container to a volume with the lowest concurrency,
> right?
>        volumeId = assignVolumeId();
>        assignedContainerMap.put(containerId, volumeId);
>      } else {
>        volumeId = assignedContainerMap.get(containerId);
>      }
> 
>      LinkedList<QueryUnitAttemptId> list = null;
>      if (unAssignedTaskMap.size() >  0) {
>        int retry = unAssignedTaskMap.size();
>        do {
>          list = unAssignedTaskMap.get(volumeId);
>          if (list == null || list.size() == 0) {
>            //clean and reassign remaining volume
>            unAssignedTaskMap.remove(volumeId);
>            volumeUsageMap.remove(volumeId);
>            if (volumeId < 0) break; //  processed all block on disk
> 
>            // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>            volumeId = assignVolumeId();
>            // WHY THIS LINE PUT AGAIN?
>            // if the container is a new container, does it put twice??
>            assignedContainerMap.put(containerId, volumeId);
>            retry--;
>          } else {
>            break;
>          }
>        } while (retry > 0);
>      }
>      return list;
>    }
> 
>    public Integer assignVolumeId(){
>      Map.Entry<Integer, Integer> volumeEntry = null;
> 
>      // choose a volume with the lowest concurrency, right?
>      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>        if(volumeEntry == null) volumeEntry = entry;
> 
>        if (volumeEntry.getValue() >= entry.getValue()) {
>          volumeEntry = entry;
>        }
>      }
> 
>      if(volumeEntry != null){
>        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() +
> 1);
>        LOG.info("Assigned host : " + host + " Volume : " +
> volumeEntry.getKey() + ", Concurrency : "
>            + volumeUsageMap.get(volumeEntry.getKey()));
>        return volumeEntry.getKey();
>      } else {
>         return -1;  // processed all block on disk
>      }
>    }
> 
>    public String getHost() {
>      return host;
>    }
>  }
> 
> This class maintains a mapping (assignedContainerMap) from containerId to
> the assigned diskId, How can I retrieve a task based on the diskId to the
> container?
> 
> 
> Thanks,
> Min
> 
> 
> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <jihoonson@apache.org> wrote:
> 
>> Hi, Min.
>> 
>> In DefaultTaskScheduler, each container is mapped to a disk of one of the nodes
>> in the cluster. When a container requests a task, DefaultTaskScheduler
>> selects the closest task and assigns it to the container. This process applies
>> only to local reads; the disk volume information is not considered for
>> remote reads.
>> 
>> In my opinion, this is enough for us because there are few remote tasks in
>> each sub query. In a test on an in-house cluster of 32 nodes,
>> the ratio of remote tasks to all tasks was only about 0.17% (the query
>> was 'select l_orderkey from lineitem', and the volume of the lineitem table
>> was about 1 TB). Since the number of remote tasks was very small, there was
>> little disk contention.
>> 
>> Hope that answers your questions.
>> Thanks,
>> Jihoon
>> 
>> 2014-02-13 11:00 GMT+09:00 Min Zhou <coderplay@gmail.com>:
>> 
>>> Hi all,
>>> 
>>> Tajo leverages the feature provided by HDFS-3672, which exposes the disk
>>> volume id of each HDFS data block. I already found the related code in
>>> DefaultTaskScheduler.assignToLeafTasks; can anyone explain the logic to
>>> me? What does the scheduler do when the HDFS read is a remote read from
>>> another machine's disk?
>>> 
>>> 
>>> Thanks,
>>> Min
>>> --
>>> My research interests are distributed systems, parallel computing and
>>> bytecode based virtual machine.
>>> 
>>> My profile:
>>> http://www.linkedin.com/in/coderplay
>>> My blog:
>>> http://coderplay.javaeye.com
>>> 
>> 
> 
> 
> 
> -- 
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
> 
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com


