tajo-dev mailing list archives

From Hyunsik Choi <hyun...@apache.org>
Subject Re: Question about disk-aware scheduling in tajo
Date Wed, 19 Feb 2014 13:38:41 GMT
Henry,

Thank you for your comment :)

In my opinion, this explanation is too specific for a wiki page, and
comments in the source code are more accessible for this kind of detail.
So, for now, I'll add this explanation to the source code as comments.
Later, I'll try to create some design documentation.

Thanks,
Hyunsik Choi


On Wed, Feb 19, 2014 at 10:44 AM, Henry Saputra <henry.saputra@gmail.com> wrote:

> Hyunsik,
>
> I am +1 on this; it is worthy of a wiki page or a separate page to explain
> the technical flow. Tracing the flow in the code is confusing.
>
> Maybe something similar to the doc on the YARN page [1] (the more details
> the better ^_^)
>
>
> - Henry
>
> [1]
> http://hadoop.apache.org/docs/current2/hadoop-yarn/hadoop-yarn-site/YARN.html
>
> On Thu, Feb 13, 2014 at 2:18 AM, Hyunsik Choi <hyunsik@apache.org> wrote:
> > Hi Min,
> >
> > Above all, I'm very sorry for the lack of documentation in the source
> > code. So far, we have developed Tajo with insufficient documentation,
> > pursuing only a quick-and-dirty approach. We should add more
> > documentation to the source code.
> >
> > I'm going to explain how Tajo uses disk volume locality. Before that, let
> > me review node locality, which you may already know. Similar to MR, Tajo
> > uses three levels of locality for each task: the scheduler looks for a
> > local node, then the closest rack, and then a random node, in that order.
> > In Tajo, the scheduler additionally looks for the local disk volume
> > before falling back to the local node.
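> >
> > (To illustrate the order, here is a rough, self-contained sketch; all the
> > names below are invented for this email and do not match the actual
> > DefaultTaskScheduler code:)
> >
> > import java.util.ArrayDeque;
> > import java.util.Deque;
> > import java.util.HashMap;
> > import java.util.Map;
> >
> > // Toy model of the four-level fallback: volume -> node -> rack -> any.
> > // Removing a chosen task from the other indexes is omitted for brevity.
> > class LocalityOrderSketch {
> >   Map<String, Map<Integer, Deque<String>>> tasksByHostAndVolume =
> >       new HashMap<String, Map<Integer, Deque<String>>>();
> >   Map<String, Deque<String>> tasksByRack =
> >       new HashMap<String, Deque<String>>();
> >   Deque<String> allTasks = new ArrayDeque<String>();
> >
> >   String schedule(String host, int volumeId, String rack) {
> >     Map<Integer, Deque<String>> volumes = tasksByHostAndVolume.get(host);
> >     if (volumes != null) {
> >       Deque<String> onVolume = volumes.get(volumeId);
> >       if (onVolume != null && !onVolume.isEmpty()) {
> >         return onVolume.poll();                      // 1. same disk volume
> >       }
> >       for (Deque<String> onHost : volumes.values()) {
> >         if (!onHost.isEmpty()) return onHost.poll(); // 2. same node
> >       }
> >     }
> >     Deque<String> onRack = tasksByRack.get(rack);
> >     if (onRack != null && !onRack.isEmpty()) {
> >       return onRack.poll();                          // 3. same rack
> >     }
> >     return allTasks.poll();                          // 4. any remaining task
> >   }
> > }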
> >
> > The important thing is that we don't need to be aware of the actual disk
> > volume IDs in each node; we just assign disk volumes to the TaskRunners
> > on a node in a round-robin manner. This is sufficient to improve load
> > balancing, because it spreads concurrent reads across the disk volumes.
> >
> > Initially, TaskRunners are not mapped to disk volumes in each worker; the
> > mapping happens dynamically in the scheduler. For example, suppose there
> > are 6 local tasks for some node N1 which has 3 disk volumes (v1, v2, and
> > v3), and that three TaskRunners (T1, T2, and T3) will be running on N1.
> >
> > When tasks are added to the scheduler, the scheduler gets the disk volume
> > id of each task. Each volume id is just an integer, a logical identifier
> > that distinguishes the different disk volumes. The scheduler then builds,
> > for each node, a map from disk volume ids (obtained from
> > BlockStorageLocation) to lists of tasks
> > (DefaultTaskScheduler::addQueryUnitAttemptId). In other words, each entry
> > in the map consists of one disk volume id and the list of tasks whose
> > blocks reside on that volume.
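> >
> > For instance, with the example above, if N1's six local tasks t1..t6
> > happen to sit on the three volumes two apiece (the actual distribution
> > depends on HDFS block placement), the map for N1 would look like:
> >
> >   v1 -> [t1, t4]
> >   v2 -> [t2, t5]
> >   v3 -> [t3, t6]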
> >
> > When the first task is requested by a TaskRunner T1 on node N1, the
> > scheduler assigns the first disk volume v1 to T1, and then it schedules
> > one task that belongs to v1. Later, when a task is requested by a
> > different TaskRunner T2 on node N1, the scheduler assigns the second disk
> > volume v2 to T2, and then it schedules a task that belongs to v2. When a
> > task request comes from T1 again, the scheduler schedules another task
> > from v1 to T1, because T1 is already mapped to v1.
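> >
> > (Here is a small, self-contained simulation of that walkthrough. It mimics
> > the idea of TaskBlockLocation, but the names and details are invented for
> > this email:)
> >
> > import java.util.HashMap;
> > import java.util.LinkedList;
> > import java.util.Map;
> > import java.util.TreeMap;
> >
> > public class VolumeBindingDemo {
> >   // volumeId -> pending tasks whose blocks live on that volume
> >   Map<Integer, LinkedList<String>> unassigned =
> >       new HashMap<Integer, LinkedList<String>>();
> >   // TaskRunner -> the volume it was dynamically bound to
> >   Map<String, Integer> runnerToVolume = new HashMap<String, Integer>();
> >   // volumeId -> how many TaskRunners are currently bound to it
> >   TreeMap<Integer, Integer> concurrency = new TreeMap<Integer, Integer>();
> >
> >   void addTask(int volumeId, String task) {
> >     LinkedList<String> list = unassigned.get(volumeId);
> >     if (list == null) {
> >       list = new LinkedList<String>();
> >       unassigned.put(volumeId, list);
> >     }
> >     list.add(task);
> >     if (!concurrency.containsKey(volumeId)) concurrency.put(volumeId, 0);
> >   }
> >
> >   String request(String runner) {
> >     Integer vol = runnerToVolume.get(runner);
> >     if (vol == null) {                // first request from this runner:
> >       vol = leastLoadedVolume();      // bind it to the least-loaded volume
> >       runnerToVolume.put(runner, vol);
> >       concurrency.put(vol, concurrency.get(vol) + 1);
> >     }
> >     LinkedList<String> list = unassigned.get(vol);
> >     return (list == null || list.isEmpty()) ? null : list.poll();
> >   }
> >
> >   int leastLoadedVolume() {           // assumes at least one volume exists
> >     Map.Entry<Integer, Integer> best = null;
> >     for (Map.Entry<Integer, Integer> e : concurrency.entrySet()) {
> >       if (best == null || e.getValue() < best.getValue()) best = e;
> >     }
> >     return best.getKey();
> >   }
> >
> >   public static void main(String[] args) {
> >     VolumeBindingDemo n1 = new VolumeBindingDemo();
> >     for (int v = 1; v <= 3; v++) {
> >       n1.addTask(v, "t" + v + "a");
> >       n1.addTask(v, "t" + v + "b");
> >     }
> >     System.out.println(n1.request("T1")); // binds T1 -> v1, prints t1a
> >     System.out.println(n1.request("T2")); // binds T2 -> v2, prints t2a
> >     System.out.println(n1.request("T1")); // T1 stays on v1, prints t1b
> >   }
> > }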
> >
> > Like MR, Tajo uses dynamic scheduling, and it works very well in
> > environments where nodes have disks with different performance. If you
> > have additional questions, please feel free to ask.
> >
> > Also, I'll create a Jira issue to add this explanation to
> > DefaultTaskScheduler.
> >
> > - hyunsik
> >
> >
> >
> > On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <coderplay@gmail.com> wrote:
> >
> >> Hi Jihoon,
> >>
> >> Thank you for your answer. However, it seems you didn't explain how Tajo
> >> uses the disk information to balance the IO overhead.
> >>
> >> And I still can't understand the details; they are quite complex to me,
> >> especially the class TaskBlockLocation:
> >>
> >>
> >> public static class TaskBlockLocation {
> >>     // This is a mapping from diskId to a list of pending tasks, right?
> >>     private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
> >>         new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
> >>     // How can I return a task to the container according to the diskId?
> >>     private HashMap<ContainerId, Integer> assignedContainerMap =
> >>         new HashMap<ContainerId, Integer>();
> >>     private TreeMap<Integer, Integer> volumeUsageMap =
> >>         new TreeMap<Integer, Integer>();
> >>     private String host;
> >>
> >>     public TaskBlockLocation(String host){
> >>       this.host = host;
> >>     }
> >>
> >>     public void addQueryUnitAttemptId(Integer volumeId,
> >>                                       QueryUnitAttemptId attemptId){
> >>       LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
> >>       if (list == null) {
> >>         list = new LinkedList<QueryUnitAttemptId>();
> >>         unAssignedTaskMap.put(volumeId, list);
> >>       }
> >>       list.add(attemptId);
> >>
> >>       if (!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
> >>     }
> >>
> >>     public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
> >>       Integer volumeId;
> >>
> >>       if (!assignedContainerMap.containsKey(containerId)) {
> >>         // assign a new container to a volume with the lowest concurrency, right?
> >>         volumeId = assignVolumeId();
> >>         assignedContainerMap.put(containerId, volumeId);
> >>       } else {
> >>         volumeId = assignedContainerMap.get(containerId);
> >>       }
> >>
> >>       LinkedList<QueryUnitAttemptId> list = null;
> >>       if (unAssignedTaskMap.size() > 0) {
> >>         int retry = unAssignedTaskMap.size();
> >>         do {
> >>           list = unAssignedTaskMap.get(volumeId);
> >>           if (list == null || list.size() == 0) {
> >>             // clean and reassign remaining volume
> >>             unAssignedTaskMap.remove(volumeId);
> >>             volumeUsageMap.remove(volumeId);
> >>             if (volumeId < 0) break; // processed all blocks on disks
> >>
> >>             // WHY DOES THIS LINE ASSIGN A VOLUMEID AGAIN?
> >>             volumeId = assignVolumeId();
> >>             // WHY DOES THIS LINE PUT AGAIN?
> >>             // If the container is a new container, does it put twice?
> >>             assignedContainerMap.put(containerId, volumeId);
> >>             retry--;
> >>           } else {
> >>             break;
> >>           }
> >>         } while (retry > 0);
> >>       }
> >>       return list;
> >>     }
> >>
> >>     public Integer assignVolumeId(){
> >>       Map.Entry<Integer, Integer> volumeEntry = null;
> >>
> >>       // choose the volume with the lowest concurrency, right?
> >>       for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
> >>         if (volumeEntry == null) volumeEntry = entry;
> >>
> >>         if (volumeEntry.getValue() >= entry.getValue()) {
> >>           volumeEntry = entry;
> >>         }
> >>       }
> >>
> >>       if (volumeEntry != null) {
> >>         volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
> >>         LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey()
> >>             + ", Concurrency : " + volumeUsageMap.get(volumeEntry.getKey()));
> >>         return volumeEntry.getKey();
> >>       } else {
> >>         return -1; // processed all blocks on disks
> >>       }
> >>     }
> >>
> >>     public String getHost() {
> >>       return host;
> >>     }
> >>   }
> >>
> >> This class maintains a mapping (assignedContainerMap) from containerId
> >> to the assigned diskId. How can I retrieve a task for the container
> >> based on that diskId?
> >>
> >>
> >> Thanks,
> >> Min
> >>
> >>
> >> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <jihoonson@apache.org> wrote:
> >>
> >> > Hi, Min.
> >> >
> >> > In DefaultTaskScheduler, each container is mapped to one disk of some
> >> > node in the cluster. When a container requests a task,
> >> > DefaultTaskScheduler selects the closest task and assigns it to the
> >> > container. This process applies only to local reads; the disk volume
> >> > information is not considered for remote reads.
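> >> >
> >> > (A compact sketch of that rule, with invented names, just to make it
> >> > concrete; this is not the actual scheduler code:)
> >> >
> >> > import java.util.Deque;
> >> > import java.util.Map;
> >> >
> >> > class LocalOnlyVolumeRule {
> >> >   // Tasks local to the requesting container's host are chosen by the
> >> >   // volume the container is bound to; if none are left, a remote task
> >> >   // is assigned and the volume information plays no role.
> >> >   static String pick(String containerHost, int boundVolumeId,
> >> >                      Map<String, Map<Integer, Deque<String>>> localByHostAndVolume,
> >> >                      Deque<String> remoteCandidates) {
> >> >     Map<Integer, Deque<String>> volumes = localByHostAndVolume.get(containerHost);
> >> >     if (volumes != null) {
> >> >       Deque<String> q = volumes.get(boundVolumeId);
> >> >       if (q != null && !q.isEmpty()) return q.poll(); // disk-aware local read
> >> >     }
> >> >     return remoteCandidates.poll(); // remote read: volume id is ignored
> >> >   }
> >> > }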
> >> >
> >> > In my opinion, this is enough for us because there are few remote
> >> > tasks in each sub query. In a test on an in-house cluster of 32 nodes,
> >> > the ratio of remote tasks to all tasks was only about 0.17% (the query
> >> > was 'select l_orderkey from lineitem', and the lineitem table was about
> >> > 1TB). Since the number of remote tasks was very small, there was little
> >> > disk contention.
> >> >
> >> > Hope that answers your questions.
> >> > Thanks,
> >> > Jihoon
> >> >
> >> > 2014-02-13 11:00 GMT+09:00 Min Zhou <coderplay@gmail.com>:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Tajo leverages the feature introduced by HDFS-3672, which exposes the
> >> > > disk volume id of each HDFS data block. I already found the related
> >> > > code in DefaultTaskScheduler.assignToLeafTasks; can anyone explain the
> >> > > logic to me? What does the scheduler do when the HDFS read is a remote
> >> > > read from another machine's disk?
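> >> > >
> >> > > (For context, my understanding is that the volume ids come from the
> >> > > HDFS-3672 client API, roughly like the sketch below. I'm writing the
> >> > > Hadoop 2.x calls from memory, so treat the exact names as assumptions;
> >> > > the datanodes also need dfs.datanode.hdfs-blocks-metadata.enabled=true.)
> >> > >
> >> > > import java.util.Arrays;
> >> > > import java.util.List;
> >> > > import org.apache.hadoop.conf.Configuration;
> >> > > import org.apache.hadoop.fs.BlockLocation;
> >> > > import org.apache.hadoop.fs.BlockStorageLocation;
> >> > > import org.apache.hadoop.fs.FileStatus;
> >> > > import org.apache.hadoop.fs.Path;
> >> > > import org.apache.hadoop.fs.VolumeId;
> >> > > import org.apache.hadoop.hdfs.DistributedFileSystem;
> >> > >
> >> > > public class VolumeIdProbe {
> >> > >   public static void main(String[] args) throws Exception {
> >> > >     Path file = new Path(args[0]);
> >> > >     Configuration conf = new Configuration();
> >> > >     DistributedFileSystem fs =
> >> > >         (DistributedFileSystem) file.getFileSystem(conf);
> >> > >     FileStatus status = fs.getFileStatus(file);
> >> > >     List<BlockLocation> blocks = Arrays.asList(
> >> > >         fs.getFileBlockLocations(status, 0, status.getLen()));
> >> > >     // Extra RPC to the datanodes to learn which disk holds each replica
> >> > >     BlockStorageLocation[] locations = fs.getFileBlockStorageLocations(blocks);
> >> > >     for (BlockStorageLocation loc : locations) {
> >> > >       String[] hosts = loc.getHosts();
> >> > >       VolumeId[] ids = loc.getVolumeIds();
> >> > >       for (int i = 0; i < ids.length; i++) {
> >> > >         // replica i of this block and the disk volume it sits on
> >> > >         System.out.println(hosts[i] + " -> " + ids[i]);
> >> > >       }
> >> > >     }
> >> > >   }
> >> > > }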
> >> > >
> >> > >
> >> > > Thanks,
> >> > > Min
> >> > > --
> >> > > My research interests are distributed systems, parallel computing and
> >> > > bytecode based virtual machine.
> >> > >
> >> > > My profile:
> >> > > http://www.linkedin.com/in/coderplay
> >> > > My blog:
> >> > > http://coderplay.javaeye.com
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> My research interests are distributed systems, parallel computing and
> >> bytecode based virtual machine.
> >>
> >> My profile:
> >> http://www.linkedin.com/in/coderplay
> >> My blog:
> >> http://coderplay.javaeye.com
> >>
>
