tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [4/6] tajo git commit: TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)
Date Thu, 25 Dec 2014 10:28:30 GMT
TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/533e709b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/533e709b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/533e709b

Branch: refs/heads/index_support
Commit: 533e709b75ab7cf8bc8a06b48870dcf2ebc8fe11
Parents: db54965
Author: Jihoon Son <jihoonson@apache.org>
Authored: Thu Dec 25 19:06:54 2014 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Thu Dec 25 19:06:54 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../DefaultFragmentScheduleAlgorithm.java       | 251 ---------
 .../tajo/master/FragmentScheduleAlgorithm.java  |  38 --
 .../FragmentScheduleAlgorithmFactory.java       |  68 ---
 .../master/GreedyFragmentScheduleAlgorithm.java | 429 ---------------
 .../apache/tajo/master/LazyTaskScheduler.java   | 529 -------------------
 .../querymaster/QueryMasterManagerService.java  |   6 +-
 tajo-core/src/main/resources/tajo-default.xml   |   7 +-
 8 files changed, 6 insertions(+), 1324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2ee58a2..124977d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -227,6 +227,8 @@ Release 0.9.1 - unreleased
 
   TASKS
 
+    TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)
+
     TAJO-1233: Merge hbase_storage branch to the master branch. 
     (Hyoungjun via hyunsik)
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
deleted file mode 100644
index 406550d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
-
-import java.util.*;
-import java.util.Map.Entry;
-
-/**
- * DefaultFragmentScheduleAlgorithm selects a fragment randomly for the given argument.
- * For example, when getHostLocalFragment(host, disk) is called, this algorithm randomly selects a fragment among
- * the fragments which are stored at the disk of the host specified by the arguments.
- */
-public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
-  private final static Log LOG = LogFactory.getLog(DefaultFragmentScheduleAlgorithm.class);
-  private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
-      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
-  private Map<String, Set<FragmentPair>> rackFragmentMapping =
-      new HashMap<String, Set<FragmentPair>>();
-  private int fragmentNum = 0;
-  private Random random = new Random(System.currentTimeMillis());
-
-  public static class FragmentsPerDisk {
-    private Integer diskId;
-    private Set<FragmentPair> fragmentPairSet;
-
-    public FragmentsPerDisk(Integer diskId) {
-      this.diskId = diskId;
-      this.fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
-    }
-
-    public Integer getDiskId() {
-      return diskId;
-    }
-
-    public Set<FragmentPair> getFragmentPairSet() {
-      return fragmentPairSet;
-    }
-
-    public void addFragmentPair(FragmentPair fragmentPair) {
-      fragmentPairSet.add(fragmentPair);
-    }
-
-    public boolean removeFragmentPair(FragmentPair fragmentPair) {
-      return fragmentPairSet.remove(fragmentPair);
-    }
-
-    public int size() {
-      return fragmentPairSet.size();
-    }
-
-    public Iterator<FragmentPair> getFragmentPairIterator() {
-      return fragmentPairSet.iterator();
-    }
-
-    public boolean isEmpty() {
-      return fragmentPairSet.isEmpty();
-    }
-  }
-
-  @Override
-  public void addFragment(FragmentPair fragmentPair) {
-    String[] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = null;
-    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
-      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
-    }
-    for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
-    }
-    fragmentNum++;
-  }
-
-  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
-    // update the fragment maps per host
-    String normalizeHost = NetUtils.normalizeHost(host);
-    Map<Integer, FragmentsPerDisk> diskFragmentMap;
-    if (fragmentHostMapping.containsKey(normalizeHost)) {
-      diskFragmentMap = fragmentHostMapping.get(normalizeHost);
-    } else {
-      diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>();
-      fragmentHostMapping.put(normalizeHost, diskFragmentMap);
-    }
-    FragmentsPerDisk fragmentsPerDisk;
-    if (diskFragmentMap.containsKey(diskId)) {
-      fragmentsPerDisk = diskFragmentMap.get(diskId);
-    } else {
-      fragmentsPerDisk = new FragmentsPerDisk(diskId);
-      diskFragmentMap.put(diskId, fragmentsPerDisk);
-    }
-    fragmentsPerDisk.addFragmentPair(fragmentPair);
-
-    // update the fragment maps per rack
-    String rack = RackResolver.resolve(normalizeHost).getNetworkLocation();
-    Set<FragmentPair> fragmentPairList;
-    if (rackFragmentMapping.containsKey(rack)) {
-      fragmentPairList = rackFragmentMapping.get(rack);
-    } else {
-      fragmentPairList = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
-      rackFragmentMapping.put(rack, fragmentPairList);
-    }
-    fragmentPairList.add(fragmentPair);
-  }
-
-  @Override
-  public void removeFragment(FragmentPair fragmentPair) {
-    boolean removed = false;
-    for (String eachHost : fragmentPair.getLeftFragment().getHosts()) {
-      String normalizedHost = NetUtils.normalizeHost(eachHost);
-      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
-      for (Entry<Integer, FragmentsPerDisk> entry : diskFragmentMap.entrySet()) {
-        FragmentsPerDisk fragmentsPerDisk = entry.getValue();
-        removed = fragmentsPerDisk.removeFragmentPair(fragmentPair);
-        if (removed) {
-          if (fragmentsPerDisk.size() == 0) {
-            diskFragmentMap.remove(entry.getKey());
-          }
-          if (diskFragmentMap.size() == 0) {
-            fragmentHostMapping.remove(normalizedHost);
-          }
-          break;
-        }
-      }
-      String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
-      if (rackFragmentMapping.containsKey(rack)) {
-        Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
-        fragmentPairs.remove(fragmentPair);
-        if (fragmentPairs.size() == 0) {
-          rackFragmentMapping.remove(rack);
-        }
-      }
-    }
-    if (removed) {
-      fragmentNum--;
-    }
-  }
-
-  /**
-   * Randomly select a fragment among the fragments stored on the host.
-   * @param host
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host) {
-    String normalizedHost = NetUtils.normalizeHost(host);
-    if (fragmentHostMapping.containsKey(normalizedHost)) {
-      Collection<FragmentsPerDisk> disks = fragmentHostMapping.get(normalizedHost).values();
-      Iterator<FragmentsPerDisk> diskIterator = disks.iterator();
-      int randomIndex = random.nextInt(disks.size());
-      FragmentsPerDisk fragmentsPerDisk = null;
-      for (int i = 0; i < randomIndex; i++) {
-        fragmentsPerDisk = diskIterator.next();
-      }
-
-      if (fragmentsPerDisk != null) {
-        Iterator<FragmentPair> fragmentIterator = fragmentsPerDisk.getFragmentPairIterator();
-        if (fragmentIterator.hasNext()) {
-          return fragmentIterator.next();
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Randomly select a fragment among the fragments stored at the disk of the host.
-   * @param host
-   * @param diskId
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
-    String normalizedHost = NetUtils.normalizeHost(host);
-    if (fragmentHostMapping.containsKey(normalizedHost)) {
-      Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
-      if (fragmentsPerDiskMap.containsKey(diskId)) {
-        FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-        if (!fragmentsPerDisk.isEmpty()) {
-          return fragmentsPerDisk.getFragmentPairIterator().next();
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Randomly select a fragment among the fragments stored on nodes in the same rack as the host.
-   * @param host
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getRackLocalFragment(String host) {
-    String rack = RackResolver.resolve(host).getNetworkLocation();
-    if (rackFragmentMapping.containsKey(rack)) {
-      Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
-      if (!fragmentPairs.isEmpty()) {
-        return fragmentPairs.iterator().next();
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Randomly select a fragment among the total fragments.
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getRandomFragment() {
-    if (!fragmentHostMapping.isEmpty()) {
-      return fragmentHostMapping.values().iterator().next().values().iterator().next().getFragmentPairIterator().next();
-    }
-    return null;
-  }
-
-  @Override
-  public FragmentPair[] getAllFragments() {
-    List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
-    for (Map<Integer, FragmentsPerDisk> eachDiskFragmentMap : fragmentHostMapping.values()) {
-      for (FragmentsPerDisk fragmentsPerDisk : eachDiskFragmentMap.values()) {
-        fragmentPairs.addAll(fragmentsPerDisk.fragmentPairSet);
-      }
-    }
-    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
-  }
-
-  @Override
-  public int size() {
-    return fragmentNum;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
deleted file mode 100644
index 10d993d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-/**
- * FragmentScheduleAlgorithm is used by LazyTaskScheduler.
- * FragmentScheduleAlgorithm selects a fragment for the given argument.
- *
- * There are two implementations: DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm.
- */
-public interface FragmentScheduleAlgorithm {
-  void addFragment(FragmentPair fragmentPair);
-  void removeFragment(FragmentPair fragmentPair);
-
-  FragmentPair getHostLocalFragment(String host);
-  FragmentPair getHostLocalFragment(String host, Integer diskId);
-  FragmentPair getRackLocalFragment(String host);
-  FragmentPair getRandomFragment();
-  FragmentPair[] getAllFragments();
-
-  int size();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
deleted file mode 100644
index 820a0fb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-public class FragmentScheduleAlgorithmFactory {
-
-  private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS;
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  private static final Class<?>[] DEFAULT_PARAMS = {};
-
-  public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf)
-      throws IOException {
-    if (CACHED_ALGORITHM_CLASS != null) {
-      return CACHED_ALGORITHM_CLASS;
-    } else {
-      CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null,
-          FragmentScheduleAlgorithm.class);
-    }
-
-    if (CACHED_ALGORITHM_CLASS == null) {
-      throw new IOException("Scheduler algorithm is null");
-    }
-    return CACHED_ALGORITHM_CLASS;
-  }
-
-  public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) {
-    T result;
-    try {
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, constructor);
-      }
-      result = constructor.newInstance(new Object[]{});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return result;
-  }
-
-  public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException {
-    return get(getScheduleAlgorithmClass(conf));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
deleted file mode 100644
index 56cf8e5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-/**
- * GreedyFragmentScheduleAlgorithm selects a fragment considering the number of fragments that are not scheduled yet.
- * Disks of hosts have the priorities which are represented by the remaining number of fragments.
- * This algorithm selects a fragment while trying to minimize the maximum priority.
- */
-public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
-  private final static Log LOG = LogFactory.getLog(GreedyFragmentScheduleAlgorithm.class);
-  private final HostPriorityComparator hostComparator = new HostPriorityComparator();
-  private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
-      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
-  private Map<HostAndDisk, PrioritizedHost> totalHostPriority = new HashMap<HostAndDisk, PrioritizedHost>();
-  private Map<String, Set<PrioritizedHost>> hostPriorityPerRack = new HashMap<String, Set<PrioritizedHost>>();
-  private TopologyCache topologyCache = new TopologyCache();
-  private int totalFragmentNum = 0;
-
-  private FragmentsPerDisk getHostFragmentSet(String host, Integer diskId) {
-    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap;
-    FragmentsPerDisk fragmentsPerDisk;
-    if (fragmentHostMapping.containsKey(host)) {
-      fragmentsPerDiskMap = fragmentHostMapping.get(host);
-    } else {
-      fragmentsPerDiskMap = new HashMap<Integer, FragmentsPerDisk>();
-      fragmentHostMapping.put(host, fragmentsPerDiskMap);
-    }
-    if (fragmentsPerDiskMap.containsKey(diskId)) {
-      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-    } else {
-      fragmentsPerDisk = new FragmentsPerDisk(diskId);
-      fragmentsPerDiskMap.put(diskId, fragmentsPerDisk);
-    }
-    return fragmentsPerDisk;
-  }
-
-  private void updateHostPriority(HostAndDisk hostAndDisk, int priority) {
-    if (priority > 0) {
-      // update the priority among the total hosts
-      PrioritizedHost prioritizedHost;
-      if (totalHostPriority.containsKey(hostAndDisk)) {
-        prioritizedHost = totalHostPriority.get(hostAndDisk);
-        prioritizedHost.priority = priority;
-      } else {
-        prioritizedHost = new PrioritizedHost(hostAndDisk, priority);
-        totalHostPriority.put(hostAndDisk, prioritizedHost);
-      }
-
-      // update the priority among the hosts in a rack
-      String rack = topologyCache.resolve(hostAndDisk.host);
-      Set<PrioritizedHost> hostsOfRack;
-      if (!hostPriorityPerRack.containsKey(rack)) {
-        hostsOfRack = new HashSet<PrioritizedHost>();
-        hostsOfRack.add(prioritizedHost);
-        hostPriorityPerRack.put(rack, hostsOfRack);
-      }
-    } else {
-      if (totalHostPriority.containsKey(hostAndDisk)) {
-        PrioritizedHost prioritizedHost = totalHostPriority.remove(hostAndDisk);
-
-        String rack = topologyCache.resolve(hostAndDisk.host);
-        if (hostPriorityPerRack.containsKey(rack)) {
-          Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
-          hostsOfRack.remove(prioritizedHost);
-          if (hostsOfRack.size() == 0){
-            hostPriorityPerRack.remove(rack);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void addFragment(FragmentPair fragmentPair) {
-    String[] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = null;
-    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
-      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
-    }
-    for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
-    }
-    totalFragmentNum++;
-  }
-
-  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
-    host = topologyCache.normalize(host);
-    FragmentsPerDisk fragmentsPerDisk = getHostFragmentSet(host, diskId);
-    fragmentsPerDisk.addFragmentPair(fragmentPair);
-
-    int priority;
-    HostAndDisk hostAndDisk = new HostAndDisk(host, diskId);
-    if (totalHostPriority.containsKey(hostAndDisk)) {
-      priority = totalHostPriority.get(hostAndDisk).priority;
-    } else {
-      priority = 0;
-    }
-    updateHostPriority(hostAndDisk, priority+1);
-  }
-
-  public int size() {
-    return totalFragmentNum;
-  }
-
-  /**
-   * Selects a fragment that is stored in the given host, and replicated at the disk of the maximum
-   * priority.
-   * @param host
-   * @return If there are fragments stored in the host, returns a fragment. Otherwise, returns null.
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host) {
-    String normalizedHost = topologyCache.normalize(host);
-    if (!fragmentHostMapping.containsKey(normalizedHost)) {
-      return null;
-    }
-
-    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
-    List<Integer> disks = Lists.newArrayList(fragmentsPerDiskMap.keySet());
-    Collections.shuffle(disks);
-    FragmentsPerDisk fragmentsPerDisk = null;
-    FragmentPair fragmentPair = null;
-
-    for (Integer diskId : disks) {
-      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-      if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) {
-        fragmentPair = getBestFragment(fragmentsPerDisk);
-      }
-      if (fragmentPair != null) {
-        return fragmentPair;
-      }
-    }
-
-    return null;
-  }
-
-  /**
-   * Selects a fragment that is stored at the given disk of the given host, and replicated at the disk of the maximum
-   * priority.
-   * @param host
-   * @param diskId
-   * @return If there are fragments stored at the disk of the host, returns a fragment. Otherwise, returns null.
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
-    String normalizedHost = NetUtils.normalizeHost(host);
-    if (fragmentHostMapping.containsKey(normalizedHost)) {
-      Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
-      if (fragmentsPerDiskMap.containsKey(diskId)) {
-        FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-        if (!fragmentsPerDisk.isEmpty()) {
-          return getBestFragment(fragmentsPerDisk);
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * In the descending order of priority, find a fragment that is shared by the given fragment set and the fragment set
-   * of the maximal priority.
-   * @param fragmentsPerDisk a fragment set
-   * @return a fragment that is shared by the given fragment set and the fragment set of the maximal priority
-   */
-  private FragmentPair getBestFragment(FragmentsPerDisk fragmentsPerDisk) {
-    // Select a fragment that is shared by host and another hostAndDisk that has the most fragments
-    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
-    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
-    Arrays.sort(sortedHosts, hostComparator);
-
-    for (PrioritizedHost nextHost : sortedHosts) {
-      if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
-        Map<Integer, FragmentsPerDisk> diskFragmentsMap = fragmentHostMapping.get(nextHost.hostAndDisk.host);
-        if (diskFragmentsMap.containsKey(nextHost.hostAndDisk.diskId)) {
-          Set<FragmentPair> largeFragmentPairSet = diskFragmentsMap.get(nextHost.hostAndDisk.diskId).getFragmentPairSet();
-          Iterator<FragmentPair> smallFragmentSetIterator = fragmentsPerDisk.getFragmentPairIterator();
-          while (smallFragmentSetIterator.hasNext()) {
-            FragmentPair eachFragmentOfSmallSet = smallFragmentSetIterator.next();
-            if (largeFragmentPairSet.contains(eachFragmentOfSmallSet)) {
-              return eachFragmentOfSmallSet;
-            }
-          }
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Selects a fragment that is stored at the same rack of the given host, and replicated at the disk of the maximum
-   * priority.
-   * @param host
-   * @return If there are fragments stored at the same rack of the given host, returns a fragment. Otherwise, returns null.
-   */
-  public FragmentPair getRackLocalFragment(String host) {
-    host = topologyCache.normalize(host);
-    // Select a fragment from a host that has the most fragments in the rack
-    String rack = topologyCache.resolve(host);
-    Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
-    if (hostsOfRack != null && hostsOfRack.size() > 0) {
-      PrioritizedHost[] sortedHosts = hostsOfRack.toArray(new PrioritizedHost[hostsOfRack.size()]);
-      Arrays.sort(sortedHosts, hostComparator);
-      for (PrioritizedHost nextHost : sortedHosts) {
-        if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
-          List<FragmentsPerDisk> disks = Lists.newArrayList(fragmentHostMapping.get(nextHost.hostAndDisk.host).values());
-          Collections.shuffle(disks);
-
-          for (FragmentsPerDisk fragmentsPerDisk : disks) {
-            if (!fragmentsPerDisk.isEmpty()) {
-              return fragmentsPerDisk.getFragmentPairIterator().next();
-            }
-          }
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Selects a fragment from the disk of the maximum priority.
-   * @return If there are remaining fragments, it returns a fragment. Otherwise, it returns null.
-   */
-  public FragmentPair getRandomFragment() {
-    // Select a fragment from a host that has the most fragments
-    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
-    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
-    Arrays.sort(sortedHosts, hostComparator);
-    PrioritizedHost randomHost = sortedHosts[0];
-    if (fragmentHostMapping.containsKey(randomHost.hostAndDisk.host)) {
-      Iterator<FragmentsPerDisk> fragmentsPerDiskIterator = fragmentHostMapping.get(randomHost.hostAndDisk.host).values().iterator();
-      if (fragmentsPerDiskIterator.hasNext()) {
-        Iterator<FragmentPair> fragmentPairIterator = fragmentsPerDiskIterator.next().getFragmentPairIterator();
-        if (fragmentPairIterator.hasNext()) {
-          return fragmentPairIterator.next();
-        }
-      }
-    }
-    return null;
-  }
-
-  public FragmentPair[] getAllFragments() {
-    List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
-    for (Map<Integer, FragmentsPerDisk> eachValue : fragmentHostMapping.values()) {
-      for (FragmentsPerDisk fragmentsPerDisk : eachValue.values()) {
-        Set<FragmentPair> pairSet = fragmentsPerDisk.getFragmentPairSet();
-        fragmentPairs.addAll(pairSet);
-      }
-    }
-    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
-  }
-
-  public void removeFragment(FragmentPair fragmentPair) {
-    String [] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = null;
-    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
-      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
-    }
-    for (int i = 0; i < hosts.length; i++) {
-      int diskId = diskIds == null ? -1 : diskIds[i];
-      String normalizedHost = NetUtils.normalizeHost(hosts[i]);
-      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
-
-      if (diskFragmentMap != null) {
-        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId);
-        if (fragmentsPerDisk != null) {
-          boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
-          if (isRemoved) {
-            if (fragmentsPerDisk.size() == 0) {
-              diskFragmentMap.remove(diskId);
-              if (diskFragmentMap.size() == 0) {
-                fragmentHostMapping.remove(normalizedHost);
-              }
-            }
-            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId);
-            if (totalHostPriority.containsKey(hostAndDisk)) {
-              PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
-              updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);
-            }
-          }
-        }
-      }
-    }
-
-    totalFragmentNum--;
-  }
-
-  private static class HostAndDisk {
-    private String host;
-    private Integer diskId;
-
-    public HostAndDisk(String host, Integer diskId) {
-      this.host = host;
-      this.diskId = diskId;
-    }
-
-    public String getHost() {
-      return host;
-    }
-
-    public int getDiskId() {
-      return diskId;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(host, diskId);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof HostAndDisk) {
-        HostAndDisk other = (HostAndDisk) o;
-        return this.host.equals(other.host) &&
-            TUtil.checkEquals(this.diskId, other.diskId);
-      }
-      return false;
-    }
-  }
-
-  public static class PrioritizedHost {
-    private HostAndDisk hostAndDisk;
-    private int priority;
-
-    public PrioritizedHost(HostAndDisk hostAndDisk, int priority) {
-      this.hostAndDisk = hostAndDisk;
-      this.priority = priority;
-    }
-
-    public PrioritizedHost(String host, Integer diskId, int priority) {
-      this.hostAndDisk = new HostAndDisk(host, diskId);
-      this.priority = priority;
-    }
-
-    public String getHost() {
-      return hostAndDisk.host;
-    }
-
-    public Integer getDiskId() {
-      return hostAndDisk.diskId;
-    }
-
-    public Integer getPriority() {
-      return priority;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof PrioritizedHost) {
-        PrioritizedHost other = (PrioritizedHost) o;
-        return this.hostAndDisk.equals(other.hostAndDisk);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return hostAndDisk.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return "host: " + hostAndDisk.host + " disk: " + hostAndDisk.diskId + " priority: " + priority;
-    }
-  }
-
-
-  public static class HostPriorityComparator implements Comparator<PrioritizedHost> {
-
-    @Override
-    public int compare(PrioritizedHost prioritizedHost, PrioritizedHost prioritizedHost2) {
-      return prioritizedHost2.priority - prioritizedHost.priority;
-    }
-  }
-
-
-  public static class TopologyCache {
-    private Map<String, String> hostRackMap = new HashMap<String, String>();
-    private Map<String, String> normalizedHostMap = new HashMap<String, String>();
-
-    public String normalize(String host) {
-      if (normalizedHostMap.containsKey(host)) {
-        return normalizedHostMap.get(host);
-      } else {
-        String normalized = NetUtils.normalizeHost(host);
-        normalizedHostMap.put(host, normalized);
-        return normalized;
-      }
-    }
-
-    public String resolve(String host) {
-      if (hostRackMap.containsKey(host)) {
-        return hostRackMap.get(host);
-      } else {
-        String rack = RackResolver.resolve(host).getNetworkLocation();
-        hostRackMap.put(host, rack);
-        return rack;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
deleted file mode 100644
index 32af17b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class LazyTaskScheduler extends AbstractTaskScheduler {
-  private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class);
-
-  private final TaskSchedulerContext context;
-  private final Stage stage;
-
-  private Thread schedulingThread;
-  private volatile boolean stopEventHandling;
-
-  BlockingQueue<TaskSchedulerEvent> eventQueue
-      = new LinkedBlockingQueue<TaskSchedulerEvent>();
-
-  private TaskRequests taskRequests;
-  private FragmentScheduleAlgorithm scheduledFragments;
-  private ScheduledFetches scheduledFetches;
-
-  private int diskLocalAssigned = 0;
-  private int hostLocalAssigned = 0;
-  private int rackLocalAssigned = 0;
-  private int totalAssigned = 0;
-
-  private int nextTaskId = 0;
-  private int containerNum;
-
-  public LazyTaskScheduler(TaskSchedulerContext context, Stage stage) {
-    super(LazyTaskScheduler.class.getName());
-    this.context = context;
-    this.stage = stage;
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    taskRequests  = new TaskRequests();
-    try {
-      scheduledFragments = FragmentScheduleAlgorithmFactory.get(conf);
-      LOG.info(scheduledFragments.getClass().getSimpleName() + " is selected for the scheduling algorithm.");
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    if (!context.isLeafQuery()) {
-      scheduledFetches = new ScheduledFetches();
-    }
-
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    containerNum = stage.getContext().getResourceAllocator().calculateNumRequestContainers(
-        stage.getContext().getQueryMasterContext().getWorkerContext(),
-        context.getEstimatedTaskNum(), 512);
-
-    LOG.info("Start TaskScheduler");
-    this.schedulingThread = new Thread() {
-      public void run() {
-
-        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException e) {
-            break;
-          }
-
-          schedule();
-        }
-        LOG.info("TaskScheduler schedulingThread stopped");
-      }
-    };
-
-    this.schedulingThread.start();
-    super.start();
-  }
-
-  private static final TaskAttemptId NULL_ATTEMPT_ID;
-  public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
-  static {
-    ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
-
-    TajoWorkerProtocol.TaskRequestProto.Builder builder =
-        TajoWorkerProtocol.TaskRequestProto.newBuilder();
-    builder.setId(NULL_ATTEMPT_ID.getProto());
-    builder.setShouldDie(true);
-    builder.setOutputTable("");
-    builder.setSerializedData("");
-    builder.setClusteredOutput(false);
-    stopTaskRunnerReq = builder.build();
-  }
-
-  @Override
-  public void stop() {
-    stopEventHandling = true;
-    schedulingThread.interrupt();
-
-    // Return all of request callbacks instantly.
-    for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
-      req.getCallback().run(stopTaskRunnerReq);
-    }
-
-    LOG.info("Task Scheduler stopped");
-    super.stop();
-  }
-
-  List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
-  public void schedule() {
-    if (taskRequests.size() > 0) {
-      if (context.isLeafQuery()) {
-        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-            taskRequests.size() + ", Fragment Schedule Request: " +
-            scheduledFragments.size());
-        taskRequests.getTaskRequests(taskRequestEvents,
-            scheduledFragments.size());
-        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
-        if (taskRequestEvents.size() > 0) {
-          assignLeafTasks(taskRequestEvents);
-        }
-        taskRequestEvents.clear();
-      } else {
-        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-            taskRequests.size() + ", Fetch Schedule Request: " +
-            scheduledFetches.size());
-        taskRequests.getTaskRequests(taskRequestEvents,
-            scheduledFetches.size());
-        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
-        if (taskRequestEvents.size() > 0) {
-          assignNonLeafTasks(taskRequestEvents);
-        }
-        taskRequestEvents.clear();
-      }
-    }
-  }
-
-  @Override
-  public void handle(TaskSchedulerEvent event) {
-    int qSize = eventQueue.size();
-    if (qSize != 0 && qSize % 1000 == 0) {
-      LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
-    }
-    int remCapacity = eventQueue.remainingCapacity();
-    if (remCapacity < 1000) {
-      LOG.warn("Very low remaining capacity in the event-queue "
-          + "of DefaultTaskScheduler: " + remCapacity);
-    }
-
-    if (event.getType() == EventType.T_SCHEDULE) {
-      if (event instanceof FragmentScheduleEvent) {
-        FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
-        Collection<Fragment> rightFragments = castEvent.getRightFragments();
-        if (rightFragments == null || rightFragments.isEmpty()) {
-          scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null));
-        } else {
-          for (Fragment eachFragment: rightFragments) {
-            scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment));
-          }
-        }
-        if (castEvent.getLeftFragment() instanceof FileFragment) {
-          initDiskBalancer(castEvent.getLeftFragment().getHosts(), ((FileFragment)castEvent.getLeftFragment()).getDiskIds());
-        }
-      } else if (event instanceof FetchScheduleEvent) {
-        FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
-        scheduledFetches.addFetch(castEvent.getFetches());
-      } else if (event instanceof TaskAttemptToSchedulerEvent) {
-        TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
-        assignTask(castEvent.getContext(), castEvent.getTaskAttempt());
-      }
-    }
-  }
-
-  public void handleTaskRequestEvent(TaskRequestEvent event) {
-    taskRequests.handle(event);
-  }
-
-  @Override
-  public int remainingScheduledObjectNum() {
-    if (context.isLeafQuery()) {
-      return scheduledFragments.size();
-    } else {
-      return scheduledFetches.size();
-    }
-  }
-
-  private Map<String, DiskBalancer> hostDiskBalancerMap = new HashMap<String, DiskBalancer>();
-
-  private void initDiskBalancer(String[] hosts, int[] diskIds) {
-    for (int i = 0; i < hosts.length; i++) {
-      DiskBalancer diskBalancer;
-      String normalized = NetUtils.normalizeHost(hosts[i]);
-      if (hostDiskBalancerMap.containsKey(normalized)) {
-        diskBalancer = hostDiskBalancerMap.get(normalized);
-      } else {
-        diskBalancer = new DiskBalancer(normalized);
-        hostDiskBalancerMap.put(normalized, diskBalancer);
-      }
-      diskBalancer.addDiskId(diskIds[i]);
-    }
-  }
-
-  private static class DiskBalancer {
-    private HashMap<TajoContainerId, Integer> containerDiskMap = new HashMap<TajoContainerId,
-      Integer>();
-    private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();
-    private String host;
-
-    public DiskBalancer(String host){
-      this.host = host;
-    }
-
-    public void addDiskId(Integer diskId) {
-      if (!diskReferMap.containsKey(diskId)) {
-        diskReferMap.put(diskId, 0);
-      }
-    }
-
-    public Integer getDiskId(TajoContainerId containerId) {
-      if (!containerDiskMap.containsKey(containerId)) {
-        assignVolumeId(containerId);
-      }
-
-      return containerDiskMap.get(containerId);
-    }
-
-    public void assignVolumeId(TajoContainerId containerId){
-      Map.Entry<Integer, Integer> volumeEntry = null;
-
-      for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {
-        if(volumeEntry == null) volumeEntry = entry;
-
-        if (volumeEntry.getValue() >= entry.getValue()) {
-          volumeEntry = entry;
-        }
-      }
-
-      if(volumeEntry != null){
-        diskReferMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
-        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
-            + diskReferMap.get(volumeEntry.getKey()));
-        containerDiskMap.put(containerId, volumeEntry.getKey());
-      }
-    }
-
-    public String getHost() {
-      return host;
-    }
-  }
-
-  private class TaskRequests implements EventHandler<TaskRequestEvent> {
-    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
-        new LinkedBlockingQueue<TaskRequestEvent>();
-
-    @Override
-    public void handle(TaskRequestEvent event) {
-      LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
-      if(stopEventHandling) {
-        event.getCallback().run(stopTaskRunnerReq);
-        return;
-      }
-      int qSize = taskRequestQueue.size();
-      if (qSize != 0 && qSize % 1000 == 0) {
-        LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
-      }
-      int remCapacity = taskRequestQueue.remainingCapacity();
-      if (remCapacity < 1000) {
-        LOG.warn("Very low remaining capacity in the event-queue "
-            + "of DefaultTaskScheduler: " + remCapacity);
-      }
-
-      taskRequestQueue.add(event);
-    }
-
-    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
-                                int num) {
-      taskRequestQueue.drainTo(taskRequests, num);
-    }
-
-    public int size() {
-      return taskRequestQueue.size();
-    }
-  }
-
-  private long adjustTaskSize() {
-    long originTaskSize = context.getMasterContext().getConf().getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024;
-    long fragNumPerTask = context.getTaskSize() / originTaskSize;
-    if (fragNumPerTask * containerNum > remainingScheduledObjectNum()) {
-      return context.getTaskSize();
-    } else {
-      fragNumPerTask = (long) Math.ceil((double)remainingScheduledObjectNum() / (double)containerNum);
-      return originTaskSize * fragNumPerTask;
-    }
-  }
-
-  private void assignLeafTasks(List<TaskRequestEvent> taskRequests) {
-    Collections.shuffle(taskRequests);
-    Iterator<TaskRequestEvent> it = taskRequests.iterator();
-
-    TaskRequestEvent taskRequest;
-    while (it.hasNext() && scheduledFragments.size() > 0) {
-      taskRequest = it.next();
-      LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
-          "containerId=" + taskRequest.getContainerId());
-      ContainerProxy container = context.getMasterContext().getResourceAllocator().
-          getContainer(taskRequest.getContainerId());
-
-      if(container == null) {
-        continue;
-      }
-
-      String host = container.getTaskHostName();
-      TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID,
-          host, taskRequest.getCallback());
-      Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
-
-      FragmentPair fragmentPair;
-      List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
-      boolean diskLocal = false;
-      long assignedFragmentSize = 0;
-      long taskSize = adjustTaskSize();
-      LOG.info("Adjusted task size: " + taskSize);
-
-      TajoConf conf = stage.getContext().getConf();
-      // host local, disk local
-      String normalized = NetUtils.normalizeHost(host);
-      Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
-      if (diskId != null && diskId != -1) {
-        do {
-          fragmentPair = scheduledFragments.getHostLocalFragment(host, diskId);
-          if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
-            break;
-          }
-
-          if (assignedFragmentSize +
-              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
-            break;
-          } else {
-            fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
-            if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
-            }
-          }
-          scheduledFragments.removeFragment(fragmentPair);
-          diskLocal = true;
-        } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
-      }
-
-      if (assignedFragmentSize < taskSize) {
-        // host local
-        do {
-          fragmentPair = scheduledFragments.getHostLocalFragment(host);
-          if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
-            break;
-          }
-
-          if (assignedFragmentSize +
-              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
-            break;
-          } else {
-            fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
-            if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
-            }
-          }
-          scheduledFragments.removeFragment(fragmentPair);
-          diskLocal = false;
-        } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
-      }
-
-      // rack local
-      if (fragmentPairs.size() == 0) {
-        fragmentPair = scheduledFragments.getRackLocalFragment(host);
-
-        // random
-        if (fragmentPair == null) {
-          fragmentPair = scheduledFragments.getRandomFragment();
-        } else {
-          rackLocalAssigned++;
-        }
-
-        if (fragmentPair != null) {
-          fragmentPairs.add(fragmentPair);
-          scheduledFragments.removeFragment(fragmentPair);
-        }
-      } else {
-        if (diskLocal) {
-          diskLocalAssigned++;
-        } else {
-          hostLocalAssigned++;
-        }
-      }
-
-      if (fragmentPairs.size() == 0) {
-        throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
-      }
-
-      LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());
-
-      task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
-      stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
-    }
-  }
-
-  private void assignNonLeafTasks(List<TaskRequestEvent> taskRequests) {
-    Iterator<TaskRequestEvent> it = taskRequests.iterator();
-
-    TaskRequestEvent taskRequest;
-    while (it.hasNext()) {
-      taskRequest = it.next();
-      LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
-
-      // random allocation
-      if (scheduledFetches.size() > 0) {
-        LOG.debug("Assigned based on * match");
-        ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
-            taskRequest.getContainerId());
-        TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID,
-            container.getTaskHostName(), taskRequest.getCallback());
-        Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
-        task.setFragment(scheduledFragments.getAllFragments());
-        stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
-      }
-    }
-  }
-
-  private void assignTask(TaskAttemptScheduleContext attemptContext, TaskAttempt taskAttempt) {
-    TaskAttemptId attemptId = taskAttempt.getId();
-    TaskRequest taskAssign = new TaskRequestImpl(
-        attemptId,
-        new ArrayList<FragmentProto>(taskAttempt.getTask().getAllFragments()),
-        "",
-        false,
-        taskAttempt.getTask().getLogicalPlan().toJson(),
-        context.getMasterContext().getQueryContext(),
-        stage.getDataChannel(), stage.getBlock().getEnforcer());
-    if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
-      taskAssign.setInterQuery();
-    }
-
-    if (!context.isLeafQuery()) {
-      Map<String, List<FetchImpl>> fetch = scheduledFetches.getNextFetch();
-      scheduledFetches.popNextFetch();
-
-      for (Entry<String, List<FetchImpl>> fetchEntry : fetch.entrySet()) {
-        for (FetchImpl eachValue : fetchEntry.getValue()) {
-          taskAssign.addFetch(fetchEntry.getKey(), eachValue);
-        }
-      }
-    }
-
-    context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-        attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo()));
-
-    totalAssigned++;
-    attemptContext.getCallback().run(taskAssign.getProto());
-
-    if (context.isLeafQuery()) {
-      LOG.debug("DiskLocalAssigned / Total: " + diskLocalAssigned + " / " + totalAssigned);
-      LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
-      LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
-    }
-  }
-
-  private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
-    if (masterPlan.isRoot(block)) {
-      return false;
-    }
-
-    ExecutionBlock parent = masterPlan.getParent(block);
-    if (masterPlan.isRoot(parent) && parent.hasUnion()) {
-      return false;
-    }
-
-    return true;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index c2e1009..9f7d3f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -30,9 +30,9 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.LazyTaskScheduler;
-import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.DefaultTaskScheduler;
 import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -128,7 +128,7 @@ public class QueryMasterManagerService extends CompositeService
       QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
 
       if(queryMasterTask == null || queryMasterTask.isStopped()) {
-        done.run(LazyTaskScheduler.stopTaskRunnerReq);
+        done.run(DefaultTaskScheduler.stopTaskRunnerReq);
       } else {
         TajoContainerId cid =
             queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml
index c49e8e5..db92b02 100644
--- a/tajo-core/src/main/resources/tajo-default.xml
+++ b/tajo-core/src/main/resources/tajo-default.xml
@@ -42,9 +42,4 @@
     <value>org.apache.tajo.master.DefaultTaskScheduler</value>
   </property>
 
-  <property>
-    <name>tajo.querymaster.lazy-task-scheduler.algorithm</name>
-    <value>org.apache.tajo.master.GreedyFragmentScheduleAlgorithm</value>
-  </property>
-
-</configuration>
\ No newline at end of file
+</configuration>


Mime
View raw message