tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject git commit: TAJO-314: Make TaskScheduler be pluggable. (jihoon)
Date Mon, 25 Nov 2013 09:02:34 GMT
Updated Branches:
  refs/heads/master 6565d2bf1 -> 92674542d


TAJO-314: Make TaskScheduler be pluggable. (jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/92674542
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/92674542
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/92674542

Branch: refs/heads/master
Commit: 92674542d03e0c3109950cd3da5f16d1c786425d
Parents: 6565d2b
Author: Jihoon Son <jihoonson@apache.org>
Authored: Mon Nov 25 18:01:39 2013 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Mon Nov 25 18:02:17 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tajo/master/AbstractTaskScheduler.java      |  40 ++
 .../tajo/master/DefaultTaskScheduler.java       | 543 ++++++++++++++++++
 .../org/apache/tajo/master/TaskScheduler.java   |  26 -
 .../tajo/master/TaskSchedulerFactory.java       |  74 +++
 .../apache/tajo/master/TaskSchedulerImpl.java   | 544 -------------------
 .../master/event/DefaultTaskSchedulerEvent.java |  91 ++++
 .../master/event/TaskAttemptAssignedEvent.java  |   1 +
 .../master/event/TaskAttemptScheduleEvent.java  |  37 ++
 .../event/TaskAttemptStatusUpdateEvent.java     |   1 +
 .../tajo/master/event/TaskCompletionEvent.java  |   1 +
 .../tajo/master/event/TaskFatalErrorEvent.java  |   1 +
 .../tajo/master/event/TaskScheduleEvent.java    |  69 ---
 .../tajo/master/event/TaskSchedulerEvent.java   |   4 +-
 .../master/event/TaskSchedulerEventFactory.java |  67 +++
 .../querymaster/QueryMasterManagerService.java  |   4 +-
 .../tajo/master/querymaster/QueryUnit.java      |  12 +-
 .../master/querymaster/QueryUnitAttempt.java    |  30 +-
 .../tajo/master/querymaster/Repartitioner.java  |  14 +-
 .../tajo/master/querymaster/SubQuery.java       |  12 +-
 .../tajo/worker/TajoWorkerManagerService.java   |   9 -
 .../src/main/resources/tajo-default.xml         |  24 +
 22 files changed, 913 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea7cf8d..e1a824d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-314: Make TaskScheduler be pluggable. (jihoon)
+
     TAJO-325: QueryState.NEW and QueryState.INIT should be combined into one
     state. (Min Zhou via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
new file mode 100644
index 0000000..ed2529a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.master.event.TaskRequestEvent;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+
+public abstract class AbstractTaskScheduler extends AbstractService
+    implements EventHandler<TaskSchedulerEvent> {
+
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public AbstractTaskScheduler(String name) {
+    super(name);
+  }
+
+  public abstract void handleTaskRequestEvent(TaskRequestEvent event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
new file mode 100644
index 0000000..d432017
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.DefaultTaskSchedulerEvent;
+import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
+import org.apache.tajo.master.event.TaskRequestEvent;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class DefaultTaskScheduler extends AbstractTaskScheduler {
+  private static final Log LOG = LogFactory.getLog(DefaultTaskSchedulerEvent.class);
+
+  private final QueryMasterTask.QueryMasterTaskContext context;
+  private TajoAsyncDispatcher dispatcher;
+
+  private Thread eventHandlingThread;
+  private Thread schedulingThread;
+  private volatile boolean stopEventHandling;
+
+  BlockingQueue<TaskSchedulerEvent> eventQueue
+      = new LinkedBlockingQueue<TaskSchedulerEvent>();
+
+  private ScheduledRequests scheduledRequests;
+  private TaskRequests taskRequests;
+
+  private int hostLocalAssigned = 0;
+  private int rackLocalAssigned = 0;
+  private int totalAssigned = 0;
+
+  public DefaultTaskScheduler(QueryMasterTask.QueryMasterTaskContext context) {
+    super(DefaultTaskScheduler.class.getName());
+    this.context = context;
+    this.dispatcher = context.getDispatcher();
+  }
+
+  @Override
+  public void init(Configuration conf) {
+
+    scheduledRequests = new ScheduledRequests();
+    taskRequests  = new TaskRequests();
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Start TaskScheduler");
+    this.eventHandlingThread = new Thread() {
+      public void run() {
+
+        TaskSchedulerEvent event;
+        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+            handleEvent(event);
+          } catch (InterruptedException e) {
+            //LOG.error("Returning, iterrupted : " + e);
+            break;
+          }
+        }
+        LOG.info("TaskScheduler eventHandlingThread stopped");
+      }
+    };
+
+    this.eventHandlingThread.start();
+
+    this.schedulingThread = new Thread() {
+      public void run() {
+
+        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            break;
+          }
+
+          schedule();
+        }
+        LOG.info("TaskScheduler schedulingThread stopped");
+      }
+    };
+
+    this.schedulingThread.start();
+    super.start();
+  }
+
+  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+  static {
+    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+
+    TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
+        TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+    builder.setId(NULL_ATTEMPT_ID.getProto());
+    builder.setShouldDie(true);
+    builder.setOutputTable("");
+    builder.setSerializedData("");
+    builder.setClusteredOutput(false);
+    stopTaskRunnerReq = builder.build();
+  }
+
+  @Override
+  public void stop() {
+    stopEventHandling = true;
+    eventHandlingThread.interrupt();
+    schedulingThread.interrupt();
+
+    // Return all of request callbacks instantly.
+    for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+      req.getCallback().run(stopTaskRunnerReq);
+    }
+
+    LOG.info("Task Scheduler stopped");
+    super.stop();
+  }
+
+  private void handleEvent(TaskSchedulerEvent event) {
+    if (event.getType() == EventType.T_SCHEDULE) {
+      DefaultTaskSchedulerEvent castEvent = (DefaultTaskSchedulerEvent) event;
+      if (castEvent.isLeafQuery()) {
+        scheduledRequests.addLeafTask(castEvent);
+      } else {
+        scheduledRequests.addNonLeafTask(castEvent);
+      }
+    }
+  }
+
+  List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
+  public void schedule() {
+
+    if (taskRequests.size() > 0) {
+      if (scheduledRequests.leafTaskNum() > 0) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", LeafTask Schedule Request: " +
+            scheduledRequests.leafTaskNum());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledRequests.leafTaskNum());
+        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+        if (taskRequestEvents.size() > 0) {
+          scheduledRequests.assignToLeafTasks(taskRequestEvents);
+          taskRequestEvents.clear();
+        }
+      }
+    }
+
+    if (taskRequests.size() > 0) {
+      if (scheduledRequests.nonLeafTaskNum() > 0) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", NonLeafTask Schedule Request: " +
+            scheduledRequests.nonLeafTaskNum());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledRequests.nonLeafTaskNum());
+        scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
+        taskRequestEvents.clear();
+      }
+    }
+  }
+
+  @Override
+  public void handle(TaskSchedulerEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+    }
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of YarnRMContainerAllocator: " + remCapacity);
+    }
+
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new InternalError(e.getMessage());
+    }
+  }
+
+  @Override
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    taskRequests.handle(event);
+  }
+
+  private class TaskRequests implements EventHandler<TaskRequestEvent> {
+    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+        new LinkedBlockingQueue<TaskRequestEvent>();
+
+    @Override
+    public void handle(TaskRequestEvent event) {
+      LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+      if(stopEventHandling) {
+        event.getCallback().run(stopTaskRunnerReq);
+        return;
+      }
+      int qSize = taskRequestQueue.size();
+      if (qSize != 0 && qSize % 1000 == 0) {
+        LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+      }
+      int remCapacity = taskRequestQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue "
+            + "of YarnRMContainerAllocator: " + remCapacity);
+      }
+
+      taskRequestQueue.add(event);
+    }
+
+    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+                                int num) {
+      taskRequestQueue.drainTo(taskRequests, num);
+    }
+
+    public int size() {
+      return taskRequestQueue.size();
+    }
+  }
+
+  public static class TaskBlockLocation {
+    private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
+        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
+    private HashMap<ContainerId, Integer> assignedContainerMap = new HashMap<ContainerId, Integer>();
+    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
+    private String host;
+
+    public TaskBlockLocation(String host){
+      this.host = host;
+    }
+
+    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
+      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
+      if (list == null) {
+        list = new LinkedList<QueryUnitAttemptId>();
+        unAssignedTaskMap.put(volumeId, list);
+      }
+      list.add(attemptId);
+
+      if(!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
+    }
+
+    public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
+      Integer volumeId;
+
+      if (!assignedContainerMap.containsKey(containerId)) {
+        volumeId = assignVolumeId();
+        assignedContainerMap.put(containerId, volumeId);
+      } else {
+        volumeId = assignedContainerMap.get(containerId);
+      }
+
+      LinkedList<QueryUnitAttemptId> list = null;
+      if (unAssignedTaskMap.size() >  0) {
+        int retry = unAssignedTaskMap.size();
+        do {
+          list = unAssignedTaskMap.get(volumeId);
+          if (list == null || list.size() == 0) {
+            //clean and reassign remaining volume
+            unAssignedTaskMap.remove(volumeId);
+            volumeUsageMap.remove(volumeId);
+            if (volumeId < 0) break; //  processed all block on disk
+
+            volumeId = assignVolumeId();
+            assignedContainerMap.put(containerId, volumeId);
+            retry--;
+          } else {
+            break;
+          }
+        } while (retry > 0);
+      }
+      return list;
+    }
+
+    public Integer assignVolumeId(){
+      Map.Entry<Integer, Integer> volumeEntry = null;
+
+      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
+        if(volumeEntry == null) volumeEntry = entry;
+
+        if (volumeEntry.getValue() >= entry.getValue()) {
+          volumeEntry = entry;
+        }
+      }
+
+      if(volumeEntry != null){
+        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
+        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
+            + volumeUsageMap.get(volumeEntry.getKey()));
+        return volumeEntry.getKey();
+      } else {
+         return -1;  // processed all block on disk
+      }
+    }
+
+    public String getHost() {
+      return host;
+    }
+  }
+
+  private class ScheduledRequests {
+    private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+    private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+    private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
+    private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
+        new HashMap<String, LinkedList<QueryUnitAttemptId>>();
+
+    public void addLeafTask(DefaultTaskSchedulerEvent event) {
+      List<DataLocation> locations = event.getDataLocations();
+
+      for (DataLocation location : locations) {
+        String host = location.getHost();
+
+        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
+        if (taskBlockLocation == null) {
+          taskBlockLocation = new TaskBlockLocation(host);
+          leafTaskHostMapping.put(host, taskBlockLocation);
+        }
+        taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), event.getAttemptId());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to host " + host);
+        }
+      }
+      for (String rack : event.getRacks()) {
+        LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+        if (list == null) {
+          list = new LinkedList<QueryUnitAttemptId>();
+          leafTasksRackMapping.put(rack, list);
+        }
+        list.add(event.getAttemptId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to rack " + rack);
+        }
+      }
+
+      leafTasks.add(event.getAttemptId());
+    }
+
+    public void addNonLeafTask(DefaultTaskSchedulerEvent event) {
+      nonLeafTasks.add(event.getAttemptId());
+    }
+
+    public int leafTaskNum() {
+      return leafTasks.size();
+    }
+
+    public int nonLeafTaskNum() {
+      return nonLeafTasks.size();
+    }
+
+    public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
+
+    public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
+      Collections.shuffle(taskRequests);
+      Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+      TaskRequestEvent taskRequest;
+      while (it.hasNext() && leafTasks.size() > 0) {
+        taskRequest = it.next();
+        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+            "containerId=" + taskRequest.getContainerId());
+        ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
+        if(container == null) {
+          continue;
+        }
+        String host = container.getTaskHostName();
+
+        QueryUnitAttemptId attemptId = null;
+        LinkedList<QueryUnitAttemptId> list = null;
+
+        // local disk allocation
+        if(!leafTaskHostMapping.containsKey(host)){
+          host = NetUtils.normalizeHost(host);
+        }
+
+        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
+        if (taskBlockLocation != null) {
+          list = taskBlockLocation.getQueryUnitAttemptIdList(taskRequest.getContainerId());
+        }
+
+        while (list != null && list.size() > 0) {
+          QueryUnitAttemptId tId = list.removeFirst();
+
+          if (leafTasks.contains(tId)) {
+            leafTasks.remove(tId);
+            attemptId = tId;
+            //LOG.info(attemptId + " Assigned based on host match " + hostName);
+            hostLocalAssigned++;
+            break;
+          }
+        }
+
+        // rack allocation
+        if (attemptId == null) {
+          String rack = RackResolver.resolve(host).getNetworkLocation();
+          list = leafTasksRackMapping.get(rack);
+          while(list != null && list.size() > 0) {
+
+            QueryUnitAttemptId tId = list.removeFirst();
+
+            if (leafTasks.contains(tId)) {
+              leafTasks.remove(tId);
+              attemptId = tId;
+              //LOG.info(attemptId + "Assigned based on rack match " + rack);
+              rackLocalAssigned++;
+              break;
+            }
+          }
+
+          // random allocation
+          if (attemptId == null && leafTaskNum() > 0) {
+            synchronized (leafTasks){
+              attemptId = leafTasks.iterator().next();
+              leafTasks.remove(attemptId);
+            }
+            //LOG.info(attemptId + " Assigned based on * match");
+          }
+        }
+
+        SubQuery subQuery = context.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+
+        if (attemptId != null) {
+          QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+              attemptId,
+              new ArrayList<FragmentProto>(task.getAllFragments()),
+              "",
+              false,
+              task.getLogicalPlan().toJson(),
+              context.getQueryContext(),
+              subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
+          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+            taskAssign.setInterQuery();
+          }
+
+          context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+              taskRequest.getContainerId(),
+              host, container.getTaskPort()));
+          assignedRequest.add(attemptId);
+
+          totalAssigned++;
+          taskRequest.getCallback().run(taskAssign.getProto());
+        } else {
+          throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
+        }
+      }
+
+      LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
+      LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
+    }
+
+    public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
+      Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+      TaskRequestEvent taskRequest;
+      while (it.hasNext()) {
+        taskRequest = it.next();
+        LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+
+        QueryUnitAttemptId attemptId;
+        // random allocation
+        if (nonLeafTasks.size() > 0) {
+          synchronized (nonLeafTasks){
+            attemptId = nonLeafTasks.iterator().next();
+            nonLeafTasks.remove(attemptId);
+          }
+          LOG.debug("Assigned based on * match");
+
+          QueryUnit task;
+          SubQuery subQuery = context.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+          task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+              attemptId,
+              Lists.newArrayList(task.getAllFragments()),
+              "",
+              false,
+              task.getLogicalPlan().toJson(),
+              context.getQueryContext(),
+              subQuery.getDataChannel(),
+              subQuery.getBlock().getEnforcer());
+          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+            taskAssign.setInterQuery();
+          }
+          for (ScanNode scan : task.getScanNodes()) {
+            Collection<URI> fetches = task.getFetch(scan);
+            if (fetches != null) {
+              for (URI fetch : fetches) {
+                taskAssign.addFetch(scan.getTableName(), fetch);
+              }
+            }
+          }
+
+          ContainerProxy container = context.getResourceAllocator().getContainer(
+              taskRequest.getContainerId());
+          context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+              taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
+          taskRequest.getCallback().run(taskAssign.getProto());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskScheduler.java
deleted file mode 100644
index e8b4a52..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskScheduler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
-
-public interface TaskScheduler extends EventHandler<TaskSchedulerEvent> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
new file mode 100644
index 0000000..57132db
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.master.querymaster.QueryMasterTask.QueryMasterTaskContext;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+public class TaskSchedulerFactory {
+
+  private static final Map<String, Class<? extends AbstractTaskScheduler>> CACHED_SCHEDULER_CLASSES = Maps.newConcurrentMap();
+
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+
+  private static final Class<?>[] DEFAULT_SCHEDULER_PARAMS = { QueryMasterTaskContext.class };
+
+  public static <T extends AbstractTaskScheduler> T getTaskSCheduler(Configuration conf, QueryMasterTaskContext context) {
+    T result;
+
+    try {
+      Class<T> schedulerClass = (Class<T>) getTaskSchedulerClass(conf);
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(schedulerClass);
+      if (constructor == null) {
+        constructor = schedulerClass.getDeclaredConstructor(DEFAULT_SCHEDULER_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(schedulerClass, constructor);
+      }
+      result = constructor.newInstance(new Object[]{context});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf) throws IOException {
+    String handlerName = getSchedulerType(conf);
+    Class<? extends AbstractTaskScheduler> schedulerClass = CACHED_SCHEDULER_CLASSES.get(handlerName);
+    if (schedulerClass == null) {
+      schedulerClass = conf.getClass(String.format("tajo.master.scheduler-handler.%s.class", handlerName), null, AbstractTaskScheduler.class);
+      CACHED_SCHEDULER_CLASSES.put(handlerName, schedulerClass);
+    }
+
+    if (schedulerClass == null) {
+      throw new IOException("Unknown Scheduler Type: " + handlerName);
+    }
+
+    return schedulerClass;
+  }
+
+  public static String getSchedulerType(Configuration conf) {
+    return conf.get("tajo.master.scheduler.type", "default");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
deleted file mode 100644
index 4d5a951..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.query.QueryUnitRequest;
-import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
-import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.master.event.TaskScheduleEvent;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.QueryUnit;
-import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.storage.DataLocation;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
/**
 * Default task scheduler for a query master task (deleted by this commit in favor of
 * the pluggable {@code DefaultTaskScheduler}).
 *
 * <p>Runs two threads: an event-handling thread that drains {@link TaskSchedulerEvent}s
 * from {@code eventQueue} into the pending-task bookkeeping ({@code ScheduledRequests}),
 * and a scheduling thread that wakes every 100 ms to match pending tasks against worker
 * task requests.  Leaf tasks are assigned preferring disk-volume/host locality, then
 * rack locality, then randomly; non-leaf tasks are always assigned randomly.
 */
public class TaskSchedulerImpl extends AbstractService
    implements TaskScheduler {
  // NOTE(review): the logger is keyed to TaskScheduleEvent.class, not
  // TaskSchedulerImpl.class — log output from this class appears under the wrong
  // category.
  private static final Log LOG = LogFactory.getLog(TaskScheduleEvent.class);

  private final QueryMasterTask.QueryMasterTaskContext context;
  private TajoAsyncDispatcher dispatcher;

  // Drains eventQueue; see start().
  private Thread eventHandlingThread;
  // Periodically calls schedule(); see start().
  private Thread schedulingThread;
  // Set once in stop(); volatile so both worker threads observe shutdown promptly.
  private volatile boolean stopEventHandling;

  BlockingQueue<TaskSchedulerEvent> eventQueue
      = new LinkedBlockingQueue<TaskSchedulerEvent>();

  private ScheduledRequests scheduledRequests;
  private TaskRequests taskRequests;

  // Assignment counters, reported at DEBUG level from assignToLeafTasks().
  private int hostLocalAssigned = 0;
  private int rackLocalAssigned = 0;
  private int totalAssigned = 0;

  public TaskSchedulerImpl(QueryMasterTask.QueryMasterTaskContext context) {
    super(TaskSchedulerImpl.class.getName());
    this.context = context;
    this.dispatcher = context.getDispatcher();
  }

  /** Creates the request/schedule bookkeeping before the service starts. */
  @Override
  public void init(Configuration conf) {

    scheduledRequests = new ScheduledRequests();
    taskRequests  = new TaskRequests();

    super.init(conf);
  }

  /** Starts the event-handling and scheduling threads. */
  @Override
  public void start() {
    LOG.info("Start TaskScheduler");
    this.eventHandlingThread = new Thread() {
      public void run() {

        TaskSchedulerEvent event;
        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
          try {
            // Blocks until an event arrives; interrupt() in stop() unblocks it.
            event = eventQueue.take();
            handleEvent(event);
          } catch (InterruptedException e) {
            //LOG.error("Returning, iterrupted : " + e);
            break;
          }
        }
        LOG.info("TaskScheduler eventHandlingThread stopped");
      }
    };

    this.eventHandlingThread.start();

    this.schedulingThread = new Thread() {
      public void run() {

        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
          try {
            // Fixed 100 ms scheduling cadence.
            Thread.sleep(100);
          } catch (InterruptedException e) {
            break;
          }

          schedule();
        }
        LOG.info("TaskScheduler schedulingThread stopped");
      }
    };

    this.schedulingThread.start();
    super.start();
  }

  // Sentinel attempt id and a prebuilt "should die" request used to tell waiting
  // task runners to shut down when the scheduler stops.
  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
  static {
    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);

    TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
        TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
    builder.setId(NULL_ATTEMPT_ID.getProto());
    builder.setShouldDie(true);
    builder.setOutputTable("");
    builder.setSerializedData("");
    builder.setClusteredOutput(false);
    stopTaskRunnerReq = builder.build();
  }

  /** Stops both threads and answers every pending task request with the stop message. */
  @Override
  public void stop() {
    stopEventHandling = true;
    eventHandlingThread.interrupt();
    schedulingThread.interrupt();

    // Return all of request callbacks instantly.
    for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
      req.getCallback().run(stopTaskRunnerReq);
    }

    LOG.info("Task Scheduler stopped");
    super.stop();
  }

  // Routes T_SCHEDULE events into the leaf/non-leaf pending sets; other event
  // types (T_SUBQUERY_COMPLETED) are ignored here.
  private void handleEvent(TaskSchedulerEvent event) {
    if (event.getType() == EventType.T_SCHEDULE) {
      TaskScheduleEvent castEvent = (TaskScheduleEvent) event;
      if (castEvent.isLeafQuery()) {
        scheduledRequests.addLeafTask(castEvent);
      } else {
        scheduledRequests.addNonLeafTask(castEvent);
      }
    }
  }

  // Reused scratch list for draining task requests each scheduling pass.
  List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
  /**
   * One scheduling pass: drain up to as many worker requests as there are pending
   * leaf tasks and assign them, then do the same for non-leaf tasks.
   */
  public void schedule() {

    if (taskRequests.size() > 0) {
      if (scheduledRequests.leafTaskNum() > 0) {
        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
            taskRequests.size() + ", LeafTask Schedule Request: " +
            scheduledRequests.leafTaskNum());
        taskRequests.getTaskRequests(taskRequestEvents,
            scheduledRequests.leafTaskNum());
        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
        if (taskRequestEvents.size() > 0) {
          scheduledRequests.assignToLeafTasks(taskRequestEvents);
          taskRequestEvents.clear();
        }
      }
    }

    if (taskRequests.size() > 0) {
      if (scheduledRequests.nonLeafTaskNum() > 0) {
        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
            taskRequests.size() + ", NonLeafTask Schedule Request: " +
            scheduledRequests.nonLeafTaskNum());
        taskRequests.getTaskRequests(taskRequestEvents,
            scheduledRequests.nonLeafTaskNum());
        scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
        taskRequestEvents.clear();
      }
    }
  }

  /** Enqueues a scheduler event for the event-handling thread. */
  @Override
  public void handle(TaskSchedulerEvent event) {
    int qSize = eventQueue.size();
    if (qSize != 0 && qSize % 1000 == 0) {
      // NOTE(review): "YarnRMContainerAllocator" in these messages is a copy-paste
      // artifact; the queue being reported is this scheduler's event queue.
      LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
    }
    int remCapacity = eventQueue.remainingCapacity();
    if (remCapacity < 1000) {
      LOG.warn("Very low remaining capacity in the event-queue "
          + "of YarnRMContainerAllocator: " + remCapacity);
    }

    try {
      eventQueue.put(event);
    } catch (InterruptedException e) {
      // NOTE(review): InternalError is a JVM-level error type; wrapping an
      // InterruptedException this way (and dropping the cause) is questionable.
      throw new InternalError(e.getMessage());
    }
  }

  /** Entry point for worker "give me a task" requests; delegates to TaskRequests. */
  public void handleTaskRequestEvent(TaskRequestEvent event) {
    taskRequests.handle(event);
  }

  // Queue of outstanding worker task requests, drained by schedule().
  private class TaskRequests implements EventHandler<TaskRequestEvent> {
    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
        new LinkedBlockingQueue<TaskRequestEvent>();

    @Override
    public void handle(TaskRequestEvent event) {
      LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
      if(stopEventHandling) {
        // Scheduler is shutting down: answer immediately with the stop message.
        event.getCallback().run(stopTaskRunnerReq);
        return;
      }
      int qSize = taskRequestQueue.size();
      if (qSize != 0 && qSize % 1000 == 0) {
        LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
      }
      int remCapacity = taskRequestQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue "
            + "of YarnRMContainerAllocator: " + remCapacity);
      }

      taskRequestQueue.add(event);
    }

    // Moves up to num pending requests into the supplied collection.
    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
                                int num) {
      taskRequestQueue.drainTo(taskRequests, num);
    }

    public int size() {
      return taskRequestQueue.size();
    }
  }

  /**
   * Per-host bookkeeping of unassigned leaf tasks, grouped by disk volume id.
   * Containers are pinned to the least-loaded volume so concurrent tasks on one
   * host spread across disks.
   */
  public static class TaskBlockLocation {
    // volume id -> tasks whose data lives on that volume.
    private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
    // container -> the volume it has been pinned to.
    private HashMap<ContainerId, Integer> assignedContainerMap = new HashMap<ContainerId, Integer>();
    // volume id -> number of containers currently working that volume.
    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
    private String host;

    public TaskBlockLocation(String host){
      this.host = host;
    }

    // Registers a task under its data's volume id.
    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
      if (list == null) {
        list = new LinkedList<QueryUnitAttemptId>();
        unAssignedTaskMap.put(volumeId, list);
      }
      list.add(attemptId);

      if(!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
    }

    /**
     * Returns the pending-task list for the volume pinned to this container,
     * re-pinning to another non-empty volume when the current one is exhausted.
     * May return null when no tasks remain.
     */
    public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
      Integer volumeId;

      if (!assignedContainerMap.containsKey(containerId)) {
        volumeId = assignVolumeId();
        assignedContainerMap.put(containerId, volumeId);
      } else {
        volumeId = assignedContainerMap.get(containerId);
      }

      LinkedList<QueryUnitAttemptId> list = null;
      if (unAssignedTaskMap.size() >  0) {
        int retry = unAssignedTaskMap.size();
        do {
          list = unAssignedTaskMap.get(volumeId);
          if (list == null || list.size() == 0) {
            //clean and reassign remaining volume
            unAssignedTaskMap.remove(volumeId);
            volumeUsageMap.remove(volumeId);
            if (volumeId < 0) break; //  processed all block on disk

            volumeId = assignVolumeId();
            assignedContainerMap.put(containerId, volumeId);
            retry--;
          } else {
            break;
          }
        } while (retry > 0);
      }
      return list;
    }

    // Picks the volume with the lowest usage count and increments it; -1 when
    // every volume has been drained.
    public Integer assignVolumeId(){
      Map.Entry<Integer, Integer> volumeEntry = null;

      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
        if(volumeEntry == null) volumeEntry = entry;

        if (volumeEntry.getValue() >= entry.getValue()) {
          volumeEntry = entry;
        }
      }

      if(volumeEntry != null){
        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
            + volumeUsageMap.get(volumeEntry.getKey()));
        return volumeEntry.getKey();
      } else {
         return -1;  // processed all block on disk
      }
    }

    public String getHost() {
      return host;
    }
  }

  /**
   * All pending (not yet assigned) tasks, indexed for locality-aware matching:
   * leaf tasks by host (with volume detail) and by rack, non-leaf tasks as a
   * flat set.
   */
  private class ScheduledRequests {
    private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
    private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
    private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
    private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
        new HashMap<String, LinkedList<QueryUnitAttemptId>>();

    // Indexes a leaf task under every host and rack holding its data.
    public void addLeafTask(TaskScheduleEvent event) {
      List<DataLocation> locations = event.getDataLocations();

      for (DataLocation location : locations) {
        String host = location.getHost();

        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
        if (taskBlockLocation == null) {
          taskBlockLocation = new TaskBlockLocation(host);
          leafTaskHostMapping.put(host, taskBlockLocation);
        }
        taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), event.getAttemptId());

        if (LOG.isDebugEnabled()) {
          LOG.debug("Added attempt req to host " + host);
        }
      }
      for (String rack : event.getRacks()) {
        LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
        if (list == null) {
          list = new LinkedList<QueryUnitAttemptId>();
          leafTasksRackMapping.put(rack, list);
        }
        list.add(event.getAttemptId());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Added attempt req to rack " + rack);
        }
      }

      leafTasks.add(event.getAttemptId());
    }

    public void addNonLeafTask(TaskScheduleEvent event) {
      nonLeafTasks.add(event.getAttemptId());
    }

    public int leafTaskNum() {
      return leafTasks.size();
    }

    public int nonLeafTaskNum() {
      return nonLeafTasks.size();
    }

    public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();

    /**
     * Matches each worker request to a pending leaf task, trying volume/host
     * locality first, then rack locality, then any remaining task.  The request
     * list is shuffled so no single container monopolizes local tasks.
     */
    public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
      Collections.shuffle(taskRequests);
      Iterator<TaskRequestEvent> it = taskRequests.iterator();

      TaskRequestEvent taskRequest;
      while (it.hasNext() && leafTasks.size() > 0) {
        taskRequest = it.next();
        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
            "containerId=" + taskRequest.getContainerId());
        ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
        if(container == null) {
          // Container already released; drop this request.
          continue;
        }
        String host = container.getTaskHostName();

        QueryUnitAttemptId attemptId = null;
        LinkedList<QueryUnitAttemptId> list = null;

        // local disk allocation
        if(!leafTaskHostMapping.containsKey(host)){
          host = NetUtils.normalizeHost(host);
        }

        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
        if (taskBlockLocation != null) {
          list = taskBlockLocation.getQueryUnitAttemptIdList(taskRequest.getContainerId());
        }

        while (list != null && list.size() > 0) {
          QueryUnitAttemptId tId = list.removeFirst();

          // Host lists are never pruned on assignment, so skip tasks already
          // taken by another host/rack path.
          if (leafTasks.contains(tId)) {
            leafTasks.remove(tId);
            attemptId = tId;
            //LOG.info(attemptId + " Assigned based on host match " + hostName);
            hostLocalAssigned++;
            break;
          }
        }

        // rack allocation
        if (attemptId == null) {
          String rack = RackResolver.resolve(host).getNetworkLocation();
          list = leafTasksRackMapping.get(rack);
          while(list != null && list.size() > 0) {

            QueryUnitAttemptId tId = list.removeFirst();

            if (leafTasks.contains(tId)) {
              leafTasks.remove(tId);
              attemptId = tId;
              //LOG.info(attemptId + "Assigned based on rack match " + rack);
              rackLocalAssigned++;
              break;
            }
          }

          // random allocation
          if (attemptId == null && leafTaskNum() > 0) {
            synchronized (leafTasks){
              attemptId = leafTasks.iterator().next();
              leafTasks.remove(attemptId);
            }
            //LOG.info(attemptId + " Assigned based on * match");
          }
        }

        // NOTE(review): attemptId is dereferenced here BEFORE the null check below;
        // if no task was found this line throws NullPointerException instead of the
        // intended "Illegal State" error.
        SubQuery subQuery = context.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());

        if (attemptId != null) {
          QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
              attemptId,
              new ArrayList<FragmentProto>(task.getAllFragments()),
              "",
              false,
              task.getLogicalPlan().toJson(),
              context.getQueryContext(),
              subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
            taskAssign.setInterQuery();
          }

          context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
              taskRequest.getContainerId(),
              host, container.getTaskPort()));
          assignedRequest.add(attemptId);

          totalAssigned++;
          taskRequest.getCallback().run(taskAssign.getProto());
        } else {
          throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
        }
      }

      LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
      LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
    }

    /**
     * Assigns pending non-leaf tasks to requesting containers in arbitrary order,
     * attaching each task's fetch URIs for intermediate data.
     */
    public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
      Iterator<TaskRequestEvent> it = taskRequests.iterator();

      TaskRequestEvent taskRequest;
      while (it.hasNext()) {
        taskRequest = it.next();
        LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());

        QueryUnitAttemptId attemptId;
        // random allocation
        if (nonLeafTasks.size() > 0) {
          synchronized (nonLeafTasks){
            attemptId = nonLeafTasks.iterator().next();
            nonLeafTasks.remove(attemptId);
          }
          LOG.debug("Assigned based on * match");

          QueryUnit task;
          SubQuery subQuery = context.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
          task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
              attemptId,
              Lists.newArrayList(task.getAllFragments()),
              "",
              false,
              task.getLogicalPlan().toJson(),
              context.getQueryContext(),
              subQuery.getDataChannel(),
              subQuery.getBlock().getEnforcer());
          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
            taskAssign.setInterQuery();
          }
          for (ScanNode scan : task.getScanNodes()) {
            Collection<URI> fetches = task.getFetch(scan);
            if (fetches != null) {
              for (URI fetch : fetches) {
                taskAssign.addFetch(scan.getTableName(), fetch);
              }
            }
          }

          ContainerProxy container = context.getResourceAllocator().getContainer(
              taskRequest.getContainerId());
          context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
              taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
          taskRequest.getCallback().run(taskAssign.getProto());
        }
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
new file mode 100644
index 0000000..00bce5b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.storage.DataLocation;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class DefaultTaskSchedulerEvent extends TaskSchedulerEvent {
+  private final QueryUnitAttemptId attemptId;
+  private final boolean isLeafQuery;
+  private final List<DataLocation> dataLocations;
+  private final String[] racks;
+
+  public DefaultTaskSchedulerEvent(final EventType eventType,
+                                   final QueryUnitAttempt attempt) {
+    super(eventType, attempt.getId().getQueryUnitId().getExecutionBlockId());
+    this.attemptId = attempt.getId();
+    this.isLeafQuery = attempt.isLeafTask();
+    if (this.isLeafQuery) {
+      this.dataLocations = attempt.getQueryUnit().getDataLocations();
+      Set<String> racks = new HashSet<String>();
+      for (DataLocation location : attempt.getQueryUnit().getDataLocations()) {
+        racks.add(RackResolver.resolve(location.getHost()).getNetworkLocation());
+      }
+      this.racks = racks.toArray(new String[racks.size()]);
+    } else {
+      this.dataLocations = null;
+      this.racks = null;
+    }
+  }
+
+  public DefaultTaskSchedulerEvent(final QueryUnitAttemptId attemptId,
+                                   final EventType eventType, boolean isLeafQuery,
+                                   final List<DataLocation> dataLocations,
+                                   final String[] racks) {
+    super(eventType, attemptId.getQueryUnitId().getExecutionBlockId());
+    this.attemptId = attemptId;
+    this.isLeafQuery = isLeafQuery;
+    this.dataLocations = dataLocations;
+    this.racks = racks;
+  }
+
+  public QueryUnitAttemptId getAttemptId() {
+    return this.attemptId;
+  }
+
+  public boolean isLeafQuery() {
+    return this.isLeafQuery;
+  }
+
+  public List<DataLocation> getDataLocations() {
+    return this.dataLocations;
+  }
+
+  public String[] getRacks() {
+    return this.racks;
+  }
+
+  @Override
+  public String toString() {
+    return "DefaultTaskSchedulerEvent{" +
+        "attemptId=" + attemptId +
+        ", isLeafQuery=" + isLeafQuery +
+        ", hosts=" + (dataLocations == null ? null : dataLocations) +
+        ", racks=" + (racks == null ? null : Arrays.asList(racks)) +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index 4934633..ec4a682 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.event;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tajo.QueryUnitAttemptId;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
new file mode 100644
index 0000000..8f153af
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.QueryUnitAttemptId;
+
+public class TaskAttemptScheduleEvent extends TaskAttemptEvent {
+  private Configuration conf;
+
+  public TaskAttemptScheduleEvent(final Configuration conf,
+                                  final QueryUnitAttemptId id,
+                                  final TaskAttemptEventType taskAttemptEventType) {
+    super(id, taskAttemptEventType);
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index d980e05..aad77c8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.event;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index 3ee389a..f3e2ab2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.event;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index d70de8a..b54aab9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.event;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
deleted file mode 100644
index 3a66af1..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.storage.DataLocation;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class TaskScheduleEvent extends TaskSchedulerEvent {
-  private final QueryUnitAttemptId attemptId;
-  private final boolean isLeafQuery;
-  private final List<DataLocation> dataLocations;
-  private final String[] racks;
-
-  public TaskScheduleEvent(final QueryUnitAttemptId attemptId,
-                           final EventType eventType, boolean isLeafQuery,
-                           final List<DataLocation> dataLocations,
-                           final String[] racks) {
-    super(eventType, attemptId.getQueryUnitId().getExecutionBlockId());
-    this.attemptId = attemptId;
-    this.isLeafQuery = isLeafQuery;
-    this.dataLocations = dataLocations;
-    this.racks = racks;
-  }
-
-  public QueryUnitAttemptId getAttemptId() {
-    return this.attemptId;
-  }
-
-  public boolean isLeafQuery() {
-    return this.isLeafQuery;
-  }
-
-  public List<DataLocation> getDataLocations() {
-    return this.dataLocations;
-  }
-
-  public String[] getRacks() {
-    return this.racks;
-  }
-
-  @Override
-  public String toString() {
-    return "TaskScheduleEvent{" +
-        "attemptId=" + attemptId +
-        ", isLeafQuery=" + isLeafQuery +
-        ", hosts=" + (dataLocations == null ? null : dataLocations) +
-        ", racks=" + (racks == null ? null : Arrays.asList(racks)) +
-        '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
index 71d8587..007a976 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -22,13 +22,13 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 
-public class TaskSchedulerEvent extends AbstractEvent<EventType> {
+public abstract class TaskSchedulerEvent extends AbstractEvent<EventType> {
   public enum EventType {
     T_SCHEDULE,
     T_SUBQUERY_COMPLETED
   }
 
-  private final ExecutionBlockId executionBlockId;
+  protected final ExecutionBlockId executionBlockId;
 
   public TaskSchedulerEvent(EventType eventType, ExecutionBlockId queryBlockId) {
     super(eventType);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java
new file mode 100644
index 0000000..e7bca57
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.master.TaskSchedulerFactory;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+public class TaskSchedulerEventFactory {
+  private static final Map<String, Class<? extends TaskSchedulerEvent>> CACHED_EVENT_CLASSES = Maps.newConcurrentMap();
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_EVENT_PARAMS = { EventType.class, QueryUnitAttempt.class };
+
+  public static <T extends TaskSchedulerEvent> T getTaskSchedulerEvent(Configuration conf, QueryUnitAttempt attempt, EventType eventType) {
+    T result;
+
+    try {
+      Class<T> eventClass = (Class<T>) getTaskSchedulerEventClass(conf);
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(eventClass);
+      if (constructor == null) {
+        constructor = eventClass.getDeclaredConstructor(DEFAULT_EVENT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(eventClass, constructor);
+      }
+      result = constructor.newInstance(new Object[]{ eventType, attempt });
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
+
+  public static Class<? extends TaskSchedulerEvent> getTaskSchedulerEventClass(Configuration conf) throws IOException {
+    String handlerName = TaskSchedulerFactory.getSchedulerType(conf);
+    Class<? extends TaskSchedulerEvent> eventClass = CACHED_EVENT_CLASSES.get(handlerName);
+    if (eventClass == null) {
+      eventClass = conf.getClass(String.format("tajo.querymaster.task-schedule-event.%s.class", handlerName), null, TaskSchedulerEvent.class);
+      CACHED_EVENT_CLASSES.put(handlerName, eventClass);
+    }
+
+    if (eventClass == null) {
+      throw new IOException("Unknown Event Type: " + handlerName);
+    }
+    return eventClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 4149749..4f0e128 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -33,7 +33,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TaskSchedulerImpl;
+import org.apache.tajo.master.DefaultTaskScheduler;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -129,7 +129,7 @@ public class QueryMasterManagerService extends CompositeService
 
       if(queryMasterTask == null || queryMasterTask.isStopped()) {
         LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
-        done.run(TaskSchedulerImpl.stopTaskRunnerReq);
+        done.run(DefaultTaskScheduler.stopTaskRunnerReq);
       } else {
         LOG.debug("getTask:" + cid + ", ebId:" + ebId);
         queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 5f734cb..1d40616 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -22,13 +22,13 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
@@ -36,7 +36,6 @@ import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.URI;
@@ -53,6 +52,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   /** Class Logger */
   private static final Log LOG = LogFactory.getLog(QueryUnit.class);
 
+  private final Configuration systemConf;
 	private QueryUnitId taskId;
   private EventHandler eventHandler;
 	private StoreTableNode store = null;
@@ -112,7 +112,9 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   private final Lock readLock;
   private final Lock writeLock;
 
-	public QueryUnit(QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
+	public QueryUnit(Configuration conf, QueryUnitId id,
+                   boolean isLeafTask, EventHandler eventHandler) {
+    this.systemConf = conf;
 		this.taskId = id;
     this.eventHandler = eventHandler;
     this.isLeafTask = isLeafTask;
@@ -387,10 +389,10 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     }
 
     if (failedAttempts > 0) {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
           TaskAttemptEventType.TA_RESCHEDULE));
     } else {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
           TaskAttemptEventType.TA_SCHEDULE));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 9a70c9a..accb549 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -22,7 +22,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -31,10 +30,11 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.util.TajoIdUtils;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -203,27 +203,9 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     @Override
     public void transition(QueryUnitAttempt taskAttempt,
                            TaskAttemptEvent taskAttemptEvent) {
-
-      if (taskAttempt.isLeafTask()
-          && taskAttempt.getQueryUnit().getScanNodes().length == 1) {
-        Set<String> racks = new HashSet<String>();
-        for (DataLocation location : taskAttempt.getQueryUnit().getDataLocations()) {
-          racks.add(RackResolver.resolve(location.getHost()).getNetworkLocation());
-        }
-
-        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
-            taskAttempt.getId(), EventType.T_SCHEDULE, true,
-            taskAttempt.getQueryUnit().getDataLocations(),
-            racks.toArray(new String[racks.size()])
-        ));
-      } else {
-        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
-            taskAttempt.getId(), EventType.T_SCHEDULE,
-            false,
-            null,
-            null
-        ));
-      }
+      TaskAttemptScheduleEvent castEvent = (TaskAttemptScheduleEvent) taskAttemptEvent;
+      taskAttempt.eventHandler.handle(
+          TaskSchedulerEventFactory.getTaskSchedulerEvent(castEvent.getConf(), taskAttempt, EventType.T_SCHEDULE));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index d58a6a6..82d2be4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -40,8 +40,8 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 
@@ -106,8 +106,9 @@ public class Repartitioner {
     if (leftSmall && rightSmall) {
       LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
       tasks = new QueryUnit[1];
-      tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
-          false, subQuery.getEventHandler());
+      tasks[0] = new QueryUnit(subQuery.getContext().getConf(),
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
+          subQuery.getMasterPlan().isLeaf(execBlock), subQuery.getEventHandler());
       tasks[0].setLogicalPlan(execBlock.getPlan());
       tasks[0].setFragment(scans[0].getCanonicalName(), fragments[0]);
       tasks[0].setFragment(scans[1].getCanonicalName(), fragments[1]);
@@ -226,7 +227,7 @@ public class Repartitioner {
 
   private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
     ExecutionBlock execBlock = subQuery.getBlock();
-    QueryUnit unit = new QueryUnit(
+    QueryUnit unit = new QueryUnit(subQuery.getContext().getConf(),
         QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.getMasterPlan().isLeaf(execBlock),
         subQuery.getEventHandler());
     unit.setLogicalPlan(execBlock.getPlan());
@@ -238,7 +239,7 @@ public class Repartitioner {
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit [] tasks = new QueryUnit[taskNum];
     for (int i = 0; i < taskNum; i++) {
-      tasks[i] = new QueryUnit(
+      tasks[i] = new QueryUnit(subQuery.getContext().getConf(),
           QueryIdFactory.newQueryUnitId(subQuery.getId(), i), subQuery.getMasterPlan().isLeaf(execBlock),
           subQuery.getEventHandler());
       tasks[i].setLogicalPlan(execBlock.getPlan());
@@ -567,7 +568,8 @@ public class Repartitioner {
     LogicalNode plan = subQuery.getBlock().getPlan();
     QueryUnit [] tasks = new QueryUnit[num];
     for (int i = 0; i < num; i++) {
-      tasks[i] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
+      tasks[i] = new QueryUnit(subQuery.getContext().getConf(),
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
           false, subQuery.getEventHandler());
       tasks[i].setFragment2(frag);
       tasks[i].setLogicalPlan(plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 0c8f395..4aa3866 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -47,10 +47,10 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.master.AbstractTaskScheduler;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.TaskScheduler;
-import org.apache.tajo.master.TaskSchedulerImpl;
+import org.apache.tajo.master.TaskSchedulerFactory;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -81,7 +81,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private TableStats statistics;
   private EventHandler eventHandler;
   private final AbstractStorageManager sm;
-  private TaskSchedulerImpl taskScheduler;
+  private AbstractTaskScheduler taskScheduler;
   private QueryMasterTask.QueryMasterTaskContext context;
 
   private long startTime;
@@ -177,7 +177,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return eventHandler;
   }
 
-  public TaskScheduler getTaskScheduler() {
+  public AbstractTaskScheduler getTaskScheduler() {
     return taskScheduler;
   }
 
@@ -461,7 +461,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
 
     private void initTaskScheduler(SubQuery subQuery) {
-      subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.context);
+      subQuery.taskScheduler = TaskSchedulerFactory.getTaskSCheduler(subQuery.context.getConf(), subQuery.context);
       subQuery.taskScheduler.init(subQuery.context.getConf());
       subQuery.taskScheduler.start();
     }
@@ -660,7 +660,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
     private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
       ExecutionBlock execBlock = subQuery.getBlock();
-      QueryUnit unit = new QueryUnit(
+      QueryUnit unit = new QueryUnit(subQuery.context.getConf(),
           QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.masterPlan.isLeaf(execBlock),
           subQuery.eventHandler);
       unit.setLogicalPlan(execBlock.getPlan());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 4190dab..6be2172 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -23,19 +23,10 @@ import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TaskSchedulerImpl;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.querymaster.QueryMaster;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/92674542/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index 0ca3f9c..fd30fc3 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -37,4 +37,28 @@
     <description></description>
   </property>
 
+  <!-- Registered Scheduler Handler -->
+  <property>
+    <name>tajo.master.scheduler-handler</name>
+    <value>default</value>
+  </property>
+
+  <!-- Scheduler Configuration -->
+  <property>
+    <name>tajo.master.scheduler.type</name>
+    <value>default</value>
+  </property>
+
+  <!-- Scheduler Handler -->
+  <property>
+    <name>tajo.master.scheduler-handler.default.class</name>
+    <value>org.apache.tajo.master.DefaultTaskScheduler</value>
+  </property>
+
+  <!-- Scheduler Event Handler -->
+  <property>
+    <name>tajo.querymaster.task-schedule-event.default.class</name>
+    <value>org.apache.tajo.master.event.DefaultTaskSchedulerEvent</value>
+  </property>
+
 </configuration>
\ No newline at end of file


Mime
View raw message