tajo-dev mailing list archives

From Hyunsik Choi <hyun...@apache.org>
Subject Re: Question about disk-aware scheduling in tajo
Date Thu, 13 Feb 2014 10:18:53 GMT
Hi Min,

Above all, I'm very sorry about the lack of documentation in the source code.
So far, we have developed Tajo in a quick-and-dirty manner, with insufficient
documentation. We should add more documentation to the source code.

I'm going to explain how Tajo uses disk volume locality. Before that, let me
recap node locality, which you may already know. Like MR, Tajo uses three
levels of locality for each task: the scheduler looks for a local node, then
the closest rack, and then a random node, in that order. In Tajo, the
scheduler additionally looks for the local disk volume before falling back to
the local node.
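
In rough pseudo-Java, the selection order looks like the following sketch. It
is illustrative only; the helper methods here are made up and do not exist in
Tajo:

  // Sketch of the four-level locality search described above.
  // pendingOnVolume/pendingOnHost/pendingOnRack/anyPendingTask are
  // hypothetical helpers, not actual Tajo methods.
  QueryUnitAttemptId pickTask(String host, int volumeId) {
    QueryUnitAttemptId task = pendingOnVolume(host, volumeId); // 1. local disk volume
    if (task == null) task = pendingOnHost(host);              // 2. local node
    if (task == null) task = pendingOnRack(rackOf(host));      // 3. closest rack
    if (task == null) task = anyPendingTask();                 // 4. random node
    return task;
  }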

The important thing is that we don't need to be aware of the actual disk
volume IDs on each node; we just assign disk volumes to TaskRunners on a node
in a round-robin manner. That is sufficient to improve load balancing across
disk volumes.
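
Conceptually, the per-node assignment is no more than the following sketch
(again illustrative only; in the actual code the same effect is achieved by
always picking the least-used volume, as assignVolumeId in the class quoted
below shows):

  // Hypothetical round-robin sketch: hand out volume ids 0..numVolumes-1 in turn.
  private int nextVolume = 0;

  int assignVolumeRoundRobin(int numVolumes) {
    int v = nextVolume;
    nextVolume = (nextVolume + 1) % numVolumes;
    return v;
  }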

Initially, TaskRunners are not mapped to disk volumes on each worker. The
mapping happens dynamically in the scheduler. For example, suppose there are 6
local tasks for some node N1 which has 3 disk volumes (v1, v2, and v3), and
that three TaskRunners (T1, T2, and T3) will be running on node N1.

When tasks are added to the scheduler, the scheduler gets the disk volume id
from each task. Each volume id is just an integer, a logical identifier that
distinguishes different disk volumes. The scheduler then builds, for each
node, a map between disk volume ids (obtained from BlockStorageLocation) and
lists of tasks (DefaultTaskScheduler::addQueryUnitAttemptId). In other words,
each entry in the map consists of one disk volume id and the list of tasks
whose blocks reside on that volume.
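
The relevant state lives in one TaskBlockLocation object per node; the field
names below are exactly those in the class you quoted:

  // volume id -> pending tasks whose blocks live on that volume
  private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap;
  // container (TaskRunner) -> the volume it has been pinned to
  private HashMap<ContainerId, Integer> assignedContainerMap;
  // volume id -> number of containers currently assigned to that volume
  private TreeMap<Integer, Integer> volumeUsageMap;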

When the first task is requested by a TaskRunner T1 on node N1, the scheduler
simply assigns the first disk volume v1 to T1, and then it schedules one task
that belongs to v1. Later, when a task is requested by a different TaskRunner
T2 on node N1, the scheduler assigns the second disk volume v2 to T2 and
schedules a task that belongs to v2. When a task request comes from T1 again,
the scheduler schedules one task from disk volume v1, because T1 is already
mapped to v1.
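
Putting it together, the request path is roughly the following. This is a
simplified sketch of getQueryUnitAttemptIdList in the class you quoted; the
retry loop that drains exhausted volumes and moves on to another volume is
omitted here:

  QueryUnitAttemptId onTaskRequest(ContainerId runner) {
    Integer v = assignedContainerMap.get(runner);
    if (v == null) {                         // first request from this runner:
      v = assignVolumeId();                  // pin it to the least-used volume
      assignedContainerMap.put(runner, v);
    }
    LinkedList<QueryUnitAttemptId> tasks = unAssignedTaskMap.get(v);
    if (tasks == null || tasks.isEmpty()) {
      return null;                           // volume drained; the real code retries another one
    }
    return tasks.removeFirst();              // schedule one task from that volume
  }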

Like MR, Tajo uses dynamic scheduling, and it works very well in environments
where nodes have disks of different performance. If you have additional
questions, please feel free to ask.

Also, I'll create a Jira issue to add this explanation to DefaultTaskScheduler.

- hyunsik



On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <coderplay@gmail.com> wrote:

> Hi Jihoon,
>
> Thank you for your answer. However, it seems you didn't answer how Tajo uses
> disk information to balance the IO overhead.
>
> I still can't understand the details; they are quite complex to me, especially
> the class TaskBlockLocation:
>
>
> public static class TaskBlockLocation {
>     // This is a mapping from diskId to a list of pending tasks, right?
>     private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
>         new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>     // How can I return a task to the container according to the diskId?
>     private HashMap<ContainerId, Integer> assignedContainerMap =
>         new HashMap<ContainerId, Integer>();
>     private TreeMap<Integer, Integer> volumeUsageMap =
>         new TreeMap<Integer, Integer>();
>     private String host;
>
>     public TaskBlockLocation(String host){
>       this.host = host;
>     }
>
>     public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
>       LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>       if (list == null) {
>         list = new LinkedList<QueryUnitAttemptId>();
>         unAssignedTaskMap.put(volumeId, list);
>       }
>       list.add(attemptId);
>
>       if (!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
>     }
>
>     public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
>       Integer volumeId;
>
>       if (!assignedContainerMap.containsKey(containerId)) {
>         // assign a new container to a volume with the lowest concurrency, right?
>         volumeId = assignVolumeId();
>         assignedContainerMap.put(containerId, volumeId);
>       } else {
>         volumeId = assignedContainerMap.get(containerId);
>       }
>
>       LinkedList<QueryUnitAttemptId> list = null;
>       if (unAssignedTaskMap.size() > 0) {
>         int retry = unAssignedTaskMap.size();
>         do {
>           list = unAssignedTaskMap.get(volumeId);
>           if (list == null || list.size() == 0) {
>             // clean up and reassign the remaining volume
>             unAssignedTaskMap.remove(volumeId);
>             volumeUsageMap.remove(volumeId);
>             if (volumeId < 0) break; // processed all blocks on disk
>
>             // WHY DOES THIS LINE ASSIGN A VOLUME ID AGAIN?
>             volumeId = assignVolumeId();
>             // WHY DOES THIS LINE PUT AGAIN?
>             // If the container is a new container, does it put twice?
>             assignedContainerMap.put(containerId, volumeId);
>             retry--;
>           } else {
>             break;
>           }
>         } while (retry > 0);
>       }
>       return list;
>     }
>
>     public Integer assignVolumeId(){
>       Map.Entry<Integer, Integer> volumeEntry = null;
>
>       // choose a volume with the lowest concurrency, right?
>       for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>         if (volumeEntry == null) volumeEntry = entry;
>
>         if (volumeEntry.getValue() >= entry.getValue()) {
>           volumeEntry = entry;
>         }
>       }
>
>       if (volumeEntry != null) {
>         volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
>         LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey()
>             + ", Concurrency : " + volumeUsageMap.get(volumeEntry.getKey()));
>         return volumeEntry.getKey();
>       } else {
>         return -1; // processed all blocks on disk
>       }
>     }
>
>     public String getHost() {
>       return host;
>     }
> }
>
> This class maintains a mapping (assignedContainerMap) from containerId to
> the assigned diskId. How can I retrieve a task for a container based on its
> diskId?
>
>
> Thanks,
> Min
>
>
> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <jihoonson@apache.org> wrote:
>
> > Hi, Min.
> >
> > In DefaultTaskScheduler, each container is mapped to a disk on one of the
> > nodes in the cluster. When a container requests a task, DefaultTaskScheduler
> > selects the closest task and assigns it to the container. This process works
> > only for local reads; the disk volume information is not considered for
> > remote reads.
> >
> > In my opinion, this is enough for us because there are few remote tasks in
> > each sub query. In a test on an in-house cluster of 32 nodes, the ratio of
> > remote tasks to all tasks was only about 0.17% (the query was 'select
> > l_orderkey from lineitem', and the volume of the lineitem table was about
> > 1TB). Since the number of remote tasks was so small, there was little disk
> > contention.
> >
> > Hope that answers your questions.
> > Thanks,
> > Jihoon
> >
> > 2014-02-13 11:00 GMT+09:00 Min Zhou <coderplay@gmail.com>:
> >
> > > Hi all,
> > >
> > > Tajo leverages the feature provided by HDFS-3672, which exposes the disk
> > > volume id of each HDFS data block. I already found the related code in
> > > DefaultTaskScheduler.assignToLeafTasks; can anyone explain the logic to
> > > me? What does the scheduler do when the HDFS read is a remote read from
> > > another machine's disk?
> > >
> > >
> > > Thanks,
> > > Min
> > > --
> > > My research interests are distributed systems, parallel computing and
> > > bytecode based virtual machine.
> > >
> > > My profile:
> > > http://www.linkedin.com/in/coderplay
> > > My blog:
> > > http://coderplay.javaeye.com
> > >
> >
>
>
>
> --
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
>
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com
>
