tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [30/51] [partial] tajo git commit: TAJO-1761: Separate an integration unit test kit into an independent module.
Date Fri, 14 Aug 2015 14:30:09 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
new file mode 100644
index 0000000..b139645
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that {@link ExecutionBlockCursor} visits every execution block of a
+ * distributed query plan exactly once.
+ */
+public class TestExecutionBlockCursor {
+  private static TajoTestingCluster util;
+  private static TajoConf conf;
+  private static CatalogService catalog;
+  private static GlobalPlanner planner;
+  private static SQLAnalyzer analyzer;
+  private static LogicalPlanner logicalPlanner;
+  private static LogicalOptimizer optimizer;
+  private static AsyncDispatcher dispatcher;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+
+    conf = util.getConfiguration();
+    // Disable broadcast joins so every join below becomes a shuffle, keeping
+    // the number of execution blocks deterministic.
+    conf.set(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false");
+
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    // Fixed typo in the dummy warehouse URI: "localhost:!234" -> "localhost:1234".
+    // The location is never contacted by this test; it only has to be well formed.
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, "hdfs://localhost:1234/warehouse");
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    TPCH tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadOutSchema();
+    // Register every TPC-H table together with its published volume so the
+    // planner can make size-based decisions.
+    for (String table : tpch.getTableNames()) {
+      TableMeta m = CatalogUtil.newTableMeta("TEXT");
+      TableDesc d = CatalogUtil.newTableDesc(
+          CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, table), tpch.getSchema(table), m, CommonTestingUtil.getTestDir());
+      TableStats stats = new TableStats();
+      stats.setNumBytes(TPCH.tableVolumes.get(table));
+      d.setStats(stats);
+      catalog.createTable(d);
+    }
+
+    analyzer = new SQLAnalyzer();
+    logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    optimizer = new LogicalOptimizer(conf, catalog);
+
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    planner = new GlobalPlanner(conf, catalog);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    // Guard both fields: if setUp() failed partway, an unguarded call here
+    // would mask the original failure with a NullPointerException.
+    if (util != null) {
+      util.shutdownCatalogCluster();
+    }
+    if (dispatcher != null) {
+      dispatcher.stop();
+    }
+  }
+
+  @Test
+  public void testNextBlock() throws Exception {
+    // A five-way TPC-H join that the global planner splits into ten execution blocks.
+    Expr context = analyzer.parse(
+        "select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, ps_supplycost, " +
+            "r_name, p_type, p_size " +
+            "from region join nation on n_regionkey = r_regionkey and r_name = 'AMERICA' " +
+            "join supplier on s_nationkey = n_nationkey " +
+            "join partsupp on s_suppkey = ps_suppkey " +
+            "join part on p_partkey = ps_partkey and p_type like '%BRASS' and p_size = 15");
+    LogicalPlan logicalPlan = logicalPlanner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), context);
+    optimizer.optimize(logicalPlan);
+    QueryContext queryContext = new QueryContext(conf);
+    MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), queryContext, logicalPlan);
+    planner.build(queryContext, plan);
+
+    ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
+
+    // Count the blocks the cursor yields; the block itself is not inspected.
+    int count = 0;
+    for (ExecutionBlock eb : cursor) {
+      count++;
+    }
+
+    /*
+     |-eb_0000000000000_0001_000010
+       |-eb_0000000000000_0001_000009
+          |-eb_0000000000000_0001_000008
+          |-eb_0000000000000_0001_000007
+             |-eb_0000000000000_0001_000006
+                |-eb_0000000000000_0001_000005
+                |-eb_0000000000000_0001_000004
+             |-eb_0000000000000_0001_000003
+                |-eb_0000000000000_0001_000002
+                |-eb_0000000000000_0001_000001
+     */
+    assertEquals(10, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
new file mode 100644
index 0000000..207f64d
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.junit.Test;
+
+/**
+ * Queries the INFORMATION_SCHEMA system tables through the non-forward query
+ * result scanner and compares every result set against its stored answer.
+ */
+public class TestNonForwardQueryResultSystemScanner extends QueryTestCaseBase {
+
+  /** Predicate shared by the two table-oriented tests below. */
+  private static final String TABLE_FILTER =
+      "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'";
+
+  @Test
+  public void testGetNextRowsForAggregateFunction() throws Exception {
+    assertQueryStr("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES " + TABLE_FILTER);
+  }
+
+  @Test
+  public void testGetNextRowsForTable() throws Exception {
+    assertQueryStr("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES " + TABLE_FILTER);
+  }
+
+  @Test
+  public void testGetClusterDetails() throws Exception {
+    assertQueryStr("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
new file mode 100644
index 0000000..36942a4
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.ResourceProtos.FetchProto;
+import org.apache.tajo.TestTajoIds;
+import org.apache.tajo.querymaster.Repartitioner;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.FetchImpl;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.*;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.HASH_SHUFFLE;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
+import static org.apache.tajo.querymaster.Repartitioner.FetchGroupMeta;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestRepartitioner {
+  /**
+   * Spreads 1,000 intermediate entries over ten hash partitions, round-trips
+   * each fetch through its protobuf form, checks the pull-URL query parameters,
+   * and finally verifies that merging by pull host collapses every partition to
+   * a single entry whose volume is the sum of its parts (100 entries x volume 10).
+   */
+  @Test
+  public void testCreateHashFetchURL() throws Exception {
+    QueryId q1 = TestTajoIds.createQueryId(1315890136000l, 2);
+    String hostName = "tajo1";
+    int port = 1234;
+    ExecutionBlockId sid = new ExecutionBlockId(q1, 2);
+    int numPartition = 10;
+
+    Map<Integer, List<IntermediateEntry>> intermediateEntries = new HashMap<Integer, List<IntermediateEntry>>();
+    for (int i = 0; i < numPartition; i++) {
+      intermediateEntries.put(i, new ArrayList<IntermediateEntry>());
+    }
+    for (int i = 0; i < 1000; i++) {
+      int partitionId = i % numPartition;
+      IntermediateEntry entry = new IntermediateEntry(i, 0, partitionId, new Task.PullHost(hostName, port));
+      entry.setEbId(sid);
+      entry.setVolume(10);
+      intermediateEntries.get(partitionId).add(entry);
+    }
+
+    Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries =
+        new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
+
+    for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) {
+      FetchImpl fetch = new FetchImpl(new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
+          sid, eachEntry.getKey(), eachEntry.getValue());
+
+      fetch.setName(sid.toString());
+
+      // The protobuf round trip must be lossless.
+      FetchProto proto = fetch.getProto();
+      fetch = new FetchImpl(proto);
+      assertEquals(proto, fetch.getProto());
+
+      Map<ExecutionBlockId, List<IntermediateEntry>> ebEntries = new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
+      ebEntries.put(sid, eachEntry.getValue());
+
+      hashEntries.put(eachEntry.getKey(), ebEntries);
+
+      List<URI> uris = fetch.getURIs();
+      assertEquals(1, uris.size());   // In hash shuffle, the fetcher returns only one URI per partition.
+
+      URI uri = uris.get(0);
+      final Map<String, List<String>> params =
+          new QueryStringDecoder(uri).parameters();
+
+      assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
+      assertEquals("h", params.get("type").get(0));
+      assertEquals("" + sid.getId(), params.get("sid").get(0));
+    }
+
+    Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries =
+        Repartitioner.mergeIntermediateByPullHost(hashEntries);
+
+    assertEquals(numPartition, mergedHashEntries.size());
+    for (int i = 0; i < numPartition; i++) {
+      // BUG FIX: the original asserted mergedHashEntries.get(0) on every
+      // iteration, so partitions 1..9 were never actually checked.
+      Map<ExecutionBlockId, List<IntermediateEntry>> eachEntry = mergedHashEntries.get(i);
+      assertEquals(1, eachEntry.size());
+      List<IntermediateEntry> interEntry = eachEntry.get(sid);
+      assertEquals(1, interEntry.size());
+
+      assertEquals(1000, interEntry.get(0).getVolume());
+    }
+  }
+
+  /**
+   * Schedules twelve fetches (two per group, six volume groups) across one to
+   * six consumers and checks the per-consumer volumes stay evenly balanced.
+   */
+  @Test
+  public void testScheduleFetchesByEvenDistributedVolumes() {
+    Map<Integer, FetchGroupMeta> fetchGroups = Maps.newHashMap();
+    String tableName = "test1";
+
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    FetchImpl [] fetches = new FetchImpl[12];
+    for (int i = 0; i < 12; i++) {
+      fetches[i] = new FetchImpl(new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
+    }
+
+    int [] VOLUMES = {100, 80, 70, 30, 10, 5};
+
+    // Pair up consecutive fetches so each group carries one volume figure.
+    for (int i = 0; i < 12; i += 2) {
+      fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1]));
+    }
+
+    // Expected per-consumer volumes, indexed by (number of consumers - 1).
+    long[][] expectedVolumes = {
+        {100 + 80 + 70 + 30 + 10 + 5},
+        {140, 155},
+        {100, 95, 100},
+        {100, 80, 70, 45},
+        {100, 80, 70, 30, 15},
+        {100, 80, 70, 30, 10, 5}
+    };
+
+    for (int consumers = 1; consumers <= expectedVolumes.length; consumers++) {
+      Pair<Long [], Map<String, List<FetchImpl>>[]> results =
+          Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, consumers);
+      assertFetchVolumes(expectedVolumes[consumers - 1], results.getFirst());
+      assertFetchImpl(fetches, results.getSecond());
+    }
+  }
+
+  /** Asserts that {@code results} holds exactly the volumes of {@code expected}, in order. */
+  private static void assertFetchVolumes(long [] expected, Long [] results) {
+    assertEquals("the lengths of volumes are mismatch", expected.length, results.length);
+
+    int i = 0;
+    for (long expectedVolume : expected) {
+      assertTrue(expectedVolume + " is expected, but " + results[i], expectedVolume == results[i]);
+      i++;
+    }
+  }
+
+  /**
+   * Twenty 35MB intermediates (four pages each) should be merged into six fetch
+   * groups, each smaller than the 128MB split volume and - except for the last
+   * group - at least 100MB large.
+   */
+  @Test
+  public void testMergeIntermediates() {
+    int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024};   //35 MB
+
+    List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
+    long expectedTotalLength = 0;
+    for (int i = 0; i < 20; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      long offset = 0;
+      for (int pageLength : pageLengths) {
+        pages.add(new Pair(offset, pageLength));
+        offset += pageLength;
+        expectedTotalLength += pageLength;
+      }
+      IntermediateEntry entry = new IntermediateEntry(i, -1, -1, new Task.PullHost("" + i, i));
+      entry.setPages(pages);
+      entry.setVolume(offset);
+      intermediateEntries.add(entry);
+    }
+
+    long splitVolume = 128 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+        splitVolume, 10 * 1024 * 1024);
+    assertEquals(6, fetches.size());
+
+    int totalInterms = 0;
+    int numZeroPosFetcher = 0;
+    long totalLength = 0;
+    for (int index = 0; index < fetches.size(); index++) {
+      List<FetchImpl> fetchList = fetches.get(index);
+      totalInterms += fetchList.size();
+      long groupVolume = 0;
+      for (FetchImpl fetch : fetchList) {
+        groupVolume += fetch.getLength();
+        if (fetch.getOffset() == 0) {
+          numZeroPosFetcher++;
+        }
+        totalLength += fetch.getLength();
+      }
+      assertTrue(groupVolume + " should be smaller than splitVolume", groupVolume < splitVolume);
+      if (index < fetches.size() - 1) {
+        assertTrue(groupVolume + " should be great than 100MB", groupVolume >= 100 * 1024 * 1024);
+      }
+    }
+    assertEquals(23, totalInterms);
+    assertEquals(20, numZeroPosFetcher);
+    assertEquals(expectedTotalLength, totalLength);
+  }
+
+  /**
+   * Twenty 195MB intermediates from twenty distinct hosts should be split into
+   * 32 fetch groups, each bounded by the 128MB split volume.
+   */
+  @Test
+  public void testSplitIntermediates() {
+    int[] pageLengths = new int[20];  //195MB
+    Arrays.fill(pageLengths, 10 * 1024 * 1024);
+    pageLengths[pageLengths.length - 1] = 5 * 1024 * 1024;
+
+    List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
+    long expectedTotalLength = 0;
+    for (int i = 0; i < 20; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      long offset = 0;
+      for (int pageLength : pageLengths) {
+        pages.add(new Pair(offset, pageLength));
+        offset += pageLength;
+        expectedTotalLength += pageLength;
+      }
+      IntermediateEntry entry = new IntermediateEntry(i, -1, 0, new Task.PullHost("" + i, i));
+      entry.setPages(pages);
+      entry.setVolume(offset);
+      intermediateEntries.add(entry);
+    }
+
+    long splitVolume = 128 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+        splitVolume, 10 * 1024 * 1024);
+    assertEquals(32, fetches.size());
+
+    int numZeroPosFetcher = 0;
+    long totalLength = 0;
+    Set<String> uniqPullHost = new HashSet<String>();
+    for (int index = 0; index < fetches.size(); index++) {
+      long groupLength = 0;
+      for (FetchImpl fetch : fetches.get(index)) {
+        if (fetch.getOffset() == 0) {
+          numZeroPosFetcher++;
+        }
+        totalLength += fetch.getLength();
+        groupLength += fetch.getLength();
+        uniqPullHost.add(fetch.getPullHost().toString());
+      }
+      assertTrue(groupLength + " should be smaller than splitVolume", groupLength < splitVolume);
+      if (index < fetches.size() - 1) {
+        assertTrue(groupLength + " should be great than 100MB" + fetches.size() + "," + index, groupLength >= 100 * 1024 * 1024);
+      }
+    }
+    assertEquals(20, numZeroPosFetcher);
+    assertEquals(20, uniqPullHost.size());
+    assertEquals(expectedTotalLength, totalLength);
+  }
+
+  /**
+   * Regression test with realistic page offsets: two identical intermediates
+   * (81 pages each, built from the table below) are split with a 256MB split
+   * volume, and the resulting fetch ranges are compared against precomputed
+   * offset/length pairs.
+   */
+  @Test
+  public void testSplitIntermediates2() {
+    // {page offset, page length} pairs; offsets are cumulative within one intermediate.
+    long[][] pageDatas = {
+        {0, 10538717},
+        {10538717, 10515884},
+        {21054601, 10514343},
+        {31568944, 10493988},
+        {42062932, 10560639},
+        {52623571, 10548486},
+        {63172057, 10537811},
+        {73709868, 10571060},
+        {84280928, 10515062},
+        {94795990, 10502964},
+        {105298954, 10514011},
+        {115812965, 10532154},
+        {126345119, 10534133},
+        {136879252, 10549749},
+        {147429001, 10566547},
+        {157995548, 10543700},
+        {168539248, 10490324},
+        {179029572, 10500720},
+        {189530292, 10505425},
+        {200035717, 10548418},
+        {210584135, 10562887},
+        {221147022, 10554967},
+        {231701989, 10507297},
+        {242209286, 10515612},
+        {252724898, 10491274},
+        {263216172, 10512956},
+        {273729128, 10490736},
+        {284219864, 10501878},
+        {294721742, 10564568},
+        {305286310, 10488896},
+        {315775206, 10516308},
+        {326291514, 10517965},
+        {336809479, 10487038},
+        {347296517, 10603472},
+        {357899989, 10507330},
+        {368407319, 10549429},
+        {378956748, 10533443},
+        {389490191, 10530852},
+        {400021043, 11036431},
+        {411057474, 10541007},
+        {421598481, 10600477},
+        {432198958, 10519805},
+        {442718763, 10500769},
+        {453219532, 10507192},
+        {463726724, 10540424},
+        {474267148, 10509129},
+        {484776277, 10527100},
+        {495303377, 10720789},
+        {506024166, 10568542},
+        {516592708, 11046886},
+        {527639594, 10580358},
+        {538219952, 10508940},
+        {548728892, 10523968},
+        {559252860, 10580626},
+        {569833486, 10539361},
+        {580372847, 10496662},
+        {590869509, 10505280},
+        {601374789, 10564655},
+        {611939444, 10505842},
+        {622445286, 10523889},
+        {632969175, 10553186},
+        {643522361, 10535866},
+        {654058227, 10501796},
+        {664560023, 10530358},
+        {675090381, 10585340},
+        {685675721, 10602017},
+        {696277738, 10546614},
+        {706824352, 10511511},
+        {717335863, 11019221},
+        {728355084, 10558143},
+        {738913227, 10516245},
+        {749429472, 10502613},
+        {759932085, 10522145},
+        {770454230, 10489373},
+        {780943603, 10520973},
+        {791464576, 11021218},
+        {802485794, 10496362},
+        {812982156, 10502354},
+        {823484510, 10515932},
+        {834000442, 10591044},
+        {844591486, 5523957}
+    };
+
+    // Two intermediates, on "host0" and "host1", sharing the page layout above.
+    List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>();
+    for (int i = 0; i < 2; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      for (int j = 0; j < pageDatas.length; j++) {
+        pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1])));
+      }
+      IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new Task.PullHost("host" + i , 9000));
+      entry.setPages(pages);
+
+      entries.add(entry);
+    }
+
+    long splitVolume = 256 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume,
+        10 * 1024 * 1024);
+
+
+    // Expected {offset, length} per fetch. The fourth fetch list (index 3)
+    // appears to hold two ranges - the tail of the first host plus the head of
+    // the second (note the offset resetting to 0) - hence the size-2 check below.
+    long[][] expected = {
+        {0,263216172},
+        {263216172,264423422},
+        {527639594,263824982},
+        {791464576,58650867},
+        {0,200035717},
+        {200035717,263691007},
+        {463726724,264628360},
+        {728355084,121760359},
+    };
+    int index = 0;
+    for (List<FetchImpl> eachFetchList: fetches) {
+      if (index == 3) {
+        assertEquals(2, eachFetchList.size());
+      } else {
+        assertEquals(1, eachFetchList.size());
+      }
+      for (FetchImpl eachFetch: eachFetchList) {
+        assertEquals(expected[index][0], eachFetch.getOffset());
+        assertEquals(expected[index][1], eachFetch.getLength());
+        index++;
+      }
+    }
+  }
+
+  /**
+   * Same layout as {@link #testSplitIntermediates()} but with every intermediate
+   * sharing one pull host: the split must still yield 32 distinct fetches, all
+   * reported against the single host.
+   */
+  @Test
+  public void testSplitIntermediatesWithUniqueHost() {
+    int[] pageLengths = new int[20];  //195MB
+    Arrays.fill(pageLengths, 10 * 1024 * 1024);
+    pageLengths[pageLengths.length - 1] = 5 * 1024 * 1024;
+
+    Task.PullHost pullHost = new Task.PullHost("host", 0);
+
+    List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
+    long expectedTotalLength = 0;
+    for (int i = 0; i < 20; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      long offset = 0;
+      for (int pageLength : pageLengths) {
+        pages.add(new Pair(offset, pageLength));
+        offset += pageLength;
+        expectedTotalLength += pageLength;
+      }
+      IntermediateEntry entry = new IntermediateEntry(i, -1, 0, pullHost);
+      entry.setPages(pages);
+      entry.setVolume(offset);
+      intermediateEntries.add(entry);
+    }
+
+    long splitVolume = 128 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+        splitVolume, 10 * 1024 * 1024);
+    assertEquals(32, fetches.size());
+
+    // Every fetch must be distinct even though they all share one pull host.
+    int expectedSize = 0;
+    Set<FetchImpl> fetchSet = TUtil.newHashSet();
+    for (List<FetchImpl> list : fetches) {
+      expectedSize += list.size();
+      fetchSet.addAll(list);
+    }
+    assertEquals(expectedSize, fetchSet.size());
+
+    int numZeroPosFetcher = 0;
+    long totalLength = 0;
+    Set<String> uniqPullHost = new HashSet<String>();
+    for (int index = 0; index < fetches.size(); index++) {
+      long groupLength = 0;
+      for (FetchImpl fetch : fetches.get(index)) {
+        if (fetch.getOffset() == 0) {
+          numZeroPosFetcher++;
+        }
+        totalLength += fetch.getLength();
+        groupLength += fetch.getLength();
+        uniqPullHost.add(fetch.getPullHost().toString());
+      }
+      assertTrue(groupLength + " should be smaller than splitVolume", groupLength < splitVolume);
+      if (index < fetches.size() - 1) {
+        assertTrue(groupLength + " should be great than 100MB" + fetches.size() + "," + index, groupLength >= 100 * 1024 * 1024);
+      }
+    }
+    assertEquals(20, numZeroPosFetcher);
+    assertEquals(1, uniqPullHost.size());
+    assertEquals(expectedTotalLength, totalLength);
+  }
+
+  /** Two fetches built from identical arguments are equal until offset/length diverge. */
+  @Test
+  public void testFetchImpl() {
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    Task.PullHost pullHost = new Task.PullHost("localhost", 0);
+
+    FetchImpl reference = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl other = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+    assertEquals(reference, other);
+
+    other.setOffset(5);
+    other.setLength(10);
+    assertNotEquals(reference, other);
+  }
+
+  /** Asserts that {@code result} contains exactly the fetches of {@code expected}, ignoring grouping. */
+  private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
+    Set<FetchImpl> expectedURLs = Sets.newHashSet(expected);
+
+    Set<FetchImpl> resultURLs = Sets.newHashSet();
+    for (Map<String, List<FetchImpl>> e : result) {
+      for (List<FetchImpl> list : e.values()) {
+        resultURLs.addAll(list);
+      }
+    }
+
+    assertEquals(expectedURLs.size(), resultURLs.size());
+    assertEquals(expectedURLs, resultURLs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java
new file mode 100644
index 0000000..dc34620
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rule;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.rule.EvaluationContext;
+import org.apache.tajo.rule.EvaluationFailedException;
+import org.apache.tajo.rule.EvaluationResult;
+import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
+import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
+import org.apache.tajo.rule.SelfDiagnosisRuleSession;
+import org.apache.tajo.rule.base.TajoConfValidationRule;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises the master self-diagnosis rules: configuration validation and the
+ * file-system layout check.
+ */
+public class TestMasterRules {
+
+  // Scratch directory shared by all tests; removed again in tearDownClass().
+  private static Path rootFilePath;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    rootFilePath = CommonTestingUtil.getTestDir();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    CommonTestingUtil.cleanupTestDir(rootFilePath.toUri().getPath());
+  }
+
+  @Test
+  public void testTajoConfValidationRule() throws Exception {
+    TajoConf tajoConf = new TajoConf(new YarnConfiguration());
+
+    EvaluationContext context = new EvaluationContext();
+    context.addParameter(TajoConf.class.getName(), tajoConf);
+
+    // A default configuration must pass validation.
+    EvaluationResult result = new TajoConfValidationRule().evaluate(context);
+
+    assertThat(result, is(notNullValue()));
+    assertThat(result.getReturnCode(), is(EvaluationResultCode.OK));
+  }
+
+  @Test(expected=EvaluationFailedException.class)
+  public void testTajoConfValidationRuleWithException() throws Exception {
+    TajoConf tajoConf = new TajoConf(new YarnConfiguration());
+    SelfDiagnosisRuleSession ruleSession = SelfDiagnosisRuleEngine.getInstance().newRuleSession();
+
+    // An unparsable root directory must make the validation rule fail.
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, "invalid path.");
+
+    EvaluationContext context = new EvaluationContext();
+    context.addParameter(TajoConf.class.getName(), tajoConf);
+
+    ruleSession.withRuleNames("TajoConfValidationRule").fireRules(context);
+
+    fail("EvaluationFailedException exception is expected, but it does not happen.");
+  }
+
+  /** Creates {@code dir} with the given permission if it does not exist yet. */
+  private static void ensureDirectory(FileSystem fs, Path dir, FsPermission permission) throws Exception {
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir, new FsPermission(permission));
+    }
+  }
+
+  /**
+   * Lays out the directory structure the file-system rule expects (root,
+   * system, system-resource, warehouse, staging) and points {@code tajoConf}
+   * at it.
+   */
+  protected void createTajoDirectories(TajoConf tajoConf) throws Exception {
+    Path tajoRootDir = new Path(rootFilePath, "tajo-root");
+    FileSystem rootFs = tajoRootDir.getFileSystem(tajoConf);
+    FsPermission defaultPermission = FsPermission.createImmutable((short)0700);
+
+    ensureDirectory(rootFs, tajoRootDir, defaultPermission);
+    tajoConf.setVar(ConfVars.ROOT_DIR, tajoRootDir.toUri().toString());
+
+    Path tajoSystemDir = new Path(tajoRootDir, TajoConstants.SYSTEM_DIR_NAME);
+    ensureDirectory(rootFs, tajoSystemDir, defaultPermission);
+    ensureDirectory(rootFs, new Path(tajoSystemDir, TajoConstants.SYSTEM_RESOURCE_DIR_NAME), defaultPermission);
+    ensureDirectory(rootFs, new Path(tajoRootDir, TajoConstants.WAREHOUSE_DIR_NAME), defaultPermission);
+
+    Path tajoStagingDir = new Path(tajoRootDir, "staging");
+    ensureDirectory(rootFs, tajoStagingDir, defaultPermission);
+    tajoConf.setVar(ConfVars.STAGING_ROOT_DIR, tajoStagingDir.toUri().toString());
+  }
+
+  @Test
+  public void testFileSystemRule() throws Exception {
+    TajoConf tajoConf = new TajoConf(new YarnConfiguration());
+    createTajoDirectories(tajoConf);
+
+    EvaluationContext context = new EvaluationContext();
+    context.addParameter(TajoConf.class.getName(), tajoConf);
+
+    // With the full directory layout in place the rule must report OK.
+    EvaluationResult result = new FileSystemRule().evaluate(context);
+
+    assertThat(result, is(notNullValue()));
+    assertThat(result.getReturnCode(), is(EvaluationResultCode.OK));
+  }
+
+  @Test
+  public void testFileSystemRuleWithError() throws Exception {
+    TajoConf tajoConf = new TajoConf(new YarnConfiguration());
+    createTajoDirectories(tajoConf);
+
+    // Removing the system resource directory must flip the rule to ERROR.
+    Path systemResourceDir = TajoConf.getSystemResourceDir(tajoConf);
+    FileSystem defaultFs = systemResourceDir.getFileSystem(tajoConf);
+    if (defaultFs.exists(systemResourceDir)) {
+      defaultFs.delete(systemResourceDir, true);
+    }
+
+    EvaluationContext context = new EvaluationContext();
+    context.addParameter(TajoConf.class.getName(), tajoConf);
+
+    EvaluationResult result = new FileSystemRule().evaluate(context);
+
+    assertThat(result, is(notNullValue()));
+    assertThat(result.getReturnCode(), is(EvaluationResultCode.ERROR));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
new file mode 100644
index 0000000..328c281
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.*;
+import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent;
+import org.apache.tajo.master.scheduler.event.SchedulerEvent;
+import org.apache.tajo.master.scheduler.event.SchedulerEventType;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.CallFuture;
+import org.junit.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.tajo.ResourceProtos.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@code SimpleScheduler}. setup() wires an in-memory "cluster" of
+ * {@code workerNum} fake nodes into a {@code TajoRMContext} and installs
+ * {@code MySimpleScheduler}, a test double that releases a semaphore instead of
+ * actually launching query masters, so tests can await scheduler activity.
+ */
+@NotThreadSafe
+public class TestSimpleScheduler {
+  private CompositeService service;
+  private SimpleScheduler scheduler;
+  private TajoRMContext rmContext;
+  private AsyncDispatcher dispatcher;
+  private TajoConf conf;
+  // number of fake worker nodes registered in setup()
+  private int workerNum = 3;
+  // resource capability assigned to every fake worker
+  private NodeResource nodeResource;
+  // sum of all workers' total capabilities, computed in setup()
+  private NodeResource totalResource;
+  // released by MySimpleScheduler on query start and on each handled scheduler event
+  private Semaphore barrier;
+  // delay (ms) before MySimpleScheduler's startQuery callback fires
+  private int testDelay = 50;
+  private static ScheduledExecutorService executorService;
+
+  @BeforeClass
+  public static void setupClass() {
+    executorService = Executors.newScheduledThreadPool(10);
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    executorService.shutdown();
+  }
+
+  /**
+   * Builds a CompositeService hosting the dispatcher, RM context and scheduler,
+   * registers {@code workerNum} started nodes, then records the aggregate
+   * cluster capability in {@code totalResource}.
+   */
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    nodeResource = NodeResource.createResource(1500, 2, 3);
+    service = new CompositeService(TestSimpleScheduler.class.getSimpleName()) {
+
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        dispatcher = new AsyncDispatcher();
+        addService(dispatcher);
+
+        rmContext = new TajoRMContext(dispatcher);
+        rmContext.getDispatcher().register(NodeEventType.class,
+            new TajoResourceManager.WorkerEventDispatcher(rmContext));
+
+        barrier = new Semaphore(0);
+        scheduler = new MySimpleScheduler(rmContext, barrier);
+        addService(scheduler);
+        rmContext.getDispatcher().register(SchedulerEventType.class, scheduler);
+
+        // Register workerNum fake nodes, each with a clone of nodeResource,
+        // and mark them STARTED through the dispatcher.
+        for (int i = 0; i < workerNum; i++) {
+          WorkerConnectionInfo conn = new WorkerConnectionInfo("host" + i, 28091 + i, 28092, 21000, 28093, 28080);
+          rmContext.getNodes().putIfAbsent(conn.getId(),
+              new NodeStatus(rmContext, NodeResources.clone(nodeResource), conn));
+          rmContext.getDispatcher().getEventHandler().handle(new NodeEvent(conn.getId(), NodeEventType.STARTED));
+        }
+        super.serviceInit(conf);
+      }
+    };
+    service.init(conf);
+    service.start();
+
+    assertEquals(workerNum, rmContext.getNodes().size());
+    totalResource = NodeResources.createResource(0);
+    for(NodeStatus nodeStatus : rmContext.getNodes().values()) {
+      NodeResources.addTo(totalResource, nodeStatus.getTotalResourceCapability());
+    }
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  /** With no queries submitted, the whole cluster capability must be available. */
+  @Test
+  public void testInitialCapacity() throws InterruptedException {
+    assertEquals(workerNum, scheduler.getNumClusterNodes());
+    assertEquals(0, scheduler.getRunningQuery());
+
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource, scheduler.getClusterResource());
+
+    assertEquals(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY.defaultIntVal,
+        scheduler.getQMMinimumResourceCapability().getMemory());
+
+    assertEquals(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY.defaultIntVal,
+        scheduler.getMinimumResourceCapability().getMemory());
+  }
+
+  /**
+   * Submitting one query should consume exactly the QM minimum capability:
+   * cluster resource + QM minimum must add back up to the total.
+   */
+  @Test(timeout = 10000)
+  public void testSubmitOneQuery() throws InterruptedException {
+    QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default",
+        "user",
+        QueryIdFactory.newQueryId(System.nanoTime(), 0),
+        1,
+        System.currentTimeMillis());
+
+    assertEquals(0, scheduler.getRunningQuery());
+
+    scheduler.submitQuery(schedulingInfo);
+    // Wait until MySimpleScheduler reports the query as started.
+    barrier.acquire();
+    assertEquals(1, scheduler.getRunningQuery());
+
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource,
+        NodeResources.add(scheduler.getQMMinimumResourceCapability(), scheduler.getClusterResource()));
+  }
+
+  /**
+   * Submits more queries than can run at once and checks that roughly half of
+   * the maximum parallel capacity is running while the rest stay queued.
+   */
+  @Test(timeout = 10000)
+  public void testMaximumSubmitQuery() throws InterruptedException {
+    assertEquals(0, scheduler.getRunningQuery());
+    int maximumParallelQuery = scheduler.getResourceCalculator().computeAvailableContainers(
+        scheduler.getMaximumResourceCapability(), scheduler.getQMMinimumResourceCapability());
+
+    int testParallelNum = 10;
+    for (int i = 0; i < testParallelNum; i++) {
+      QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default",
+          "user",
+          QueryIdFactory.newQueryId(System.nanoTime(), 0),
+          1,
+          System.currentTimeMillis());
+      scheduler.submitQuery(schedulingInfo);
+    }
+
+    barrier.acquire();
+    // allow 50% parallel running
+    assertEquals(Math.floor(maximumParallelQuery * 0.5f), (double) scheduler.getRunningQuery(), 1.0f);
+    assertEquals(testParallelNum, scheduler.getRunningQuery() + scheduler.getQueryQueue().size());
+  }
+
+  /**
+   * Reserves {@code requestNum} containers with no node preference and checks
+   * that the cluster resource shrinks by exactly the allocated amount.
+   */
+  @Test(timeout = 10000)
+  public void testReserveResource() throws InterruptedException, ExecutionException {
+    int requestNum = 3;
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource, scheduler.getClusterResource());
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0);
+    CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>();
+    rmContext.getDispatcher().getEventHandler().handle(new ResourceReserveSchedulerEvent(
+        createResourceRequest(queryId, requestNum, new ArrayList<Integer>()), callBack));
+
+    // Blocks until the scheduler answers the reservation request.
+    NodeResourceResponse responseProto = callBack.get();
+    assertEquals(queryId, new QueryId(responseProto.getQueryId()));
+    assertEquals(requestNum, responseProto.getResourceCount());
+
+    NodeResource allocations = NodeResources.createResource(0);
+    for (AllocationResourceProto resourceProto : responseProto.getResourceList()) {
+      NodeResources.addTo(allocations, new NodeResource(resourceProto.getResource()));
+    }
+
+    assertEquals(NodeResources.subtract(totalResource, allocations), scheduler.getClusterResource());
+  }
+
+  /**
+   * Reserves containers with a single candidate worker and checks that every
+   * returned allocation landed on that worker.
+   */
+  @Test(timeout = 10000)
+  public void testReserveResourceWithWorkerPriority() throws InterruptedException, ExecutionException {
+    int requestNum = 2;
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource, scheduler.getClusterResource());
+
+    List<Integer> targetWorkers = Lists.newArrayList();
+    Map.Entry<Integer, NodeStatus> workerEntry = rmContext.getNodes().entrySet().iterator().next();
+    targetWorkers.add(workerEntry.getKey());
+
+    // Sanity check: the chosen worker can actually hold the requested amount.
+    NodeResource expectResource = NodeResources.multiply(scheduler.getMinimumResourceCapability(), requestNum);
+    assertTrue(NodeResources.fitsIn(expectResource, workerEntry.getValue().getAvailableResource()));
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0);
+    NodeResourceRequest requestProto = createResourceRequest(queryId, requestNum, targetWorkers);
+    CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>();
+    rmContext.getDispatcher().getEventHandler().handle(new ResourceReserveSchedulerEvent(
+        requestProto, callBack));
+
+    NodeResourceResponse responseProto = callBack.get();
+    assertEquals(queryId, new QueryId(responseProto.getQueryId()));
+    assertEquals(requestNum, responseProto.getResourceCount());
+
+    for (AllocationResourceProto resourceProto : responseProto.getResourceList()) {
+      assertEquals(workerEntry.getKey().intValue(), resourceProto.getWorkerId());
+    }
+  }
+
+  /** Builds a LEAF-type resource request for {@code containerNum} minimum-sized containers. */
+  private NodeResourceRequest
+  createResourceRequest(QueryId queryId, int containerNum, List<Integer> candidateWorkers) {
+    NodeResourceRequest.Builder request =
+        NodeResourceRequest.newBuilder();
+    request.setCapacity(scheduler.getMinimumResourceCapability().getProto())
+        .setNumContainers(containerNum)
+        .setPriority(1)
+        .setQueryId(queryId.getProto())
+        .setType(ResourceType.LEAF)
+        .setUserId("test user")
+        .setRunningTasks(0)
+        .addAllCandidateNodes(candidateWorkers)
+        .setQueue("default");
+    return request.build();
+  }
+
+  /**
+   * Scheduler test double: instead of launching a query master, startQuery
+   * releases the barrier after {@code testDelay} ms, and handle() releases it
+   * after every scheduler event so tests can synchronize on activity.
+   */
+  class MySimpleScheduler extends SimpleScheduler {
+    Semaphore barrier;
+    Map<QueryId, QueryInfo> queryInfoMap = Maps.newHashMap();
+    // remembers the QM allocation per query so stopQuery can return it to the node
+    Map<QueryId, AllocationResourceProto> qmAllocationMap = Maps.newHashMap();
+
+    public MySimpleScheduler(TajoRMContext rmContext, Semaphore barrier) {
+      super(null, rmContext);
+      this.barrier = barrier;
+    }
+
+    @Override
+    public void submitQuery(QuerySchedulingInfo schedulingInfo) {
+      // Supply a minimal QueryInfo with a lazily-built QueryContext for the test user.
+      queryInfoMap.put(schedulingInfo.getQueryId(), new QueryInfo(schedulingInfo.getQueryId()) {
+        QueryContext context;
+        @Override
+        public QueryContext getQueryContext() {
+          if(context == null) {
+            context = new QueryContext(conf);
+            context.setUser("user");
+          }
+          return context;
+        }
+      });
+      super.submitQuery(schedulingInfo);
+    }
+
+    @Override
+    protected boolean startQuery(final QueryId queryId, final AllocationResourceProto allocation) {
+      // Simulate an asynchronous query-master start after testDelay ms.
+      executorService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          barrier.release();
+          qmAllocationMap.put(queryId, allocation);
+          rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
+        }
+      }, testDelay, TimeUnit.MILLISECONDS);
+      return true;
+    }
+
+    @Override
+    public void handle(SchedulerEvent event) {
+      super.handle(event);
+      barrier.release();
+    }
+
+    @Override
+    protected QueryInfo getQueryInfo(QueryId queryId) {
+      return queryInfoMap.get(queryId);
+    }
+
+    @Override
+    public void stopQuery(QueryId queryId) {
+      // Give the query master's allocation back to its node before stopping.
+      queryInfoMap.remove(queryId);
+      AllocationResourceProto allocationResourceProto = qmAllocationMap.remove(queryId);
+      NodeResources.addTo(rmContext.getNodes().get(allocationResourceProto.getWorkerId()).getAvailableResource(),
+          new NodeResource(allocationResourceProto.getResource()));
+      super.stopQuery(queryId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
new file mode 100644
index 0000000..237fb32
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.util.Pair;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@code Task.IntermediateEntry#split}: consecutive pages are grouped
+ * into splits whose size does not exceed the requested split byte limit.
+ */
+public class TestIntermediateEntry {
+  @Test
+  public void testPage() {
+    // Identity fields (-1, -1, partId 1, no host) are irrelevant to split(); only pages matter.
+    Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null);
+
+    // Each pair is (start offset, length); offsets are contiguous.
+    List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+    pages.add(new Pair<Long, Integer>(0L, 1441275));
+    pages.add(new Pair<Long, Integer>(1441275L, 1447446));
+    pages.add(new Pair<Long, Integer>(2888721L, 1442507));
+
+    interm.setPages(pages);
+
+    // 3 MiB; computed in long arithmetic to make the widening explicit.
+    long splitBytes = 3L * 1024 * 1024;
+
+    List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes);
+    assertEquals(2, splits.size());
+
+    // Pages 1+2 fit within 3 MiB; page 3 overflows into a second split.
+    long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} };
+    for (int i = 0; i < 2; i++) {
+      Pair<Long, Long> eachSplit = splits.get(i);
+      assertEquals(expected[i][0], eachSplit.getFirst().longValue());
+      assertEquals(expected[i][1], eachSplit.getSecond().longValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
new file mode 100644
index 0000000..0e3e63e
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.tajo.*;
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.event.QueryEvent;
+import org.apache.tajo.master.event.QueryEventType;
+import org.apache.tajo.master.event.StageEvent;
+import org.apache.tajo.master.event.StageEventType;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.worker.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that queries, stages, and tasks can be killed at various points in
+ * their lifecycle. A local mini cluster with the TPC-H lineitem and customer
+ * tables is shared across all tests; MockAsyncDispatch lets each test block
+ * until the query reaches a chosen state before firing the kill event.
+ */
+public class TestKillQuery {
+  private static TajoTestingCluster cluster;
+  private static TajoConf conf;
+  private static TajoClient client;
+  // join + sort query so the plan has multiple stages to kill at different points
+  private static String queryStr = "select t1.l_orderkey, t1.l_partkey, t2.c_custkey " +
+      "from lineitem t1 join customer t2 " +
+      "on t1.l_orderkey = t2.c_custkey order by t1.l_orderkey";
+
+  /** Starts a single-node local cluster and loads the lineitem and customer tables. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new TajoTestingCluster();
+    cluster.startMiniClusterInLocal(1);
+    conf = cluster.getConfiguration();
+    client = cluster.newTajoClient();
+    File file = TPCH.getDataFile("lineitem");
+    client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+        + "using text location 'file://" + file.getAbsolutePath() + "'");
+    assertTrue(client.existTable("default.lineitem"));
+
+    file = TPCH.getDataFile("customer");
+    client.executeQueryAndGetResult("create external table default.customer (c_custkey int, c_name text) "
+        + "using text location 'file://" + file.getAbsolutePath() + "'");
+    assertTrue(client.existTable("default.customer"));
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (client != null) client.close();
+    if (cluster != null) cluster.shutdownMiniCluster();
+  }
+
+  /**
+   * Kills a query right after its first stage reaches SQ_INIT and expects the
+   * query to transition to QUERY_KILLED.
+   */
+  @Test
+  public final void testKillQueryFromInitState() throws Exception {
+    SQLAnalyzer analyzer = new SQLAnalyzer();
+    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+    Session session = LocalTajoTestingUtility.createDummySession();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+
+    // Build a full logical + global plan for the test query by hand.
+    LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+    Expr expr =  analyzer.parse(queryStr);
+    LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext(conf);
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(queryContext, masterPlan);
+
+    // The latch opens when the dispatcher sees the SQ_INIT stage event.
+    CountDownLatch barrier  = new CountDownLatch(1);
+    MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, StageEventType.SQ_INIT);
+
+    QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+    QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+        queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), dispatch);
+
+    queryMasterTask.init(conf);
+    queryMasterTask.getQueryTaskContext().getDispatcher().start();
+    queryMasterTask.startQuery();
+
+    try{
+      barrier.await(5000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+    }
+
+    Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+    assertNotNull(stage);
+
+    // fire kill event
+    queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+    try {
+      cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+      assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+    } catch (Exception e) {
+      // Dump stage progress counters to help diagnose why the kill did not complete.
+      e.printStackTrace();
+      if (stage != null) {
+        System.err.println(String.format("Stage: [%s] (Total: %d, Complete: %d, Success: %d, Killed: %d, Failed: %d)",
+            stage.getId().toString(),
+            stage.getTotalScheduledObjectsCount(),
+            stage.getCompletedTaskCount(),
+            stage.getSucceededObjectCount(),
+            stage.getKilledObjectCount(),
+            stage.getFailedObjectCount()));
+      }
+      throw e;
+    } finally {
+      queryMasterTask.stop();
+    }
+  }
+
+  /**
+   * Kills a running query, then replays stage events at the KILLED stage to
+   * verify the state machine ignores them (no InvalidStateTransitionException).
+   */
+  @Test
+  public final void testIgnoreStageStateFromKilled() throws Exception {
+
+    SQLAnalyzer analyzer = new SQLAnalyzer();
+    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+    Session session = LocalTajoTestingUtility.createDummySession();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+
+    LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+    Expr expr =  analyzer.parse(queryStr);
+    LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext(conf);
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(queryContext, masterPlan);
+
+    // Here the latch opens only once the query is actually RUNNING.
+    CountDownLatch barrier  = new CountDownLatch(1);
+    MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+
+    QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+    QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+        queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), dispatch);
+
+    queryMasterTask.init(conf);
+    queryMasterTask.getQueryTaskContext().getDispatcher().start();
+    queryMasterTask.startQuery();
+
+    try{
+      barrier.await(5000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+    }
+
+    Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+    assertNotNull(stage);
+
+    // fire kill event
+    queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+    try {
+      cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+      assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+    }   finally {
+      queryMasterTask.stop();
+    }
+
+    List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
+    Stage lastStage = stages.get(stages.size() - 1);
+
+    assertEquals(StageState.KILLED, lastStage.getSynchronizedState());
+
+    // Each of these transitions must be silently ignored by a KILLED stage;
+    // an invalid transition would throw and fail the test.
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_START,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_START));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_KILL,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_KILL));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_STAGE_COMPLETED,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED));
+  }
+
+  /**
+   * Kills a task before running it and verifies it stays TA_KILLED whether or
+   * not run() subsequently throws.
+   */
+  @Test
+  public void testKillTask() throws Throwable {
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
+    TaskId tid = QueryIdFactory.newTaskId(eid);
+    final TajoConf conf = new TajoConf();
+    TaskRequestImpl taskRequest = new TaskRequestImpl();
+    WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+
+    // Minimal task request: empty fragments, empty plan tree, dummy QM address.
+    TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
+    taskRequest.set(attemptId, new ArrayList<CatalogProtos.FragmentProto>(),
+        null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(conf),
+        null, null, queryMaster.getHostAndQMPort());
+    taskRequest.setInterQuery();
+
+
+    ExecutionBlockContextResponse.Builder requestProtoBuilder =
+        ExecutionBlockContextResponse.newBuilder();
+    requestProtoBuilder.setExecutionBlockId(eid.getProto())
+        .setPlanJson("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setQueryOutputPath("testpath")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    // Stub worker context: only getConf() returns a real object; the managers
+    // are null because TaskImpl.kill() must not need them.
+    TajoWorker.WorkerContext workerContext = new MockWorkerContext() {
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public TaskManager getTaskManager() {
+        return null;
+      }
+
+      // NOTE(review): method name typo ("Executor") comes from the
+      // TajoWorker.WorkerContext interface and cannot be fixed here.
+      @Override
+      public TaskExecutor getTaskExecuor() {
+        return null;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return null;
+      }
+    };
+
+    ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) {
+      @Override
+      public Path createBaseDir() throws IOException {
+        return new Path("test");
+      }
+    };
+
+    org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context);
+    task.kill();
+    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
+    try {
+      // run() after kill() may either return or throw; the state must remain TA_KILLED.
+      task.run();
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
+    } catch (Exception e) {
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
+    }
+  }
+
+  /**
+   * AsyncDispatcher that counts down a latch the first time it dispatches an
+   * event of the given type, letting tests wait for a specific lifecycle point.
+   */
+  static class MockAsyncDispatch extends AsyncDispatcher {
+    private CountDownLatch latch;
+    private Enum eventType;
+
+    MockAsyncDispatch(CountDownLatch latch, Enum eventType) {
+      super();
+      this.latch = latch;
+      this.eventType = eventType;
+    }
+
+    @Override
+    protected void dispatch(Event event) {
+      if (event.getType() == eventType) {
+        latch.countDown();
+      }
+      super.dispatch(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java
new file mode 100644
index 0000000..f9ed367
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test checking that the progress reported for a running query is
+ * monotonically non-decreasing and never exceeds 1.0.
+ */
+@Category(IntegrationTest.class)
+public class TestQueryProgress {
+  private static TajoTestingCluster cluster;
+  private static TajoConf conf;
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = TpchTestBase.getInstance().getTestingCluster();
+    conf = cluster.getConfiguration();
+    client = cluster.newTajoClient();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    client.close();
+  }
+
+  @Test(timeout = 10000)
+  public final void testQueryProgress() throws Exception {
+    ClientProtos.SubmitQueryResponse res = client.executeQuery("select l_orderkey from lineitem group by l_orderkey");
+    QueryId queryId = new QueryId(res.getQueryId());
+
+    float prevProgress = 0;
+    // Poll until the query completes; the @Test timeout bounds the wait.
+    while (true) {
+      QueryStatus status = client.getQueryStatus(queryId);
+      if (status != null) {
+        float progress = status.getProgress();
+
+        // Progress must never move backwards and must stay within [0, 1].
+        if (prevProgress > progress) {
+          fail("Previous progress: " + prevProgress + ", Current progress : " + progress);
+        }
+        prevProgress = progress;
+        assertTrue(progress <= 1.0f);
+
+        if (TajoClientUtil.isQueryComplete(status.getState())) break;
+      }
+      // Sleep briefly between polls instead of busy-waiting on the client.
+      Thread.sleep(100);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
new file mode 100644
index 0000000..978d709
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.*;
+import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.ipc.ClientProtos;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+@NotThreadSafe
+public class TestQueryState {
+  private static TajoTestingCluster cluster;
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    cluster = TpchTestBase.getInstance().getTestingCluster();
+    client = cluster.newTajoClient();
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    client.close();
+  }
+
+  @Test(timeout = 10000)
+  public void testSucceededState() throws Exception {
+    String queryStr = "select l_orderkey from lineitem group by l_orderkey order by l_orderkey";
+    /*
+    =======================================================
+    Block Id: eb_1429886996479_0001_000001 [LEAF] HASH_SHUFFLE
+    Block Id: eb_1429886996479_0001_000002 [INTERMEDIATE] RANGE_SHUFFLE
+    Block Id: eb_1429886996479_0001_000003 [ROOT] NONE_SHUFFLE
+    Block Id: eb_1429886996479_0001_000004 [TERMINAL]
+    =======================================================
+
+    The order of execution:
+
+    1: eb_1429886996479_0001_000001
+    2: eb_1429886996479_0001_000002
+    3: eb_1429886996479_0001_000003
+    4: eb_1429886996479_0001_000004
+    */
+
+    ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
+    QueryId queryId = new QueryId(res.getQueryId());
+
+    QueryStatus queryState = client.getQueryStatus(queryId);
+    while (!TajoClientUtil.isQueryComplete(queryState.getState())) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        fail("Query state : " + queryState);
+      }
+      queryState = client.getQueryStatus(queryId);
+    }
+
+    QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
+    Query query = qmt.getQuery();
+
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState());
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getSynchronizedState());
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState());
+
+    assertFalse(query.getStages().isEmpty());
+    for (Stage stage : query.getStages()) {
+      assertEquals(StageState.SUCCEEDED, stage.getSynchronizedState());
+      assertEquals(StageState.SUCCEEDED, stage.getState());
+    }
+
+    /* get status from TajoMaster */
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
new file mode 100644
index 0000000..b468e37
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.worker.TajoWorker;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.util.*;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.junit.Assert.*;
+
// Verifies that per-stage input/output statistics (row counts, byte counts,
// read bytes) reported through the QueryMaster match expected values for a
// set of representative query shapes (group-by, sort, partitioned-scan join).
@Category(IntegrationTest.class)
public class TestTaskStatusUpdate extends QueryTestCaseBase {

  public TestTaskStatusUpdate() {
    super(TajoConstants.DEFAULT_DATABASE_NAME);
  }

  @BeforeClass
  public static void setUp() throws Exception {
    // Broadcast joins are disabled so the stage layout (and hence the
    // expected statistics below) is deterministic.
    conf.set(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false");
  }

  @Test
  public final void case1() throws Exception {
    // select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber;
    ResultSet res = null;
    try {
      res = executeQuery();

      // tpch/lineitem.tbl
      // NOTE(review): expected values are derived from the bundled TPC-H test
      // fixture; they must be updated together if the fixture data changes.
      long[] expectedNumRows = new long[]{5, 2, 2, 2};
      long[] expectedNumBytes = new long[]{604, 18, 18, 8};
      long[] expectedReadBytes = new long[]{604, 604, 18, 0};

      assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
    } finally {
      cleanupQuery(res);
    }
  }

  @Test
  public final void case2() throws Exception {
    // ExternalMergeSort
    ResultSet res = null;
    try {
      res = executeQuery();

      // tpch/lineitem.tbl
      long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2};
      long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194};
      long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0};

      assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes);
    } finally {
      cleanupQuery(res);
    }
  }


  @Test
  public final void case3() throws Exception {
    // Partition Scan
    ResultSet res = null;
    try {
      createColumnPartitionedTable();

      /*
      |-eb_1404143727281_0002_000005
         |-eb_1404143727281_0002_000004        (order by)
            |-eb_1404143727281_0002_000003     (join)
               |-eb_1404143727281_0002_000002  (scan, filter)
               |-eb_1404143727281_0002_000001  (scan)
       */
      res = executeQuery();

      // in/out * stage(4)
      long[] expectedNumRows = new long[]{5, 5, 2, 2, 7, 2, 2, 2};
      long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 18};
      long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0};

      assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes);
    } finally {
      cleanupQuery(res);
    }
  }

  // Creates a column-partitioned table (partitioned on key float8) and loads
  // it from lineitem, so case3 can exercise a partition scan.
  private void createColumnPartitionedTable() throws Exception {
    String tableName = CatalogUtil.normalizeIdentifier("ColumnPartitionedTable");
    ResultSet res = executeString(
        "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
    res.close();

    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
    // Physical schema has 2 columns; the logical schema also exposes the
    // partition column, hence 3.
    assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
    assertEquals(3,
        catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());

    res = testBase.execute(
        "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem");

    res.close();
  }

  /**
   * Asserts per-stage statistics of the most recently finished query.
   * The expected arrays hold (input, output) value pairs for each stage,
   * ordered by ascending stage id, i.e. index 2*k is the input stat and
   * 2*k+1 the output stat of the k-th stage.
   */
  private void assertStatus(int numStages,
                            long[] expectedNumRows,
                            long[] expectedNumBytes,
                            long[] expectedReadBytes) throws Exception {
      // Only one worker hosts the finished QueryMasterTasks; scan until found.
      List<TajoWorker> tajoWorkers = testingCluster.getTajoWorkers();
      Collection<QueryMasterTask> finishedTasks = null;
      for (TajoWorker eachWorker: tajoWorkers) {
        finishedTasks = eachWorker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks();
        if (finishedTasks != null && !finishedTasks.isEmpty()) {
          break;
        }
      }

      assertNotNull(finishedTasks);
      assertTrue(!finishedTasks.isEmpty());

      List<QueryMasterTask> finishedTaskList = new ArrayList<QueryMasterTask>(finishedTasks);

      // Sort descending by query id so index 0 is the latest query
      // (presumably query ids grow over time — TODO confirm).
      Collections.sort(finishedTaskList, new Comparator<QueryMasterTask>() {
        @Override
        public int compare(QueryMasterTask o1, QueryMasterTask o2) {
          return o2.getQueryId().compareTo(o1.getQueryId());
        }
      });

      Query query = finishedTaskList.get(0).getQuery();

      assertNotNull(query);

      List<Stage> stages = new ArrayList<Stage>(query.getStages());
      assertEquals(numStages, stages.size());

      // Ascending stage-id order must match the ordering of the expected arrays.
      Collections.sort(stages, new Comparator<Stage>() {
        @Override
        public int compare(Stage o1, Stage o2) {
          return o1.getId().compareTo(o2.getId());
        }
      });

      int index = 0;
      for (Stage eachStage : stages) {
        TableStats inputStats = eachStage.getInputStats();
        TableStats resultStats = eachStage.getResultStats();

        // Input-side statistics of this stage.
        assertNotNull(inputStats);
        assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue());
        assertEquals(expectedNumBytes[index], inputStats.getNumBytes().longValue());
        assertEquals(expectedReadBytes[index], inputStats.getReadBytes().longValue());

        index++;

        // Output-side statistics of this stage.
        assertNotNull(resultStats);
        assertEquals(expectedNumRows[index], resultStats.getNumRows().longValue());
        assertEquals(expectedNumBytes[index], resultStats.getNumBytes().longValue());
        assertEquals(expectedReadBytes[index], resultStats.getReadBytes().longValue());

        index++;
      }

  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java b/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java
new file mode 100644
index 0000000..eb0d732
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.resource;
+
+import org.junit.Test;
+
+import static org.apache.tajo.resource.NodeResources.componentwiseMin;
+import static org.apache.tajo.resource.NodeResources.createResource;
+import static org.apache.tajo.resource.NodeResources.fitsIn;
+import static org.junit.Assert.*;
+
+public class TestResources {
+  @Test
+  public void testFitsIn() {
+    assertTrue(fitsIn(createResource(512, 1, 1), createResource(1024, 2, 1)));
+    assertTrue(fitsIn(createResource(1024, 2, 1), createResource(1024, 2, 1)));
+    assertFalse(fitsIn(createResource(1024, 2, 1), createResource(512, 1, 1)));
+    assertFalse(fitsIn(createResource(512, 2, 1), createResource(1024, 1, 1)));
+    assertFalse(fitsIn(createResource(1024, 1, 1), createResource(512, 2, 1)));
+    assertFalse(fitsIn(createResource(512, 1, 2), createResource(512, 1, 1)));
+  }
+
+  @Test
+  public void testComponentwiseMin() {
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 1), createResource(2, 2)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(2, 2), createResource(1, 1)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 2), createResource(2, 1)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
new file mode 100644
index 0000000..d0ab1c0
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.SortedSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFileFragment {
+  private Path path;
+  
+  @Before
+  public final void setUp() throws Exception {
+    path = CommonTestingUtil.getTestDir();
+  }
+
+  @Test
+  public final void testGetAndSetFields() {
+    FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
+
+    assertEquals("table1_1", fragment1.getTableName());
+    assertEquals(new Path(path, "table0"), fragment1.getPath());
+    assertTrue(0 == fragment1.getStartKey());
+    assertTrue(500 == fragment1.getLength());
+  }
+
+  @Test
+  public final void testGetProtoAndRestore() {
+    FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
+
+    FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto());
+    assertEquals("table1_1", fragment1.getTableName());
+    assertEquals(new Path(path, "table0"), fragment1.getPath());
+    assertTrue(0 == fragment1.getStartKey());
+    assertTrue(500 == fragment1.getLength());
+  }
+
+  @Test
+  public final void testCompareTo() {
+    final int num = 10;
+    FileFragment[] tablets = new FileFragment[num];
+    for (int i = num - 1; i >= 0; i--) {
+      tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500);
+    }
+    
+    Arrays.sort(tablets);
+
+    for(int i = 0; i < num; i++) {
+      assertEquals("tablet1_"+i, tablets[i].getTableName());
+    }
+  }
+
+  @Test
+  public final void testCompareTo2() {
+    final int num = 1860;
+    FileFragment[] tablets = new FileFragment[num];
+    for (int i = num - 1; i >= 0; i--) {
+      tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500);
+    }
+
+    SortedSet sortedSet = Sets.newTreeSet();
+    for (FileFragment frag : tablets) {
+      sortedSet.add(frag);
+    }
+    assertEquals(num, sortedSet.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
new file mode 100644
index 0000000..e45dd75
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.FileUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRowFile {
+  private static final Log LOG = LogFactory.getLog(TestRowFile.class);
+
+  private TajoTestingCluster cluster;
+  private TajoConf conf;
+
+  @Before
+  public void setup() throws Exception {
+    cluster = TpchTestBase.getInstance().getTestingCluster();
+    conf = cluster.getConfiguration();
+  }
+
+  @After
+  public void teardown() throws Exception {
+  }
+
+  @Test
+  public void test() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("description", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta("ROWFILE");
+
+    FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri()).get();
+
+    Path tablePath = new Path("/test");
+    Path metaPath = new Path(tablePath, ".meta");
+    Path dataPath = new Path(tablePath, "test.tbl");
+    FileSystem fs = sm.getFileSystem();
+    fs.mkdirs(tablePath);
+
+    FileUtil.writeProto(fs, metaPath, meta.getProto());
+
+    Appender appender = sm.getAppender(meta, schema, dataPath);
+    appender.enableStats();
+    appender.init();
+
+    int tupleNum = 200;
+    Tuple tuple;
+    Datum stringDatum = DatumFactory.createText("abcdefghijklmnopqrstuvwxyz");
+    Set<Integer> idSet = Sets.newHashSet();
+
+    tuple = new VTuple(3);
+    long start = System.currentTimeMillis();
+    for(int i = 0; i < tupleNum; i++) {
+      tuple.put(0, DatumFactory.createInt4(i + 1));
+      tuple.put(1, DatumFactory.createInt8(25l));
+      tuple.put(2, stringDatum);
+      appender.addTuple(tuple);
+      idSet.add(i+1);
+    }
+    appender.close();
+
+    TableStats stat = appender.getStats();
+    assertEquals(tupleNum, stat.getNumRows().longValue());
+
+    FileStatus file = fs.getFileStatus(dataPath);
+    TableProto proto = (TableProto) FileUtil.loadProto(
+        cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
+    meta = new TableMeta(proto);
+    FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen());
+
+    int tupleCnt = 0;
+    start = System.currentTimeMillis();
+    Scanner scanner = sm.getScanner(meta, schema, fragment);
+    scanner.init();
+    while ((tuple=scanner.next()) != null) {
+      tupleCnt++;
+    }
+    scanner.close();
+
+    assertEquals(tupleNum, tupleCnt);
+
+    tupleCnt = 0;
+    long fileStart = 0;
+    long fileLen = file.getLen()/13;
+
+    for (int i = 0; i < 13; i++) {
+      fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen);
+      scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment);
+      scanner.init();
+      while ((tuple=scanner.next()) != null) {
+        if (!idSet.remove(tuple.getInt4(0)) && LOG.isDebugEnabled()) {
+          LOG.debug("duplicated! " + tuple.getInt4(0));
+        }
+        tupleCnt++;
+      }
+      scanner.close();
+      fileStart += fileLen;
+      if (i == 11) {
+        fileLen = file.getLen() - fileStart;
+      }
+    }
+    assertEquals(tupleNum, tupleCnt);
+  }
+}


Mime
View raw message