tajo-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Henry Saputra <henry.sapu...@gmail.com>
Subject Re: Question about disk-aware scheduling in tajo
Date Wed, 19 Feb 2014 01:44:48 GMT
Hyunsik,

I am +1 this is worthy of a wiki page or separate page to explain the
technical flow. Tracing the flow in the code is confusing.

Maybe similar to the doc like in YARN [1] page (the more details the better ^_^)


- Henry

[1] http://hadoop.apache.org/docs/current2/hadoop-yarn/hadoop-yarn-site/YARN.html

On Thu, Feb 13, 2014 at 2:18 AM, Hyunsik Choi <hyunsik@apache.org> wrote:
> Hi Min,
>
> Above all, I'm very sorry for lack of documentations on the source code. So
> far, we have developed Tajo with insufficient documentations by only
> pursuiting a quick and dirty manner. We should fill more documentations on
> the source code.
>
> I'm going to explain how Tajo uses disk volume locality. Before this
> explanation, I would like to explain the node locality that you may already
> know. Similar to MR, Tajo also uses three level locality for each task. For
> each task, the scheduler finds local node, closest rack, and random node
> sequentially. In Tajo, the scheduler additionally finds the local volume
> prior to finding the local node.
>
> The important thing is that we don't need to aware of actual disk volume
> IDs in each local node, and we just assigne disk volumes to TaskRunners in
> a node in a round robin manner. It would be sufficient to improve the load
> balancing by considering disk volume.
>
> Initially, TaskRunners are not mapped to disk volumes in each worker. The
> mapping occurs dynamically in the scheduler. For example, there are 6 local
> tasks for some node N1 which has 3 disk volumes (v1, v2, and v3). Also,
> three TaskRunners (T1, T2, and T3) will be running on the node N1.
>
> When tasks are added to the scheduler, the scheduler gets the disk volume
> id from each task. As you know, each volume is just an integer which is
> just a logical identifier for just distinguishing different disk volumes.
> Then, the scheduler builds a map between disk volume ids (obtained from
> BlockStorageLocation) in each node and a list of tasks
> (DefaultTaskScheduler::addQueryUnitAttemptId). In other words, each entry
> in the map consists of one disk volume id and a list of tasks corresponding
> to the disk volume.
>
> When the first task is requested from a TaskRunner T1 in node N1, the
> scheduler just assignes the first disk volume v1 to T1, and then it
> schedules one task which belongs to the disk volume v1. Later, a task is
> requested from a different TaskRunner T2 from node N1, the schedules
> assignes the second disk volume v2 to T2, and then it schedules a task
> which belongs to the disk volume v2.  Also, a task request is given from T1
> again, the scheduler schedules one task in the disk volume v1 to T1 because
> T1 is already mapped to v1.
>
> Like MR, Tajo uses a dynamic scheduling, and it works very well in the
> environments where each node has different performance disks. If you have
> additional question, please feel free to ask.
>
> Also, I'll create a Jira issue to add this explain to DefaultTaskScheduler.
>
> - hyunsik
>
>
>
> On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <coderplay@gmail.com> wrote:
>
>> Hi Jihoon,
>>
>> Thank you for you answer. However, seem you didn't answer that how tajo use
>> disk information to balance the io overhead.
>>
>> And still can't understand the details,  quite complex to me, especially
>> the class TaskBlockLocation
>>
>>
>> public static class TaskBlockLocation {
>>     // This is a mapping from diskId to a list of pending task, right?
>>     private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
>> unAssignedTaskMap =
>>         new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>>    // How can I return a Task to the container according to the diskId?
>>     private HashMap<ContainerId, Integer> assignedContainerMap = new
>> HashMap<ContainerId, Integer>();
>>     private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer,
>> Integer>();
>>     private String host;
>>
>>     public TaskBlockLocation(String host){
>>       this.host = host;
>>     }
>>
>>     public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId
>> attemptId){
>>       LinkedList<QueryUnitAttemptId> list =
>> unAssignedTaskMap.get(volumeId);
>>       if (list == null) {
>>         list = new LinkedList<QueryUnitAttemptId>();
>>         unAssignedTaskMap.put(volumeId, list);
>>       }
>>       list.add(attemptId);
>>
>>       if(!volumeUsageMap.containsKey(volumeId))
>> volumeUsageMap.put(volumeId, 0);
>>     }
>>
>>     public LinkedList<QueryUnitAttemptId>
>> getQueryUnitAttemptIdList(ContainerId containerId){
>>       Integer volumeId;
>>
>>       if (!assignedContainerMap.containsKey(containerId)) {
>>         // assign a new container to a volume with the lowest concurrency,
>> right?
>>         volumeId = assignVolumeId();
>>         assignedContainerMap.put(containerId, volumeId);
>>       } else {
>>         volumeId = assignedContainerMap.get(containerId);
>>       }
>>
>>       LinkedList<QueryUnitAttemptId> list = null;
>>       if (unAssignedTaskMap.size() >  0) {
>>         int retry = unAssignedTaskMap.size();
>>         do {
>>           list = unAssignedTaskMap.get(volumeId);
>>           if (list == null || list.size() == 0) {
>>             //clean and reassign remaining volume
>>             unAssignedTaskMap.remove(volumeId);
>>             volumeUsageMap.remove(volumeId);
>>             if (volumeId < 0) break; //  processed all block on disk
>>
>>             // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>>             volumeId = assignVolumeId();
>>             // WHY THIS LINE PUT AGAIN?
>>             // if the container is a new container, does it put twice??
>>             assignedContainerMap.put(containerId, volumeId);
>>             retry--;
>>           } else {
>>             break;
>>           }
>>         } while (retry > 0);
>>       }
>>       return list;
>>     }
>>
>>     public Integer assignVolumeId(){
>>       Map.Entry<Integer, Integer> volumeEntry = null;
>>
>>       // choose a volume with the lowest concurrency, right?
>>       for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>>         if(volumeEntry == null) volumeEntry = entry;
>>
>>         if (volumeEntry.getValue() >= entry.getValue()) {
>>           volumeEntry = entry;
>>         }
>>       }
>>
>>       if(volumeEntry != null){
>>         volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() +
>> 1);
>>         LOG.info("Assigned host : " + host + " Volume : " +
>> volumeEntry.getKey() + ", Concurrency : "
>>             + volumeUsageMap.get(volumeEntry.getKey()));
>>         return volumeEntry.getKey();
>>       } else {
>>          return -1;  // processed all block on disk
>>       }
>>     }
>>
>>     public String getHost() {
>>       return host;
>>     }
>>   }
>>
>> This class maintains a mapping (assignedContainerMap) from containerId to
>> the assigned diskId, How can I retrieve a task based on the diskId to the
>> container?
>>
>>
>> Thanks,
>> Min
>>
>>
>> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <jihoonson@apache.org> wrote:
>>
>> > Hi, Min.
>> >
>> > In DefaultTaskScheduler, each container is mapped to each disk of all
>> nodes
>> > in a cluster. When a container requests a task, DefaultTaskScheduler
>> > selects a closest task and assigns it to the container. This process
>> works
>> > for only the local reads. The disk volume information is not considered
>> for
>> > remote reads.
>> >
>> > In my opinion, this is enough for us because there are few remote tasks
>> in
>> > each sub query. From a test on an in-house cluster composed of 32 nodes,
>> > the ratio of remote tasks to whole tasks was only about 0.17% (The query
>> > was 'select l_orderkey from lineitem', and the volume of the lineitem
>> table
>> > was about 1TB.). Since the number of tasks was very small, there were
>> small
>> > disk contentions.
>> >
>> > Hope that answers your questions.
>> > Thanks,
>> > Jihoon
>> >
>> > 2014-02-13 11:00 GMT+09:00 Min Zhou <coderplay@gmail.com>:
>> >
>> > > Hi all,
>> > >
>> > > Tajo leverages the feature supported by HDFS-3672, which exposes the
>> disk
>> > > volume id of each hdfs data block.  I already found the related code in
>> > > DefaultTaskScheduler.assignToLeafTasks,  can anyone explain the logic
>> for
>> > > me?  What the scheduler do when the hdfs read is a remote read on the
>> > > other
>> > > machine's disk?
>> > >
>> > >
>> > > Thanks,
>> > > Min
>> > > --
>> > > My research interests are distributed systems, parallel computing and
>> > > bytecode based virtual machine.
>> > >
>> > > My profile:
>> > > http://www.linkedin.com/in/coderplay
>> > > My blog:
>> > > http://coderplay.javaeye.com
>> > >
>> >
>>
>>
>>
>> --
>> My research interests are distributed systems, parallel computing and
>> bytecode based virtual machine.
>>
>> My profile:
>> http://www.linkedin.com/in/coderplay
>> My blog:
>> http://coderplay.javaeye.com
>>

Mime
View raw message