tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [07/13] TAJO-353: Add Database support to Tajo. (hyunsik)
Date Tue, 25 Mar 2014 01:36:22 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
index 305548c..bb8192f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -24,10 +24,11 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.session.Session;
 
 import java.util.Stack;
 
-public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationState, LogicalNode> {
+public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<LogicalPlanVerifier.Context, LogicalNode> {
   private TajoConf conf;
   private CatalogService catalog;
 
@@ -36,6 +37,22 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
     this.catalog = catalog;
   }
 
+  public static class Context {
+    Session session;
+    VerificationState state;
+
+    public Context(Session session, VerificationState state) {
+      this.session = session;
+      this.state = state;
+    }
+  }
+
+  public VerificationState verify(Session session, VerificationState state, LogicalPlan plan) throws PlanningException {
+    Context context = new Context(session, state);
+    visit(context, plan, plan.getRootBlock());
+    return context.state;
+  }
+
   /**
    * It checks if an output schema of a projectable node and target's output data types are equivalent to each other.
    */
@@ -62,12 +79,12 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
   }
 
   @Override
-  public LogicalNode visitProjection(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitProjection(Context state, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                      ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException {
     super.visitProjection(state, plan, block, node, stack);
 
     for (Target target : node.getTargets()) {
-      ExprsVerifier.verify(state, node, target.getEvalTree());
+      ExprsVerifier.verify(state.state, node, target.getEvalTree());
     }
 
     verifyProjectableOutputSchema(node);
@@ -76,42 +93,42 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
   }
 
   @Override
-  public LogicalNode visitLimit(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitLimit(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                 LimitNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitLimit(state, plan, block, node, stack);
+    super.visitLimit(context, plan, block, node, stack);
 
     if (node.getFetchFirstNum() < 0) {
-      state.addVerification("LIMIT must not be negative");
+      context.state.addVerification("LIMIT must not be negative");
     }
 
     return node;
   }
 
   @Override
-  public LogicalNode visitGroupBy(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitGroupBy(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                   GroupbyNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitGroupBy(state, plan, block, node, stack);
+    super.visitGroupBy(context, plan, block, node, stack);
 
     verifyProjectableOutputSchema(node);
     return node;
   }
 
   @Override
-  public LogicalNode visitFilter(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
-    visit(state, plan, block, node.getChild(), stack);
-    ExprsVerifier.verify(state, node, node.getQual());
+    visit(context, plan, block, node.getChild(), stack);
+    ExprsVerifier.verify(context.state, node, node.getQual());
     return node;
   }
 
   @Override
-  public LogicalNode visitJoin(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+  public LogicalNode visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
                                Stack<LogicalNode> stack) throws PlanningException {
-    visit(state, plan, block, node.getLeftChild(), stack);
-    visit(state, plan, block, node.getRightChild(), stack);
+    visit(context, plan, block, node.getLeftChild(), stack);
+    visit(context, plan, block, node.getRightChild(), stack);
 
     if (node.hasJoinQual()) {
-      ExprsVerifier.verify(state, node, node.getJoinQual());
+      ExprsVerifier.verify(context.state, node, node.getJoinQual());
     }
 
     verifyProjectableOutputSchema(node);
@@ -143,40 +160,40 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
   }
 
   @Override
-  public LogicalNode visitUnion(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitUnion(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                 UnionNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitUnion(state, plan, block, node, stack);
-    verifySetStatement(state, node);
+    super.visitUnion(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
     return node;
   }
 
   @Override
-  public LogicalNode visitExcept(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitExcept(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  ExceptNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitExcept(state, plan, block, node, stack);
-    verifySetStatement(state, node);
+    super.visitExcept(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
     return node;
   }
 
   @Override
-  public LogicalNode visitIntersect(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitIntersect(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                     IntersectNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitIntersect(state, plan, block, node, stack);
-    verifySetStatement(state, node);
+    super.visitIntersect(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
     return node;
   }
 
   @Override
-  public LogicalNode visitScan(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+  public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
                                Stack<LogicalNode> stack) throws PlanningException {
     if (node.hasTargets()) {
       for (Target target : node.getTargets()) {
-        ExprsVerifier.verify(state, node, target.getEvalTree());
+        ExprsVerifier.verify(context.state, node, target.getEvalTree());
       }
     }
 
     if (node.hasQual()) {
-      ExprsVerifier.verify(state, node, node.getQual());
+      ExprsVerifier.verify(context.state, node, node.getQual());
     }
 
     verifyProjectableOutputSchema(node);
@@ -185,16 +202,16 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
   }
 
   @Override
-  public LogicalNode visitStoreTable(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitStoreTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                      StoreTableNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitStoreTable(state, plan, block, node, stack);
+    super.visitStoreTable(context, plan, block, node, stack);
     return node;
   }
 
   @Override
-  public LogicalNode visitInsert(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  InsertNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitInsert(state, plan, block, node, stack);
+    super.visitInsert(context, plan, block, node, stack);
     return node;
   }
 
@@ -216,24 +233,17 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
   }
 
   @Override
-  public LogicalNode visitCreateTable(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitCreateTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                       CreateTableNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitCreateTable(state, plan, block, node, stack);
-
-    if (catalog.existsTable(node.getTableName())) {
-      state.addVerification("relation \"" + node.getTableName() + "\" already exists");
-    }
-
+    super.visitCreateTable(context, plan, block, node, stack);
+    // Here, we don't need to check table existence because this check is performed in PreLogicalPlanVerifier.
     return node;
   }
 
   @Override
-  public LogicalNode visitDropTable(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+  public LogicalNode visitDropTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                     DropTableNode node, Stack<LogicalNode> stack) {
-    if (!catalog.existsTable(node.getTableName())) {
-      state.addVerification("table \"" + node.getTableName() + "\" does not exist");
-    }
-
+    // Here, we don't need to check table existence because this check is performed in PreLogicalPlanVerifier.
     return node;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index bf41996..76454b9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -71,6 +71,12 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitInsert(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
                      Stack<LogicalNode> stack) throws PlanningException;
 
+  RESULT visitCreateDatabase(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateDatabaseNode node,
+                          Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitDropDatabase(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropDatabaseNode node,
+                             Stack<LogicalNode> stack) throws PlanningException;
+
   RESULT visitCreateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateTableNode node,
                           Stack<LogicalNode> stack) throws PlanningException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 01bacc0..48e3c93 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -42,6 +42,7 @@ import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;
@@ -69,6 +70,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   public class PlanContext {
+    Session session;
     LogicalPlan plan;
 
     // transient data for each query block
@@ -76,13 +78,15 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     boolean debugOrUnitTests;
 
-    public PlanContext(LogicalPlan plan, QueryBlock block, boolean debugOrUnitTests) {
+    public PlanContext(Session session, LogicalPlan plan, QueryBlock block, boolean debugOrUnitTests) {
+      this.session = session;
       this.plan = plan;
       this.queryBlock = block;
       this.debugOrUnitTests = debugOrUnitTests;
     }
 
     public PlanContext(PlanContext context, QueryBlock block) {
+      this.session = context.session;
       this.plan = context.plan;
       this.queryBlock = block;
       this.debugOrUnitTests = context.debugOrUnitTests;
@@ -100,20 +104,20 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * @param expr A relational algebraic expression for a query.
    * @return A logical plan
    */
-  public LogicalPlan createPlan(Expr expr) throws PlanningException {
-    return createPlan(expr, false);
+  public LogicalPlan createPlan(Session session, Expr expr) throws PlanningException {
+    return createPlan(session, expr, false);
   }
 
   @VisibleForTesting
-  public LogicalPlan createPlan(Expr expr, boolean debug) throws PlanningException {
+  public LogicalPlan createPlan(Session session, Expr expr, boolean debug) throws PlanningException {
 
-    LogicalPlan plan = new LogicalPlan(this);
+    LogicalPlan plan = new LogicalPlan(session.getCurrentDatabase(), this);
 
     QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
-    PreprocessContext preProcessorCtx = new PreprocessContext(plan, rootBlock);
+    PreprocessContext preProcessorCtx = new PreprocessContext(session, plan, rootBlock);
     preprocessor.visit(preProcessorCtx, new Stack<Expr>(), expr);
 
-    PlanContext context = new PlanContext(plan, plan.getRootBlock(), debug);
+    PlanContext context = new PlanContext(session, plan, plan.getRootBlock(), debug);
     LogicalNode topMostNode = this.visit(context, new Stack<Expr>(), expr);
 
     // Add Root Node
@@ -1148,7 +1152,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   private InsertNode buildInsertIntoTablePlan(PlanContext context, InsertNode insertNode, Insert expr)
       throws PlanningException {
     // Get and set a target table
-    TableDesc desc = catalog.getTableDesc(expr.getTableName());
+    TableDesc desc = catalog.getTableDesc(context.session.getCurrentDatabase(), expr.getTableName());
     insertNode.setTargetTable(desc);
 
     //
@@ -1272,13 +1276,36 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    ===============================================================================================*/
 
   @Override
+  public LogicalNode visitCreateDatabase(PlanContext context, Stack<Expr> stack, CreateDatabase expr)
+      throws PlanningException {
+    CreateDatabaseNode createDatabaseNode = context.queryBlock.getNodeFromExpr(expr);
+    createDatabaseNode.init(expr.getDatabaseName(), expr.isIfNotExists());
+    return createDatabaseNode;
+  }
+
+  @Override
+  public LogicalNode visitDropDatabase(PlanContext context, Stack<Expr> stack, DropDatabase expr)
+      throws PlanningException {
+    DropDatabaseNode dropDatabaseNode = context.plan.createNode(DropDatabaseNode.class);
+    dropDatabaseNode.init(expr.getDatabaseName(), expr.isIfExists());
+    return dropDatabaseNode;
+  }
+
+  @Override
   public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
       throws PlanningException {
 
     CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
+    createTableNode.setIfNotExists(expr.isIfNotExists());
 
     // Set a table name to be created.
-    createTableNode.setTableName(expr.getTableName());
+    if (CatalogUtil.isFQTableName(expr.getTableName())) {
+      createTableNode.setTableName(expr.getTableName());
+    } else {
+      createTableNode.setTableName(
+          CatalogUtil.buildFQName(context.session.getCurrentDatabase(), expr.getTableName()));
+    }
+
 
     if (expr.hasStorageType()) { // If storage type (using clause) is specified
       createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
@@ -1359,18 +1386,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   private PartitionMethodDesc getPartitionMethod(PlanContext context,
                                                  String tableName,
                                                  CreateTable.PartitionMethodDescExpr expr) throws PlanningException {
-    PartitionMethodDesc partitionMethodDesc = new PartitionMethodDesc();
-    partitionMethodDesc.setTableId(tableName);
+    PartitionMethodDesc partitionMethodDesc;
 
     if(expr.getPartitionType() == PartitionType.COLUMN) {
       CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr;
       String partitionExpression = Joiner.on(',').join(partition.getColumns());
-      partitionMethodDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
-      partitionMethodDesc.setExpression(partitionExpression);
-      partitionMethodDesc.setExpressionSchema(convertColumnsToSchema(partition.getColumns()));
+
+      partitionMethodDesc = new PartitionMethodDesc(context.session.getCurrentDatabase(), tableName,
+          CatalogProtos.PartitionType.COLUMN, partitionExpression, convertColumnsToSchema(partition.getColumns()));
     } else {
-      throw new PlanningException(String.format("Not supported PartitonType: %s",
-          expr.getPartitionType()));
+      throw new PlanningException(String.format("Not supported PartitonType: %s", expr.getPartitionType()));
     }
     return partitionMethodDesc;
   }
@@ -1426,7 +1451,13 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   @Override
   public LogicalNode visitDropTable(PlanContext context, Stack<Expr> stack, DropTable dropTable) {
     DropTableNode dropTableNode = context.queryBlock.getNodeFromExpr(dropTable);
-    dropTableNode.init(dropTable.getTableName(), dropTable.isPurge());
+    String qualified;
+    if (CatalogUtil.isFQTableName(dropTable.getTableName())) {
+      qualified = dropTable.getTableName();
+    } else {
+      qualified = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), dropTable.getTableName());
+    }
+    dropTableNode.init(qualified, dropTable.isIfExists(), dropTable.isPurge());
     return dropTableNode;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index a928fb5..fbd65a9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -21,10 +21,7 @@ package org.apache.tajo.engine.planner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.CountRowsFunctionExpr;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.algebra.GeneralSetFunctionExpr;
-import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -48,7 +45,12 @@ public class PlannerUtil {
       baseNode = ((LogicalRootNode) node).getChild();
     }
 
-    return (baseNode.getType() == NodeType.CREATE_TABLE && !((CreateTableNode)baseNode).hasSubQuery()) ||
+    NodeType type = baseNode.getType();
+
+    return
+        type == NodeType.CREATE_DATABASE ||
+        type == NodeType.DROP_DATABASE ||
+        (type == NodeType.CREATE_TABLE && !((CreateTableNode)baseNode).hasSubQuery()) ||
         baseNode.getType() == NodeType.DROP_TABLE;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 508740f..fef9dc1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -21,22 +21,40 @@ package org.apache.tajo.engine.planner;
 import com.google.common.collect.ObjectArrays;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.util.TUtil;
 
 import java.util.Arrays;
 import java.util.Set;
 import java.util.Stack;
 
-public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationState, Expr> {
+public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVerifier.Context, Expr> {
   private CatalogService catalog;
 
   public PreLogicalPlanVerifier(CatalogService catalog) {
     this.catalog = catalog;
   }
 
-  public Expr visitProjection(VerificationState state, Stack<Expr> stack, Projection expr) throws PlanningException {
-    super.visitProjection(state, stack, expr);
+  public static class Context {
+    Session session;
+    VerificationState state;
+
+    public Context(Session session, VerificationState state) {
+      this.session = session;
+      this.state = state;
+    }
+  }
+
+  public VerificationState verify(Session session, VerificationState state, Expr expr) throws PlanningException {
+    Context context = new Context(session, state);
+    visit(context, new Stack<Expr>(), expr);
+    return context.state;
+  }
+
+  public Expr visitProjection(Context context, Stack<Expr> stack, Projection expr) throws PlanningException {
+    super.visitProjection(context, stack, expr);
 
     Set<String> names = TUtil.newHashSet();
     Expr [] distinctValues = null;
@@ -45,7 +63,8 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
 
       if (namedExpr.hasAlias()) {
         if (names.contains(namedExpr.getAlias())) {
-          state.addVerification(String.format("column name \"%s\" specified more than once", namedExpr.getAlias()));
+          context.state.addVerification(String.format("column name \"%s\" specified more than once",
+              namedExpr.getAlias()));
         } else {
           names.add(namedExpr.getAlias());
         }
@@ -92,13 +111,13 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
   }
 
   @Override
-  public Expr visitGroupBy(VerificationState ctx, Stack<Expr> stack, Aggregation expr) throws PlanningException {
-    super.visitGroupBy(ctx, stack, expr);
+  public Expr visitGroupBy(Context context, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+    super.visitGroupBy(context, stack, expr);
 
     // Enforcer only ordinary grouping set.
     for (Aggregation.GroupElement groupingElement : expr.getGroupSet()) {
       if (groupingElement.getType() != Aggregation.GroupType.OrdinaryGroup) {
-        ctx.addVerification(groupingElement.getType() + " is not supported yet");
+        context.state.addVerification(groupingElement.getType() + " is not supported yet");
       }
     }
 
@@ -118,22 +137,37 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
   }
 
   @Override
-  public Expr visitRelation(VerificationState state, Stack<Expr> stack, Relation expr) throws PlanningException {
-    assertRelationExistence(state, expr.getName());
+  public Expr visitRelation(Context context, Stack<Expr> stack, Relation expr) throws PlanningException {
+    assertRelationExistence(context, expr.getName());
     return expr;
   }
 
-  private boolean assertRelationExistence(VerificationState state, String name) {
-    if (!catalog.existsTable(name)) {
-      state.addVerification(String.format("relation \"%s\" does not exist", name));
+  private boolean assertRelationExistence(Context context, String tableName) {
+    String qualifiedName;
+
+    if (CatalogUtil.isFQTableName(tableName)) {
+      qualifiedName = tableName;
+    } else {
+      qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+    }
+
+    if (!catalog.existsTable(qualifiedName)) {
+      context.state.addVerification(String.format("relation \"%s\" does not exist", qualifiedName));
       return false;
     }
     return true;
   }
 
-  private boolean assertRelationNoExistence(VerificationState state, String name) {
-    if (catalog.existsTable(name)) {
-      state.addVerification(String.format("relation \"%s\" already exists", name));
+  private boolean assertRelationNoExistence(Context context, String tableName) {
+    String qualifiedName;
+
+    if (CatalogUtil.isFQTableName(tableName)) {
+      qualifiedName = tableName;
+    } else {
+      qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+    }
+    if (catalog.existsTable(qualifiedName)) {
+      context.state.addVerification(String.format("relation \"%s\" already exists", qualifiedName));
       return false;
     }
     return true;
@@ -147,15 +181,62 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
     return true;
   }
 
+  private boolean assertDatabaseExistence(VerificationState state, String name) {
+    if (!catalog.existDatabase(name)) {
+      state.addVerification(String.format("database \"%s\" does not exist", name));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertDatabaseNoExistence(VerificationState state, String name) {
+    if (catalog.existDatabase(name)) {
+      state.addVerification(String.format("database \"%s\" already exists", name));
+      return false;
+    }
+    return true;
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Data Definition Language Section
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 
+
+  @Override
+  public Expr visitCreateDatabase(Context context, Stack<Expr> stack, CreateDatabase expr)
+      throws PlanningException {
+    super.visitCreateDatabase(context, stack, expr);
+    if (!expr.isIfNotExists()) {
+      assertDatabaseNoExistence(context.state, expr.getDatabaseName());
+    }
+    return expr;
+  }
+
   @Override
-  public Expr visitCreateTable(VerificationState state, Stack<Expr> stack, CreateTable expr) throws PlanningException {
-    super.visitCreateTable(state, stack, expr);
-    assertRelationNoExistence(state, expr.getTableName());
-    assertUnsupportedStoreType(state, expr.getStorageType());
+  public Expr visitDropDatabase(Context context, Stack<Expr> stack, DropDatabase expr) throws PlanningException {
+    super.visitDropDatabase(context, stack, expr);
+    if (!expr.isIfExists()) {
+      assertDatabaseExistence(context.state, expr.getDatabaseName());
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitCreateTable(Context context, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+    super.visitCreateTable(context, stack, expr);
+    if (!expr.isIfNotExists()) {
+      assertRelationNoExistence(context, expr.getTableName());
+    }
+    assertUnsupportedStoreType(context.state, expr.getStorageType());
+    return expr;
+  }
+
+  @Override
+  public Expr visitDropTable(Context context, Stack<Expr> stack, DropTable expr) throws PlanningException {
+    super.visitDropTable(context, stack, expr);
+    if (!expr.isIfExists()) {
+      assertRelationExistence(context, expr.getTableName());
+    }
     return expr;
   }
 
@@ -163,11 +244,11 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
   // Insert or Update Section
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-  public Expr visitInsert(VerificationState state, Stack<Expr> stack, Insert expr) throws PlanningException {
-    Expr child = super.visitInsert(state, stack, expr);
+  public Expr visitInsert(Context context, Stack<Expr> stack, Insert expr) throws PlanningException {
+    Expr child = super.visitInsert(context, stack, expr);
 
     if (expr.hasTableName()) {
-      assertRelationExistence(state, expr.getTableName());
+      assertRelationExistence(context, expr.getTableName());
     }
 
     if (child != null && child.getType() == OpType.Projection) {
@@ -177,9 +258,9 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
         int targetColumnNum = expr.getTargetColumns().length;
 
         if (targetColumnNum > projectColumnNum)  {
-          state.addVerification("INSERT has more target columns than expressions");
+          context.state.addVerification("INSERT has more target columns than expressions");
         } else if (targetColumnNum < projectColumnNum) {
-          state.addVerification("INSERT has more expressions than target columns");
+          context.state.addVerification("INSERT has more expressions than target columns");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerificationState.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
index e1b136d..18882b8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
@@ -19,13 +19,18 @@
 package org.apache.tajo.engine.planner;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.TUtil;
 
 import java.util.List;
 
 public class VerificationState {
+  private static final Log LOG = LogFactory.getLog(VerificationState.class);
   List<String> errorMessages = Lists.newArrayList();
 
   public void addVerification(String error) {
+    LOG.warn(TUtil.getCurrentCodePoint(1) + " causes: " + error);
     errorMessages.add(error);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 461c5d5..73de079 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -129,7 +129,7 @@ public class GlobalPlanner {
     }
 
     masterPlan.setTerminal(terminalBlock);
-    LOG.info(masterPlan);
+    LOG.info(masterPlan.toString());
   }
 
   private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {
@@ -141,7 +141,8 @@ public class GlobalPlanner {
 
   public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
     Preconditions.checkArgument(channel.getSchema() != null,
-        "Channel schema (" + channel.getSrcId().getId() + " -> " + channel.getTargetId().getId() + ") is not initialized");
+        "Channel schema (" + channel.getSrcId().getId() + " -> " + channel.getTargetId().getId() +
+            ") is not initialized");
     TableMeta meta = new TableMeta(channel.getStoreType(), new Options());
     TableDesc desc = new TableDesc(channel.getSrcId().toString(), channel.getSchema(), meta, new Path("/"));
     ScanNode scanNode = plan.createNode(ScanNode.class);

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateDatabaseNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateDatabaseNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateDatabaseNode.java
new file mode 100644
index 0000000..e0b2f0f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateDatabaseNode.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.engine.planner.PlanString;
+
+/**
+ * Logical plan node for a CREATE DATABASE statement. It carries the database name and
+ * whether IF NOT EXISTS was specified.
+ */
+public class CreateDatabaseNode extends LogicalNode implements Cloneable {
+  private String databaseName;
+  private boolean ifNotExists;
+
+  /**
+   * @param pid node id, passed to the superclass together with NodeType.CREATE_DATABASE
+   */
+  public CreateDatabaseNode(int pid) {
+    super(pid, NodeType.CREATE_DATABASE);
+  }
+
+  /**
+   * Sets the statement parameters of this node.
+   *
+   * @param databaseName name of the database to create
+   * @param ifNotExists true if the statement included IF NOT EXISTS
+   */
+  public void init(String databaseName, boolean ifNotExists) {
+    this.databaseName = databaseName;
+    this.ifNotExists = ifNotExists;
+  }
+
+  /**
+   * @return the database name given to init(), or null if init() has not been called
+   */
+  public String getDatabaseName() {
+    return this.databaseName;
+  }
+
+  /**
+   * @return true if the statement included IF NOT EXISTS
+   */
+  public boolean isIfNotExists() {
+    return ifNotExists;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    return new PlanString(this).appendTitle(ifNotExists ? " IF NOT EXISTS " : " ").appendTitle(databaseName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(databaseName, ifNotExists);
+  }
+
+  /**
+   * Two nodes are equal when the superclass considers them equal and both the database
+   * name and the IF NOT EXISTS flag match.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof CreateDatabaseNode)) {
+      return false;
+    }
+    CreateDatabaseNode other = (CreateDatabaseNode) obj;
+    // Objects.equal is null-safe: hashCode() already accepts a null name, so equals()
+    // must not throw when it is called on a node before init().
+    return super.equals(other)
+        && Objects.equal(databaseName, other.databaseName)
+        && ifNotExists == other.ifNotExists;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    CreateDatabaseNode newNode = (CreateDatabaseNode) super.clone();
+    newNode.databaseName = databaseName;
+    newNode.ifNotExists = ifNotExists;
+    return newNode;
+  }
+
+  /**
+   * @return the statement rendered as "CREATE DATABASE [IF NOT EXISTS ]name"
+   */
+  @Override
+  public String toString() {
+    // The prefix already ends with a space, so the optional clause must not start with one.
+    return "CREATE DATABASE " + (ifNotExists ? "IF NOT EXISTS " : "") + databaseName;
+  }
+
+  // No child is declared in this class, so both traversal orders visit only this node.
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index b8e7143..c70fb10 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.logical;
 
+import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Options;
@@ -29,6 +30,7 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
   @Expose private Schema schema;
   @Expose private Path path;
   @Expose private boolean external;
+  @Expose private boolean ifNotExists;
 
   public CreateTableNode(int pid) {
     super(pid, NodeType.CREATE_TABLE);
@@ -76,10 +78,22 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
     return child != null;
   }
 
+  public void setIfNotExists(boolean ifNotExists) {
+    this.ifNotExists = ifNotExists;
+  }
+
+  public boolean isIfNotExists() {
+    return ifNotExists;
+  }
+
   @Override
   public PlanString getPlanString() {
     return new PlanString(this);
   }
+
+  public int hashCode() {
+    return super.hashCode() ^ Objects.hashCode(schema, path, external, ifNotExists) * 31;
+  }
   
   @Override
   public boolean equals(Object obj) {
@@ -88,7 +102,8 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
       return super.equals(other)
           && this.schema.equals(other.schema)
           && this.external == other.external
-          && TUtil.checkEquals(path, other.path);
+          && TUtil.checkEquals(path, other.path)
+          && ifNotExists == other.ifNotExists;
     } else {
       return false;
     }
@@ -96,18 +111,20 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
   
   @Override
   public Object clone() throws CloneNotSupportedException {
-    CreateTableNode store = (CreateTableNode) super.clone();
-    store.tableName = tableName;
-    store.schema = (Schema) schema.clone();
-    store.storageType = storageType;
-    store.external = external;
-    store.path = path != null ? new Path(path.toString()) : null;
-    store.options = (Options) (options != null ? options.clone() : null);
-    return store;
+    CreateTableNode createTableNode = (CreateTableNode) super.clone();
+    createTableNode.tableName = tableName;
+    createTableNode.schema = (Schema) schema.clone();
+    createTableNode.storageType = storageType;
+    createTableNode.external = external;
+    createTableNode.path = path != null ? new Path(path.toString()) : null;
+    createTableNode.options = (Options) (options != null ? options.clone() : null);
+    createTableNode.ifNotExists = ifNotExists;
+    return createTableNode;
   }
 
   public String toString() {
-    return "CreateTable (table=" + tableName + ", external=" + external + ", storeType=" + storageType + ")";
+    return "CreateTable (table=" + tableName + ", external=" + external + ", storeType=" + storageType +
+        ", ifNotExists=" + ifNotExists +")";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropDatabaseNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropDatabaseNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropDatabaseNode.java
new file mode 100644
index 0000000..1578759
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropDatabaseNode.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.planner.PlanString;
+
+/**
+ * Logical plan node for a DROP DATABASE statement. It carries the database name and
+ * whether IF EXISTS was specified; both fields are marked with Gson's @Expose.
+ */
+public class DropDatabaseNode extends LogicalNode implements Cloneable {
+  @Expose private String databaseName;
+  @Expose private boolean ifExists;
+
+  /**
+   * @param pid node id, passed to the superclass together with NodeType.DROP_DATABASE
+   */
+  public DropDatabaseNode(int pid) {
+    super(pid, NodeType.DROP_DATABASE);
+  }
+
+  /**
+   * Sets the statement parameters of this node.
+   *
+   * @param databaseName name of the database to drop
+   * @param ifExists true if the statement included IF EXISTS
+   */
+  public void init(String databaseName, boolean ifExists) {
+    this.databaseName = databaseName;
+    this.ifExists = ifExists;
+  }
+
+  /**
+   * @return the database name given to init(), or null if init() has not been called
+   */
+  public String getDatabaseName() {
+    return this.databaseName;
+  }
+
+  /**
+   * @return true if the statement included IF EXISTS
+   */
+  public boolean isIfExists() {
+    return ifExists;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    return new PlanString(this).appendTitle(ifExists ? " IF EXISTS " : " ").appendTitle(databaseName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(databaseName, ifExists);
+  }
+
+  /**
+   * Two nodes are equal when the superclass considers them equal and both the database
+   * name and the IF EXISTS flag match.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof DropDatabaseNode)) {
+      return false;
+    }
+    DropDatabaseNode other = (DropDatabaseNode) obj;
+    // Objects.equal is null-safe: hashCode() already accepts a null name, so equals()
+    // must not throw when it is called on a node before init().
+    return super.equals(other)
+        && Objects.equal(databaseName, other.databaseName)
+        && ifExists == other.ifExists;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) super.clone();
+    dropDatabaseNode.databaseName = databaseName;
+    // Copy every field explicitly, as the sibling Create/Drop nodes do, so the copy does
+    // not depend on how the superclass implements clone().
+    dropDatabaseNode.ifExists = ifExists;
+    return dropDatabaseNode;
+  }
+
+  /**
+   * @return the statement rendered as "DROP DATABASE [IF EXISTS ]name"
+   */
+  @Override
+  public String toString() {
+    return "DROP DATABASE " + (ifExists ? "IF EXISTS " : "") + databaseName;
+  }
+
+  // No child is declared in this class, so both traversal orders visit only this node.
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
index 0c6675f..ac68a9c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
@@ -18,18 +18,21 @@
 
 package org.apache.tajo.engine.planner.logical;
 
+import com.google.common.base.Objects;
 import org.apache.tajo.engine.planner.PlanString;
 
-public class DropTableNode extends LogicalNode {
+public class DropTableNode extends LogicalNode implements Cloneable {
   private String tableName;
+  private boolean ifExists;
   private boolean purge;
 
   public DropTableNode(int pid) {
     super(pid, NodeType.DROP_TABLE);
   }
 
-  public void init(String tableName, boolean purge) {
+  public void init(String tableName, boolean ifExists, boolean purge) {
     this.tableName = tableName;
+    this.ifExists = ifExists;
     this.purge = purge;
   }
 
@@ -37,19 +40,30 @@ public class DropTableNode extends LogicalNode {
     return this.tableName;
   }
 
+  public boolean isIfExists() {
+    return this.ifExists;
+  }
+
   public boolean isPurge() {
     return this.purge;
   }
 
   @Override
   public PlanString getPlanString() {
-    return new PlanString(this).appendTitle(purge ? " (PURGE)" : "");
+    return new PlanString(this).appendTitle(ifExists ? " IF EXISTS" : "").appendTitle(purge ? " PURGE" : "");
+  }
+
+  public int hashCode() {
+    return Objects.hashCode(tableName, ifExists, purge);
   }
 
   public boolean equals(Object obj) {
     if (obj instanceof DropTableNode) {
       DropTableNode other = (DropTableNode) obj;
-      return super.equals(other) && this.tableName.equals(other.tableName) && this.purge == other.purge;
+      return super.equals(other) &&
+          this.tableName.equals(other.tableName) &&
+          this.ifExists == other.ifExists &&
+          this.purge == other.purge;
     } else {
       return false;
     }
@@ -59,13 +73,14 @@ public class DropTableNode extends LogicalNode {
   public Object clone() throws CloneNotSupportedException {
     DropTableNode dropTableNode = (DropTableNode) super.clone();
     dropTableNode.tableName = tableName;
+    dropTableNode.ifExists = ifExists;
     dropTableNode.purge = purge;
     return dropTableNode;
   }
 
   @Override
   public String toString() {
-    return "DROP TABLE " + tableName + (purge ? " PURGE" : "");
+    return "DROP TABLE " + (ifExists ? "IF EXISTS " : "") + tableName + (purge ? " PURGE" : "");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index acaf85b..2b453fb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -43,6 +43,9 @@ public enum NodeType {
   BST_INDEX_SCAN(IndexScanNode.class),
   STORE(StoreTableNode.class),
   INSERT(InsertNode.class),
+
+  CREATE_DATABASE(CreateDatabaseNode.class),
+  DROP_DATABASE(DropDatabaseNode.class),
   CREATE_TABLE(CreateTableNode.class),
   DROP_TABLE(DropTableNode.class)
   ;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 80a0d7a..8e4911f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -30,7 +30,7 @@ import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
-public class ScanNode extends RelationNode implements Projectable {
+public class ScanNode extends RelationNode implements Projectable, Cloneable {
 	@Expose protected TableDesc tableDesc;
   @Expose protected String alias;
   @Expose protected Schema logicalSchema;
@@ -55,10 +55,18 @@ public class ScanNode extends RelationNode implements Projectable {
 	public void init(TableDesc desc, String alias) {
     this.tableDesc = desc;
     this.alias = CatalogUtil.normalizeIdentifier(alias);
+
+    if (!CatalogUtil.isFQTableName(this.tableDesc.getName())) {
+      throw new IllegalArgumentException("the name in TableDesc must be qualified, but it is \"" +
+          desc.getName() + "\"");
+    }
+
+    String databaseName = CatalogUtil.extractQualifier(this.tableDesc.getName());
+    String qualifiedAlias = CatalogUtil.buildFQName(databaseName, alias);
     this.setInSchema(tableDesc.getSchema());
-    this.getInSchema().setQualifier(alias);
+    this.getInSchema().setQualifier(qualifiedAlias);
     this.setOutSchema(new Schema(getInSchema()));
-    logicalSchema = SchemaUtil.getQualifiedLogicalSchema(tableDesc, alias);
+    logicalSchema = SchemaUtil.getQualifiedLogicalSchema(tableDesc, qualifiedAlias);
 	}
 	
 	public String getTableName() {
@@ -70,7 +78,12 @@ public class ScanNode extends RelationNode implements Projectable {
 	}
 
   public String getCanonicalName() {
-    return hasAlias() ? alias : tableDesc.getName();
+    if (CatalogUtil.isFQTableName(this.tableDesc.getName())) {
+      String databaseName = CatalogUtil.extractQualifier(this.tableDesc.getName());
+      return hasAlias() ? CatalogUtil.buildFQName(databaseName, alias) : tableDesc.getName();
+    } else {
+      return hasAlias() ? alias : tableDesc.getName();
+    }
   }
 
   public Schema getTableSchema() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 1db4c4b..5076b87 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -28,7 +28,6 @@ import org.apache.tajo.engine.planner.logical.NodeType;
 import static org.apache.tajo.catalog.proto.CatalogProtos.KeyValueSetProto;
 
 public class QueryContext extends Options {
-
   public static final String COMMAND_TYPE = "tajo.query.command";
 
   public static final String STAGING_DIR = "tajo.query.staging_dir";

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/ProtoUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/ProtoUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/ProtoUtil.java
deleted file mode 100644
index 93eca22..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/ProtoUtil.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-
-public class ProtoUtil {
-  public static StringProto newProto(String val) {
-    StringProto.Builder builder = StringProto.newBuilder();
-    builder.setValue(val);
-    return builder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index d7ded93..d0e8dc4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -29,9 +29,9 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
-import org.apache.tajo.catalog.exception.NoSuchTableException;
+import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
@@ -46,14 +46,16 @@ import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.querymaster.QueryInfo;
 import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageUtil;
 
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Stack;
 
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
 import static org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
 
 public class GlobalEngine extends AbstractService {
@@ -101,7 +103,7 @@ public class GlobalEngine extends AbstractService {
     super.stop();
   }
 
-  public GetQueryStatusResponse executeQuery(String sql)
+  public GetQueryStatusResponse executeQuery(Session session, String sql)
       throws InterruptedException, IOException, IllegalQueryStatusException {
 
     LOG.info("SQL: " + sql);
@@ -133,13 +135,13 @@ public class GlobalEngine extends AbstractService {
       context.getSystemMetrics().counter("Query", "totalQuery").inc();
 
       Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
-      LogicalPlan plan = createLogicalPlan(planningContext);
+      LogicalPlan plan = createLogicalPlan(session, planningContext);
       LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
       GetQueryStatusResponse.Builder responseBuilder = GetQueryStatusResponse.newBuilder();
       if (PlannerUtil.checkIfDDLPlan(rootNode)) {
         context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
-        updateQuery(rootNode.getChild());
+        updateQuery(session, rootNode.getChild());
         responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
         responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
         responseBuilder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
@@ -150,7 +152,7 @@ public class GlobalEngine extends AbstractService {
         QueryJobManager queryJobManager = this.context.getQueryJobManager();
         QueryInfo queryInfo;
 
-        queryInfo = queryJobManager.createNewQueryJob(queryContext, sql, rootNode);
+        queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, rootNode);
 
         if(queryInfo == null) {
           responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
@@ -186,7 +188,7 @@ public class GlobalEngine extends AbstractService {
     }
   }
 
-  public String explainQuery(String sql) throws IOException, SQLException, PlanningException {
+  public String explainQuery(Session session, String sql) throws IOException, SQLException, PlanningException {
     LOG.info("SQL: " + sql);
     // parse the query
 
@@ -194,35 +196,44 @@ public class GlobalEngine extends AbstractService {
     Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
     LOG.info("hive.query.mode:" + hiveQueryMode);
 
-    LogicalPlan plan = createLogicalPlan(planningContext);
+    LogicalPlan plan = createLogicalPlan(session, planningContext);
     return plan.toString();
   }
 
-  public QueryId updateQuery(String sql) throws IOException, SQLException, PlanningException {
+  public QueryId updateQuery(Session session, String sql) throws IOException, SQLException, PlanningException {
     LOG.info("SQL: " + sql);
     // parse the query
     Expr expr = analyzer.parse(sql);
-    LogicalPlan plan = createLogicalPlan(expr);
+
+    LogicalPlan plan = createLogicalPlan(session, expr);
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
     if (!PlannerUtil.checkIfDDLPlan(rootNode)) {
       throw new SQLException("This is not update query:\n" + sql);
     } else {
-      updateQuery(rootNode.getChild());
+      updateQuery(session, rootNode.getChild());
       return QueryIdFactory.NULL_QUERY_ID;
     }
   }
 
-  private boolean updateQuery(LogicalNode root) throws IOException {
+  private boolean updateQuery(Session session, LogicalNode root) throws IOException {
 
     switch (root.getType()) {
+      case CREATE_DATABASE:
+        CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+        createDatabase(session, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+        return true;
+      case DROP_DATABASE:
+        DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+        dropDatabase(session, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+        return true;
       case CREATE_TABLE:
         CreateTableNode createTable = (CreateTableNode) root;
-        createTable(createTable);
+        createTable(session, createTable, createTable.isIfNotExists());
         return true;
       case DROP_TABLE:
         DropTableNode dropTable = (DropTableNode) root;
-        dropTable(dropTable.getTableName(), dropTable.isPurge());
+        dropTable(session, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
         return true;
 
       default:
@@ -230,10 +241,10 @@ public class GlobalEngine extends AbstractService {
     }
   }
 
-  private LogicalPlan createLogicalPlan(Expr expression) throws PlanningException {
+  private LogicalPlan createLogicalPlan(Session session, Expr expression) throws PlanningException {
 
     VerificationState state = new VerificationState();
-    preVerifier.visit(state, new Stack<Expr>(), expression);
+    preVerifier.verify(session, state, expression);
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
       for (String error : state.getErrorMessages()) {
@@ -242,7 +253,7 @@ public class GlobalEngine extends AbstractService {
       throw new VerifyException(sb.toString());
     }
 
-    LogicalPlan plan = planner.createPlan(expression);
+    LogicalPlan plan = planner.createPlan(session, expression);
     LOG.info("=============================================");
     LOG.info("Non Optimized Query: \n" + plan.toString());
     LOG.info("=============================================");
@@ -251,7 +262,7 @@ public class GlobalEngine extends AbstractService {
     LOG.info("Optimized Query: \n" + plan.toString());
     LOG.info("=============================================");
 
-    annotatedPlanVerifier.visit(state, plan, plan.getRootBlock());
+    annotatedPlanVerifier.verify(session, state, plan);
 
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
@@ -264,7 +275,7 @@ public class GlobalEngine extends AbstractService {
     return plan;
   }
 
-  private TableDesc createTable(CreateTableNode createTable) throws IOException {
+  private TableDesc createTable(Session session, CreateTableNode createTable, boolean ifNotExists) throws IOException {
     TableMeta meta;
 
     if (createTable.hasOptions()) {
@@ -280,26 +291,46 @@ public class GlobalEngine extends AbstractService {
       createTable.setPath(tablePath);
     }
 
-    return createTableOnPath(createTable.getTableName(), createTable.getTableSchema(), meta,
-        createTable.getPath(), !createTable.isExternal(), createTable.getPartitionMethod());
+    return createTableOnPath(session, createTable.getTableName(), createTable.getTableSchema(),
+        meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists);
   }
 
-  public TableDesc createTableOnPath(String tableName, Schema schema, TableMeta meta,
-                                     Path path, boolean isCreated, PartitionMethodDesc partitionDesc)
+  public TableDesc createTableOnPath(Session session, String tableName, Schema schema, TableMeta meta,
+                                     Path path, boolean isExternal, PartitionMethodDesc partitionDesc,
+                                     boolean ifNotExists)
       throws IOException {
-    if (catalog.existsTable(tableName)) {
-      throw new AlreadyExistsTableException(tableName);
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableName);
+      databaseName = splitted[0];
+      simpleTableName = splitted[1];
+    } else {
+      databaseName = session.getCurrentDatabase();
+      simpleTableName = tableName;
     }
+    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
 
-    FileSystem fs = path.getFileSystem(context.getConf());
+    boolean exists = catalog.existsTable(databaseName, simpleTableName);
 
-    if (isCreated) {
-      fs.mkdirs(path);
+    if (exists) {
+      if (ifNotExists) {
+        LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+        return catalog.getTableDesc(databaseName, simpleTableName);
+      } else {
+        throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
+      }
     }
 
-    if(!fs.exists(path)) {
-      LOG.error("ERROR: " + path.toUri() + " does not exist");
-      throw new IOException("ERROR: " + path.toUri() + " does not exist");
+    FileSystem fs = path.getFileSystem(context.getConf());
+
+    if (isExternal) {
+      if(!fs.exists(path)) {
+        LOG.error("ERROR: " + path.toUri() + " does not exist");
+        throw new IOException("ERROR: " + path.toUri() + " does not exist");
+      }
+    } else {
+      fs.mkdirs(path);
     }
 
     long totalSize = 0;
@@ -312,16 +343,73 @@ public class GlobalEngine extends AbstractService {
 
     TableStats stats = new TableStats();
     stats.setNumBytes(totalSize);
-    TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
+    TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
+        schema, meta, path, isExternal);
     desc.setStats(stats);
     if (partitionDesc != null) {
       desc.setPartitionMethod(partitionDesc);
     }
-    catalog.addTable(desc);
 
-    LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
+    if (catalog.createTable(desc)) {
+      LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
+      return desc;
+    } else {
+      LOG.info("Table creation " + tableName + " is failed.");
+      throw new CatalogException("Cannot create table \"" + tableName + "\".");
+    }
+  }
+
+  /**
+   * Creates a database in the catalog and, when the catalog reports success,
+   * creates its directory under the configured warehouse directory.
+   *
+   * @param session the calling session; it is not read by this method and may be null
+   * @param databaseName name of the database to create
+   * @param tablespace tablespace to place the database in; when null,
+   *                   DEFAULT_TABLESPACE_NAME is used
+   * @param ifNotExists if true, an already existing database is logged and treated as success
+   * @return true on every path that does not throw
+   * @throws AlreadyExistsDatabaseException if the database exists and ifNotExists is false
+   * @throws IOException declared for the file system calls made while creating the directory
+   */
+  public boolean createDatabase(@Nullable Session session, String databaseName,
+                                @Nullable String tablespace,
+                                boolean ifNotExists) throws IOException {
 
-    return desc;
+    // Fall back to the default tablespace when none was given.
+    String tablespaceName;
+    if (tablespace == null) {
+      tablespaceName = DEFAULT_TABLESPACE_NAME;
+    } else {
+      tablespaceName = tablespace;
+    }
+
+    // CREATE DATABASE IF NOT EXISTS
+    boolean exists = catalog.existDatabase(databaseName);
+    if (exists) {
+      if (ifNotExists) {
+        LOG.info("database \"" + databaseName + "\" is already exists." );
+        return true;
+      } else {
+        throw new AlreadyExistsDatabaseException(databaseName);
+      }
+    }
+
+    // The directory is named after the normalized identifier and is created
+    // only after the catalog has accepted the new database.
+    if (catalog.createDatabase(databaseName, tablespaceName)) {
+      String normalized = CatalogUtil.normalizeIdentifier(databaseName);
+      Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), normalized);
+      FileSystem fs = databaseDir.getFileSystem(context.getConf());
+      fs.mkdirs(databaseDir);
+    }
+
+    // NOTE(review): true is returned even when catalog.createDatabase() returned
+    // false and no directory was made -- confirm this is intended.
+    return true;
+  }
+
+  /**
+   * Drops a database from the catalog.
+   *
+   * @param session the session issuing the statement; its current database cannot be dropped
+   * @param databaseName name of the database to drop
+   * @param ifExists if true, a missing database is logged and treated as success
+   * @return true when the database was absent and ifExists is set; otherwise the
+   *         result reported by the catalog
+   * @throws NoSuchDatabaseException if the database does not exist and ifExists is false
+   * @throws RuntimeException if the database is the session's current database
+   */
+  public boolean dropDatabase(Session session, String databaseName, boolean ifExists) {
+
+    boolean exists = catalog.existDatabase(databaseName);
+    if(!exists) {
+      if (ifExists) { // DROP DATABASE IF EXISTS
+        LOG.info("database \"" + databaseName + "\" does not exists." );
+        return true;
+      } else { // Otherwise, it causes an exception.
+        throw new NoSuchDatabaseException(databaseName);
+      }
+    }
+
+    if (session.getCurrentDatabase().equals(databaseName)) {
+      throw new RuntimeException("ERROR: Cannot drop the current open database");
+    }
+
+    boolean result = catalog.dropDatabase(databaseName);
+    // Only claim success when the catalog actually reports the drop succeeded.
+    if (result) {
+      LOG.info("database " + databaseName + " is dropped.");
+    } else {
+      LOG.error("database " + databaseName + " could not be dropped.");
+    }
+    return result;
   }
 
   /**
@@ -330,15 +418,33 @@ public class GlobalEngine extends AbstractService {
    * @param tableName to be dropped
    * @param purge Remove all data if purge is true.
    */
-  public void dropTable(String tableName, boolean purge) {
+  public boolean dropTable(Session session, String tableName, boolean ifExists, boolean purge) {
     CatalogService catalog = context.getCatalog();
 
-    if (!catalog.existsTable(tableName)) {
-      throw new NoSuchTableException(tableName);
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableName);
+      databaseName = splitted[0];
+      simpleTableName = splitted[1];
+    } else {
+      databaseName = session.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+    // A missing relation is tolerated only for DROP TABLE IF EXISTS.
+    boolean exists = catalog.existsTable(qualifiedName);
+    if(!exists) {
+      if (ifExists) { // DROP TABLE IF EXISTS
+        // This branch handles the missing relation, so say that it is missing.
+        LOG.info("relation \"" + qualifiedName + "\" does not exist." );
+        return true;
+      } else { // Otherwise, it causes an exception.
+        throw new NoSuchTableException(qualifiedName);
+      }
     }
 
-    Path path = catalog.getTableDesc(tableName).getPath();
-    catalog.deleteTable(tableName);
+    Path path = catalog.getTableDesc(qualifiedName).getPath();
+    catalog.dropTable(qualifiedName);
 
     if (purge) {
       try {
@@ -349,7 +455,8 @@ public class GlobalEngine extends AbstractService {
       }
     }
 
-    LOG.info("Table \"" + tableName + "\" is " + (purge ? " purged." : " dropped."));
+    LOG.info(String.format("relation \"%s\" is " + (purge ? " purged." : " dropped."), qualifiedName));
+    return true;
   }
 
   public interface DistributedQueryHook {
@@ -388,9 +495,12 @@ public class GlobalEngine extends AbstractService {
     public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
       LogicalRootNode rootNode = plan.getRootBlock().getRoot();
       CreateTableNode createTableNode = rootNode.getChild();
-      String tableName = createTableNode.getTableName();
+      String [] splitted  = CatalogUtil.splitFQTableName(createTableNode.getTableName());
+      String databaseName = splitted[0];
+      String tableName = splitted[1];
       queryContext.setOutputTable(tableName);
-      queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
+      queryContext.setOutputPath(
+          StorageUtil.concatPath(TajoConf.getWarehouseDir(context.getConf()), databaseName, tableName));
       if(createTableNode.getPartitionMethod() != null) {
         queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 9fcfca9..9d54bb5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -51,6 +51,7 @@ import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.session.SessionManager;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
@@ -72,6 +73,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
 public class TajoMaster extends CompositeService {
   private static final String METRICS_GROUP_NAME = "tajomaster";
 
@@ -114,6 +118,7 @@ public class TajoMaster extends CompositeService {
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
   private TajoMasterService tajoMasterService;
+  private SessionManager sessionManager;
 
   private WorkerResourceManager resourceManager;
   //Web Server
@@ -142,7 +147,7 @@ public class TajoMaster extends CompositeService {
   }
 
   @Override
-  public void init(Configuration _conf) {
+  public void serviceInit(Configuration _conf) throws Exception {
     this.systemConf = (TajoConf) _conf;
 
     context = new MasterContext(systemConf);
@@ -165,6 +170,9 @@ public class TajoMaster extends CompositeService {
       addIfService(catalogServer);
       catalog = new LocalCatalogWrapper(catalogServer, systemConf);
 
+      sessionManager = new SessionManager(dispatcher);
+      addIfService(sessionManager);
+
       globalEngine = new GlobalEngine(context);
       addIfService(globalEngine);
 
@@ -176,13 +184,12 @@ public class TajoMaster extends CompositeService {
 
       tajoMasterService = new TajoMasterService(context);
       addIfService(tajoMasterService);
-
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
+      throw e;
     }
 
-    super.init(systemConf);
-
+    super.serviceInit(systemConf);
     LOG.info("Tajo Master is initialized.");
   }
 
@@ -336,9 +343,13 @@ public class TajoMaster extends CompositeService {
   }
 
   @Override
-  public void start() {
-    LOG.info("TajoMaster startup");
-    super.start();
+  public void serviceStart() throws Exception {
+    LOG.info("TajoMaster is starting up");
+
+    // check base tablespace and databases
+    checkBaseTBSpaceAndDatabase();
+
+    super.serviceStart();
 
     // Setting the system global configs
     systemConf.setSocketAddr(ConfVars.CATALOG_ADDRESS.varname,
@@ -375,6 +386,20 @@ public class TajoMaster extends CompositeService {
     defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
   }
 
+  /**
+   * Makes sure the default tablespace and the default database are present,
+   * creating whichever one is missing and logging the ones already in place.
+   *
+   * @throws IOException declared by the database creation call
+   */
+  private void checkBaseTBSpaceAndDatabase() throws IOException {
+    // Default tablespace: created at the configured warehouse directory if absent.
+    if (catalog.existTablespace(DEFAULT_TABLESPACE_NAME)) {
+      LOG.info(String.format("Default tablespace (%s) is already prepared.", DEFAULT_TABLESPACE_NAME));
+    } else {
+      catalog.createTablespace(DEFAULT_TABLESPACE_NAME, context.getConf().getVar(ConfVars.WAREHOUSE_DIR));
+    }
+
+    // Default database: created through the global engine, with no session.
+    if (catalog.existDatabase(DEFAULT_DATABASE_NAME)) {
+      LOG.info(String.format("Default database (%s) is already prepared.", DEFAULT_DATABASE_NAME));
+    } else {
+      globalEngine.createDatabase(null, DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME, false);
+    }
+  }
+
   @Override
   public void stop() {
     if (webServer != null) {
@@ -448,6 +473,10 @@ public class TajoMaster extends CompositeService {
       return catalog;
     }
 
+    /**
+     * Returns the session manager held in the {@code sessionManager} field.
+     *
+     * @return the current value of {@code sessionManager}
+     */
+    public SessionManager getSessionManager() {
+      return sessionManager;
+    }
+
     public GlobalEngine getGlobalEngine() {
       return globalEngine;
     }


Mime
View raw message