tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [29/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)
Date Tue, 02 Jul 2013 14:16:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
new file mode 100644
index 0000000..af3b2c4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.json.GsonCreator;
+
+/**
+ * Scan node whose type is {@link ExprType#BST_INDEX_SCAN}. It is built from an
+ * existing {@link ScanNode} and additionally carries a key schema, an array of
+ * key datums and the sort specifications of the keys.
+ */
+public class IndexScanNode extends ScanNode {
+
+  /** Sort specifications of the index keys. */
+  @Expose private SortSpec[] sortKeys;
+  /** Schema of the index keys. */
+  @Expose private Schema keySchema = null;
+  /** Key values handed to the constructor. */
+  @Expose private Datum[] datum = null;
+  //TODO- @Expose private IndexType type;
+
+  /**
+   * Creates an index scan that copies the qualifier, source table, schemas,
+   * locality flag and targets of {@code scanNode}.
+   *
+   * @param scanNode  scan node whose properties are copied
+   * @param keySchema schema of the index keys
+   * @param datum     key values
+   * @param sortKeys  sort specifications of the index keys
+   */
+  public IndexScanNode(ScanNode scanNode,
+      Schema keySchema, Datum[] datum, SortSpec[] sortKeys) {
+    super();
+    setQual(scanNode.getQual());
+    setFromTable(scanNode.getFromTable());
+    setInSchema(scanNode.getInSchema());
+    setOutSchema(scanNode.getOutSchema());
+    setLocal(scanNode.isLocal());
+    setTargets(scanNode.getTargets());
+    setType(ExprType.BST_INDEX_SCAN);
+    this.sortKeys = sortKeys;
+    this.keySchema = keySchema;
+    this.datum = datum;
+  }
+
+  /** @return the sort specifications of the index keys */
+  public SortSpec[] getSortKeys() {
+    return this.sortKeys;
+  }
+
+  /** @return the schema of the index keys */
+  public Schema getKeySchema() {
+    return this.keySchema;
+  }
+
+  /** @return the key values */
+  public Datum[] getDatum() {
+    return this.datum;
+  }
+
+  public void setSortKeys(SortSpec[] sortKeys) {
+    this.sortKeys = sortKeys;
+  }
+
+  public void setKeySchema(Schema keySchema) {
+    this.keySchema = keySchema;
+  }
+
+  /**
+   * Serializes this node with the shared Gson instance.
+   *
+   * @return the JSON text; earlier the serialized text was discarded and
+   *         {@code null} was returned instead
+   */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+
+  @Override
+  public String toString() {
+    Gson gson = GsonCreator.getInstance();
+    StringBuilder builder = new StringBuilder();
+    builder.append("IndexScanNode : {\n");
+    builder.append("  \"keySchema\" : \"").append(gson.toJson(this.keySchema)).append("\"\n");
+    builder.append("  \"sortKeys\" : \"").append(gson.toJson(this.sortKeys)).append(" \"\n");
+    builder.append("  \"datums\" : \"").append(gson.toJson(this.datum)).append("\"\n");
+    builder.append("      <<\"superClass\" : ").append(super.toString());
+    builder.append(">>}");
+    builder.append("}");
+    return builder.toString();
+  }
+
+  /**
+   * Two index scans are equal when the inherited comparison succeeds and both
+   * the sort keys and the datums match element by element. Null arrays are
+   * only equal to null arrays, and arrays of different length never match.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof IndexScanNode)) {
+      return false;
+    }
+    IndexScanNode other = (IndexScanNode) obj;
+    return super.equals(other)
+        && sameSortKeys(this.sortKeys, other.sortKeys)
+        // Arrays.equals also checks the lengths, which the hand-written loop did not.
+        && java.util.Arrays.equals(this.datum, other.datum);
+  }
+
+  /** Compares two sort-key arrays by sort key, ordering and null placement. */
+  private static boolean sameSortKeys(SortSpec[] left, SortSpec[] right) {
+    if (left == null || right == null) {
+      return left == right;
+    }
+    if (left.length != right.length) {
+      return false;
+    }
+    for (int i = 0; i < left.length; i++) {
+      if (!left[i].getSortKey().equals(right[i].getSortKey())
+          || left[i].isAscending() != right[i].isAscending()
+          || left[i].isNullFirst() != right[i].isNullFirst()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Copies this node. The key schema and every sort spec are cloned; the datum
+   * array is copied but its elements are shared. Fields that are null stay null.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    IndexScanNode indexNode = (IndexScanNode) super.clone();
+    indexNode.keySchema =
+        this.keySchema == null ? null : (Schema) this.keySchema.clone();
+    if (this.sortKeys == null) {
+      indexNode.sortKeys = null;
+    } else {
+      indexNode.sortKeys = new SortSpec[this.sortKeys.length];
+      for (int i = 0; i < this.sortKeys.length; i++) {
+        indexNode.sortKeys[i] = (SortSpec) this.sortKeys[i].clone();
+      }
+    }
+    indexNode.datum = this.datum == null ? null : this.datum.clone();
+    return indexNode;
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexWriteNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexWriteNode.java
new file mode 100644
index 0000000..fde8d7f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexWriteNode.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.CreateIndexStmt;
+
+/**
+ * Logical node of type {@link ExprType#CREATE_INDEX} whose fields are taken
+ * from a {@link CreateIndexStmt}.
+ */
+public class IndexWriteNode extends UnaryNode {
+  @Expose private String indexName;
+  @Expose private boolean unique = false;
+  @Expose private String tableName;
+  @Expose private IndexMethod method = IndexMethod.TWO_LEVEL_BIN_TREE;
+  @Expose private SortSpec[] sortSpecs;
+  @Expose private Options params = null;
+
+  /**
+   * Copies the index name, uniqueness flag, table name, index method, sort
+   * specs and (when present) the parameters out of the statement.
+   *
+   * @param stmt the statement to read from
+   */
+  public IndexWriteNode(CreateIndexStmt stmt) {
+    super(ExprType.CREATE_INDEX);
+    indexName = stmt.getIndexName();
+    unique = stmt.isUnique();
+    tableName = stmt.getTableName();
+    method = stmt.getMethod();
+    sortSpecs = stmt.getSortSpecs();
+    // params keeps its null initial value when the statement has none.
+    if (stmt.hasParams()) {
+      params = stmt.getParams();
+    }
+  }
+
+  public String getIndexName() {
+    return indexName;
+  }
+
+  public boolean isUnique() {
+    return unique;
+  }
+
+  /** Marks the index as unique; there is no way to clear the flag again. */
+  public void setUnique() {
+    unique = true;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public IndexMethod getMethod() {
+    return method;
+  }
+
+  public SortSpec[] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  /** @return true when parameters were supplied */
+  public boolean hasParams() {
+    return params != null;
+  }
+
+  public Options getParams() {
+    return params;
+  }
+
+  /**
+   * Calls {@code initFromProto()} on the sort key of every sort spec and then
+   * serializes this node with the shared Gson instance.
+   */
+  @Override
+  public String toJSON() {
+    for (SortSpec spec : sortSpecs) {
+      spec.getSortKey().initFromProto();
+    }
+    Gson shared = GsonCreator.getInstance();
+    return shared.toJson(this, LogicalNode.class);
+  }
+
+  /** @return a pretty-printed JSON dump produced by a freshly built Gson */
+  @Override
+  public String toString() {
+    return new GsonBuilder().setPrettyPrinting().create().toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
new file mode 100644
index 0000000..aa90df9
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.json.GsonCreator;
+
+/** Binary logical node of type {@link ExprType#INTERSECT}. */
+public class IntersectNode extends BinaryNode {
+
+  /** Creates an intersect node without children. */
+  public IntersectNode() {
+    super(ExprType.INTERSECT);
+  }
+
+  /**
+   * Creates an intersect node over the two given children.
+   *
+   * @param outer node installed through {@code setOuter}
+   * @param inner node installed through {@code setInner}
+   */
+  public IntersectNode(LogicalNode outer, LogicalNode inner) {
+    this();
+    setOuter(outer);
+    setInner(inner);
+  }
+
+  /** @return the text of the outer node, an INTERSECT line, then the inner node */
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append(getOuterNode().toString());
+    text.append("\n INTERSECT \n");
+    text.append(getInnerNode().toString());
+    return text.toString();
+  }
+
+  /** @return this node serialized with the shared Gson instance */
+  @Override
+  public String toJSON() {
+    Gson​Holder: {
+      return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
new file mode 100644
index 0000000..c12e89e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.engine.planner.JoinType;
+
+/**
+ * Binary logical node of type {@link ExprType#JOIN}. Besides its two children
+ * it holds the join type, an optional join qualifier and an optional target
+ * list.
+ */
+public class JoinNode extends BinaryNode implements Cloneable {
+  @Expose private JoinType joinType;
+  @Expose private EvalNode joinQual;
+  @Expose private QueryBlock.Target[] targets;
+
+  /**
+   * Creates a join with only the outer child set.
+   *
+   * @param joinType kind of join
+   * @param left     node installed as the outer child
+   */
+  public JoinNode(JoinType joinType, LogicalNode left) {
+    super(ExprType.JOIN);
+    this.joinType = joinType;
+    setOuter(left);
+  }
+
+  /**
+   * Creates a join over two children.
+   *
+   * @param joinType kind of join
+   * @param left     node installed as the outer child
+   * @param right    node installed as the inner child
+   */
+  public JoinNode(JoinType joinType, LogicalNode left, LogicalNode right) {
+    super(ExprType.JOIN);
+    this.joinType = joinType;
+    setOuter(left);
+    setInner(right);
+  }
+
+  public JoinType getJoinType() {
+    return this.joinType;
+  }
+
+  public void setJoinType(JoinType joinType) {
+    this.joinType = joinType;
+  }
+
+  public void setJoinQual(EvalNode joinQual) {
+    this.joinQual = joinQual;
+  }
+
+  /** @return true when a join qualifier has been set */
+  public boolean hasJoinQual() {
+    return this.joinQual != null;
+  }
+
+  public EvalNode getJoinQual() {
+    return this.joinQual;
+  }
+
+  /** @return true when a target list has been set */
+  public boolean hasTargetList() {
+    return this.targets != null;
+  }
+
+  public QueryBlock.Target[] getTargets() {
+    return this.targets;
+  }
+
+  public void setTargetList(QueryBlock.Target[] targets) {
+    this.targets = targets;
+  }
+
+  /**
+   * Two joins are equal when the inherited comparison succeeds and the join
+   * type, qualifier, targets and both children match. Unset (null) members are
+   * only equal to unset members, so a join built with the two-argument
+   * constructor can be compared without a NullPointerException.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof JoinNode)) {
+      return false;
+    }
+    JoinNode other = (JoinNode) obj;
+    return super.equals(other)
+        && this.joinType == other.joinType
+        && nullSafeEquals(this.joinQual, other.joinQual)
+        && java.util.Arrays.equals(this.targets, other.targets)
+        && nullSafeEquals(outer, other.outer)
+        && nullSafeEquals(inner, other.inner);
+  }
+
+  /** Equality check that treats two nulls as equal. */
+  private static boolean nullSafeEquals(Object left, Object right) {
+    return left == null ? right == null : left.equals(right);
+  }
+
+  /**
+   * Copies this node. The qualifier is cloned and the target array is copied
+   * so that replacing an element in the copy does not affect the original;
+   * the target elements themselves are shared.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    JoinNode join = (JoinNode) super.clone();
+    join.joinType = this.joinType;
+    join.joinQual = this.joinQual == null ? null : (EvalNode) this.joinQual.clone();
+    join.targets = this.targets == null ? null : this.targets.clone();
+    return join;
+  }
+
+  @Override
+  public String toString() {
+    // Children are concatenated rather than dereferenced so that a join
+    // without an outer or inner child prints "null" instead of throwing.
+    return "\"Join\": \"joinType\": \"" + joinType + "\""
+        + (joinQual != null ? ", \"qual\": " + joinQual : "")
+        + "\n\"out schema: " + getOutSchema()
+        + "\n\"in schema: " + getInSchema()
+        + "\n" + getOuterNode() + " and " + getInnerNode();
+  }
+
+  /** @return this node serialized with the shared Gson instance */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
new file mode 100644
index 0000000..966e4a8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock.LimitClause;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * Unary logical node that wraps a {@link LimitClause}. The no-arg constructor
+ * leaves the clause unset, so every method except {@link #getFetchFirstNum()}
+ * tolerates a null clause.
+ */
+public final class LimitNode extends UnaryNode implements Cloneable {
+  @Expose
+  private LimitClause limitClause;
+
+  /** Creates a node with neither a type nor a limit clause. */
+  public LimitNode() {
+    super();
+  }
+
+  /**
+   * Creates a node of type {@link ExprType#LIMIT}.
+   *
+   * @param limitClause the clause this node wraps
+   */
+  public LimitNode(LimitClause limitClause) {
+    super(ExprType.LIMIT);
+    this.limitClause = limitClause;
+  }
+
+  /**
+   * @return the value of {@code getLimitRow()} of the wrapped clause
+   * @throws NullPointerException if no limit clause has been set
+   */
+  public long getFetchFirstNum() {
+    return this.limitClause.getLimitRow();
+  }
+
+  /**
+   * Two limit nodes are equal when the inherited comparison succeeds and both
+   * the clauses and the sub nodes match; unset members only match unset ones.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof LimitNode)) {
+      return false;
+    }
+    LimitNode other = (LimitNode) obj;
+    return super.equals(other)
+        && TUtil.checkEquals(limitClause, other.limitClause)
+        && (subExpr == null ? other.subExpr == null : subExpr.equals(other.subExpr));
+  }
+
+  /** Copies this node and clones the limit clause when one is set. */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    LimitNode newLimitNode = (LimitNode) super.clone();
+    newLimitNode.limitClause =
+        limitClause == null ? null : (LimitClause) limitClause.clone();
+    return newLimitNode;
+  }
+
+  @Override
+  public String toString() {
+    // append(Object) prints "null" for an unset clause or sub node instead of
+    // throwing; the output for fully initialised nodes is unchanged.
+    StringBuilder sb = new StringBuilder();
+    sb.append(limitClause);
+    sb.append("\n\"out schema: ").append(getOutSchema());
+    sb.append("\n\"in schema: ").append(getInSchema());
+    sb.append("\n").append(getSubNode());
+    return sb.toString();
+  }
+
+  /** @return this node serialized with the shared Gson instance */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
new file mode 100644
index 0000000..c206753
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+
+/**
+ * Base class of the logical plan nodes in this package. It stores the node
+ * type, the input and output schemas and a cost value, and defines the JSON
+ * and visitor entry points subclasses must implement.
+ */
+public abstract class LogicalNode implements Cloneable {
+  @Expose
+  private ExprType type;
+  @Expose
+  private Schema inputSchema;
+  @Expose
+  private Schema outputSchema;
+
+  @Expose
+  private double cost = 0;
+
+  /** Creates a node whose type and schemas are unset. */
+  public LogicalNode() {
+
+  }
+
+  /**
+   * Creates a node of the given type with unset schemas.
+   *
+   * @param type the node type
+   */
+  public LogicalNode(ExprType type) {
+    this.type = type;
+  }
+
+  public ExprType getType() {
+    return this.type;
+  }
+
+  public void setType(ExprType type) {
+    this.type = type;
+  }
+
+  public double getCost() {
+    return this.cost;
+  }
+
+  public void setCost(double cost) {
+    this.cost = cost;
+  }
+
+  public void setInSchema(Schema inSchema) {
+    this.inputSchema = inSchema;
+  }
+
+  public Schema getInSchema() {
+    return this.inputSchema;
+  }
+
+  public void setOutSchema(Schema outSchema) {
+    this.outputSchema = outSchema;
+  }
+
+  public Schema getOutSchema() {
+    return this.outputSchema;
+  }
+
+  /**
+   * Two nodes are equal when their type, both schemas and their cost match.
+   * A schema that has not been set is only equal to another unset schema;
+   * comparing such nodes used to throw a NullPointerException.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof LogicalNode)) {
+      return false;
+    }
+    LogicalNode other = (LogicalNode) obj;
+    return this.type == other.type
+        && sameSchema(this.inputSchema, other.inputSchema)
+        && sameSchema(this.outputSchema, other.outputSchema)
+        && this.cost == other.cost;
+  }
+
+  /** Schema comparison that treats two nulls as equal. */
+  private static boolean sameSchema(Schema left, Schema right) {
+    return left == null ? right == null : left.equals(right);
+  }
+
+  /** Copies this node; schemas that are set are cloned, unset ones stay null. */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    LogicalNode node = (LogicalNode) super.clone();
+    node.type = type;
+    node.inputSchema =
+        (Schema) (inputSchema != null ? inputSchema.clone() : null);
+    node.outputSchema =
+        (Schema) (outputSchema != null ? outputSchema.clone() : null);
+
+    return node;
+  }
+
+  /** @return a JSON representation of this node */
+  public abstract String toJSON();
+
+  // NOTE(review): judging by the names these walk the plan in pre- and
+  // post-order; the traversal itself lives in the subclasses - confirm there.
+  public abstract void preOrder(LogicalNodeVisitor visitor);
+  public abstract void postOrder(LogicalNodeVisitor visitor);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNodeVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNodeVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNodeVisitor.java
new file mode 100644
index 0000000..5b0c1c2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNodeVisitor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+
+/**
+ * Callback accepted by {@code LogicalNode.preOrder} and
+ * {@code LogicalNode.postOrder}.
+ */
+public interface LogicalNodeVisitor {
+  /**
+   * Handles a single node.
+   *
+   * @param node the node handed to the visitor
+   */
+  void visit(LogicalNode node);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
new file mode 100644
index 0000000..154b931
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.json.GsonCreator;
+
+/** Unary logical node of type {@link ExprType#ROOT}. */
+public class LogicalRootNode extends UnaryNode implements Cloneable {
+  /** Creates a root node without a sub node. */
+  public LogicalRootNode() {
+    super(ExprType.ROOT);
+  }
+
+  @Override
+  public String toString() {
+    // Concatenation prints "null" for a root that has no sub node yet
+    // instead of throwing a NullPointerException.
+    return "Logical Plan Root\n\n" + getSubNode();
+  }
+
+  /** @return this node serialized with the shared Gson instance */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+
+  /**
+   * Two roots are equal when the inherited comparison succeeds and their sub
+   * nodes match; a missing sub node only matches another missing sub node.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof LogicalRootNode)) {
+      return false;
+    }
+    LogicalRootNode other = (LogicalRootNode) obj;
+    return super.equals(other)
+        && (subExpr == null ? other.subExpr == null : subExpr.equals(other.subExpr));
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
new file mode 100644
index 0000000..2c948f3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock.Target;
+
+import java.util.Arrays;
+
+/**
+ * Unary logical node of type {@link ExprType#PROJECTION} that holds the
+ * projected targets and a distinct flag.
+ */
+public class ProjectionNode extends UnaryNode {
+  /**
+   * the targets are always filled even if the query is 'select *'
+   */
+  @Expose private Target [] targets;
+  @Expose private boolean distinct = false;
+
+  /**
+   * This method is for gson.
+   */
+  private ProjectionNode() {
+    super();
+  }
+
+  /**
+   * Creates a projection over the given targets.
+   *
+   * @param targets the projected targets
+   */
+  public ProjectionNode(Target [] targets) {
+    super(ExprType.PROJECTION);
+    this.targets = targets;
+  }
+
+  public Target [] getTargets() {
+    return this.targets;
+  }
+
+  public void setTargetList(Target [] targets) {
+    this.targets = targets;
+  }
+
+  // NOTE(review): this only delegates to the parent; presumably kept to expose
+  // the setter publicly - confirm against UnaryNode before removing.
+  public void setSubNode(LogicalNode subNode) {
+    super.setSubNode(subNode);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"Projection\": {");
+    if (distinct) {
+      sb.append("\"distinct\": true, ");
+    }
+    sb.append("\"targets\": [");
+
+    for (int i = 0; i < targets.length; i++) {
+      sb.append("\"").append(targets[i]).append("\"");
+      if (i < targets.length - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append("],");
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",");
+    sb.append("\n  \"in schema\": ").append(getInSchema());
+    sb.append("}");
+    // append(Object) prints "null" when no sub node is set instead of throwing.
+    sb.append("\n").append(getSubNode());
+    return sb.toString();
+  }
+
+  /**
+   * Two projections are equal when the inherited comparison succeeds and both
+   * the targets and the sub nodes match; a missing sub node only matches
+   * another missing sub node.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ProjectionNode)) {
+      return false;
+    }
+    ProjectionNode other = (ProjectionNode) obj;
+    return super.equals(other)
+        && Arrays.equals(targets, other.targets)
+        && (subExpr == null ? other.subExpr == null : subExpr.equals(other.subExpr));
+  }
+
+  /**
+   * Copies this node. The target array is copied (its elements are shared);
+   * a node created by the gson constructor, whose targets are still null,
+   * can be cloned as well.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    ProjectionNode projNode = (ProjectionNode) super.clone();
+    projNode.targets = targets == null ? null : targets.clone();
+
+    return projNode;
+  }
+
+  /** @return this node serialized with the shared Gson instance */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
new file mode 100644
index 0000000..64fa63a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.engine.parser.QueryBlock.FromTable;
+import org.apache.tajo.engine.parser.QueryBlock.Target;
+import org.apache.tajo.util.TUtil;
+
+public class ScanNode extends LogicalNode {
+	@Expose private FromTable table;
+	@Expose private EvalNode qual;
+	@Expose private QueryBlock.Target[] targets;
+	@Expose private boolean local;
+  @Expose private boolean broadcast;
+	
+	public ScanNode() {
+		super();
+		local = false;
+	}
+  
+	public ScanNode(FromTable table) {
+		super(ExprType.SCAN);
+		this.table = table;
+		this.setInSchema(table.getSchema());
+		this.setOutSchema(table.getSchema());
+		local = false;
+	}
+	
+	public String getTableId() {
+	  return table.getTableName();
+	}
+	
+	public boolean hasAlias() {
+	  return table.hasAlias();
+	}
+	
+	public boolean hasQual() {
+	  return qual != null;
+	}
+	
+	public EvalNode getQual() {
+	  return this.qual;
+	}
+	
+	public boolean isLocal() {
+	  return this.local;
+	}
+	
+	public void setLocal(boolean local) {
+	  this.local = local;
+	}
+	
+	public void setQual(EvalNode evalTree) {
+	  this.qual = evalTree;
+	}
+	
+	public boolean hasTargetList() {
+	  return this.targets != null;
+	}
+	
+	public void setTargets(Target [] targets) {
+	  this.targets = targets;
+	}
+	
+	public Target [] getTargets() {
+	  return this.targets;
+	}
+
+  public boolean isBroadcast() {
+    return broadcast;
+  }
+
+  public void setBroadcast() {
+    broadcast = true;
+  }
+	
+	public FromTable getFromTable() {
+	  return this.table;
+	}
+	
+	public void setFromTable(FromTable from) {
+	  this.table = from;
+	}
+	
+	public String toString() {
+	  StringBuilder sb = new StringBuilder(); // builds a JSON-like (not strict JSON) plan fragment
+	  sb.append("\"Scan\" : {\"table\":\"")
+	  .append(table.getTableName()).append("\"");
+	  if (hasAlias()) {
+	    sb.append(",\"alias\": \"").append(table.getAlias()).append("\""); // close the quoted value
+	  }
+
+    if (isBroadcast()) {
+      sb.append(",\"broadcast\": true"); // bare boolean, no trailing quote
+    }
+	  
+	  if (hasQual()) {
+	    sb.append(", \"qual\": \"").append(this.qual).append("\"");
+	  }
+	  
+	  if (hasTargetList()) {
+	    sb.append(", \"target list\": ");
+      boolean first = true;
+      for (Target target : targets) {
+        if (!first) {
+          sb.append(", ");
+        }
+        sb.append(target);
+        first = false;
+      }
+	  }
+	  // The fragment is left unterminated: no closing brace is appended here.
+	  sb.append(",");
+	  sb.append("\n  \"out schema\": ").append(getOutSchema());
+	  sb.append("\n  \"in schema\": ").append(getInSchema());
+	  return sb.toString();
+	}
+	
+	public String toJSON() {
+	  return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+	}
+
+  @Override
+  public int hashCode() { // targets is hashed by content: an array's own hashCode() is identity-based
+    return Objects.hashCode(this.table, this.qual, java.util.Arrays.hashCode(this.targets));
+  }
+	
+	@Override
+	public boolean equals(Object obj) {
+	  if (obj instanceof ScanNode) {
+	    ScanNode other = (ScanNode) obj;
+	    
+	    boolean eq = super.equals(other); 
+	    eq = eq && TUtil.checkEquals(this.table, other.table);
+	    eq = eq && TUtil.checkEquals(this.qual, other.qual);
+	    eq = eq && TUtil.checkEquals(this.targets, other.targets);
+	    
+	    return eq;
+	  }	  
+	  
+	  return false;
+	}	
+	
+	@Override
+	public Object clone() throws CloneNotSupportedException {
+	  ScanNode scanNode = (ScanNode) super.clone();
+	  
+	  scanNode.table = (FromTable) this.table.clone();
+	  
+	  if (hasQual()) {
+	    scanNode.qual = (EvalNode) this.qual.clone();
+	  }
+	  
+	  if (hasTargetList()) {
+	    scanNode.targets = new Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        scanNode.targets[i] = (Target) targets[i].clone();
+      }
+	  }
+	  
+	  return scanNode;
+	}
+	
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+	
+	public void postOrder(LogicalNodeVisitor visitor) {        
+    visitor.visit(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
new file mode 100644
index 0000000..b822d8a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.GsonCreator;
+
+public class SelectionNode extends UnaryNode implements Cloneable {
+
+	@Expose
+	private EvalNode qual;
+	
+	public SelectionNode() {
+		super();
+	}
+	
+	public SelectionNode(EvalNode qual) {
+		super(ExprType.SELECTION);
+		setQual(qual);
+	}
+
+	public EvalNode getQual() {
+		return this.qual;
+	}
+
+	public void setQual(EvalNode qual) {
+		this.qual = qual;
+	}
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"Selection\": {\"qual\": \"").append(qual.toString()).append("\",");
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",");
+    sb.append("\n  \"in schema\": ").append(getInSchema()).append("}");
+    
+    return sb.toString()+"\n"
+    + getSubNode().toString();
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof SelectionNode) {
+      SelectionNode other = (SelectionNode) obj;
+      return super.equals(other) 
+          && this.qual.equals(other.qual)
+          && subExpr.equals(other.subExpr);
+    } else {
+      return false;
+    }
+  }
+  
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SelectionNode selNode = (SelectionNode) super.clone();
+    // qual is unset after the no-arg constructor; skip the deep copy instead of throwing NPE.
+    selNode.qual = this.qual == null ? null : (EvalNode) this.qual.clone();
+    return selNode;
+  }
+  
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
new file mode 100644
index 0000000..947553a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.util.TUtil;
+
+public final class SortNode extends UnaryNode implements Cloneable {
+	@Expose
+  private SortSpec[] sortKeys;
+	
+	public SortNode() {
+		super();
+	}
+
+  public SortNode(SortSpec[] sortKeys) {
+    super(ExprType.SORT);
+    Preconditions.checkArgument(sortKeys.length > 0, 
+        "At least one sort key must be specified");
+    this.sortKeys = sortKeys;
+  }
+
+  public SortNode(SortSpec[] sortKeys, Schema inSchema, Schema outSchema) {
+    this(sortKeys);
+    this.setInSchema(inSchema);
+    this.setOutSchema(outSchema);
+  }
+  
+  public SortSpec[] getSortKeys() {
+    return this.sortKeys;
+  }
+  
+  @Override 
+  public boolean equals(Object obj) {
+    if (obj instanceof SortNode) {
+      SortNode other = (SortNode) obj;
+      return super.equals(other)
+          && TUtil.checkEquals(sortKeys, other.sortKeys)
+          && subExpr.equals(other.subExpr);
+    } else {
+      return false;
+    }
+  }
+  
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SortNode sort = (SortNode) super.clone();
+    sort.sortKeys = sortKeys.clone();
+    
+    return sort;
+  }
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Sort [key= ");
+    for (int i = 0; i < sortKeys.length; i++) {    
+      sb.append(sortKeys[i].getSortKey().getQualifiedName()).append(" ")
+          .append(sortKeys[i].isAscending() ? "asc" : "desc");
+      if(i < sortKeys.length - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append("]");
+
+    sb.append("\n\"out schema: " + getOutSchema()
+        + "\n\"in schema: " + getInSchema());
+    return sb.toString()+"\n"
+        + getSubNode().toString();
+  }
+
+  public String toJSON() {
+    subExpr.toJSON();
+    for (int i = 0; i < sortKeys.length; i++) {
+      sortKeys[i].toJSON();
+    }
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
new file mode 100644
index 0000000..3c44374
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+public class StoreIndexNode extends StoreTableNode {
+
+  public StoreIndexNode(String tableName) {
+    super(tableName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
new file mode 100644
index 0000000..c3d292f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.util.TUtil;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+
+public class StoreTableNode extends UnaryNode implements Cloneable {
+  @Expose private String tableName;
+  @Expose private StoreType storageType = StoreType.CSV;
+  @Expose private PartitionType partitionType;
+  @Expose private int numPartitions;
+  @Expose private Column [] partitionKeys;
+  @Expose private boolean local;
+  @Expose private Options options;
+
+  public StoreTableNode(String tableName) {
+    super(ExprType.STORE);
+    this.tableName = tableName;
+    this.local = false;
+  }
+
+  public final String getTableName() {
+    return this.tableName;
+  }
+
+  public final void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public void setStorageType(StoreType storageType) {
+    this.storageType = storageType;
+  }
+
+  public StoreType getStorageType() {
+    return this.storageType;
+  }
+
+  public final void setLocal(boolean local) {
+    this.local = local;
+  }
+
+  public final boolean isLocal() {
+    return this.local;
+  }
+    
+  public final int getNumPartitions() {
+    return this.numPartitions;
+  }
+  
+  public final boolean hasPartitionKey() {
+    return this.partitionKeys != null;
+  }
+  
+  public final Column [] getPartitionKeys() {
+    return this.partitionKeys;
+  }
+
+  public final void setListPartition() {
+    this.partitionType = PartitionType.LIST;
+    this.partitionKeys = null;
+    this.numPartitions = 0;
+  }
+  
+  public final void setPartitions(PartitionType type, Column [] keys, int numPartitions) { // validates, then records partition type, keys and count
+    Preconditions.checkArgument(keys.length >= 0, // NOTE(review): always true for a non-null array; the message suggests "> 0" was intended - confirm callers before tightening
+        "At least one partition key must be specified.");
+    Preconditions.checkArgument(numPartitions > 0,
+        "The number of partitions must be positive: %s", numPartitions);
+
+    this.partitionType = type;
+    this.partitionKeys = keys; // stored by reference, no defensive copy
+    this.numPartitions = numPartitions;
+  }
+
+  public PartitionType getPartitionType() {
+    return this.partitionType;
+  }
+
+  public boolean hasOptions() {
+    return this.options != null;
+  }
+
+  public void setOptions(Options options) {
+    this.options = options;
+  }
+
+  public Options getOptions() {
+    return this.options;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof StoreTableNode) {
+      StoreTableNode other = (StoreTableNode) obj;
+      return super.equals(other)
+          && this.tableName.equals(other.tableName)
+          && this.storageType.equals(other.storageType)
+          && this.numPartitions == other.numPartitions
+          && TUtil.checkEquals(partitionKeys, other.partitionKeys)
+          && TUtil.checkEquals(options, other.options)
+          && subExpr.equals(other.subExpr);
+    } else {
+      return false;
+    }
+  }
+  
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    StoreTableNode store = (StoreTableNode) super.clone();
+    store.tableName = tableName;
+    store.storageType = storageType != null ? storageType : null;
+    store.numPartitions = numPartitions;
+    store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
+    store.options = options != null ? (Options) options.clone() : null;
+    return store;
+  }
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"Store\": {\"table\": \""+tableName);
+    if (storageType != null) {
+      sb.append(", storage: "+ storageType.name());
+    }
+    sb.append(", partnum: ").append(numPartitions).append("}")
+    .append(", ");
+    if (partitionKeys != null) {
+      sb.append("\"partition keys: [");
+      for (int i = 0; i < partitionKeys.length; i++) {
+        sb.append(partitionKeys[i]);
+        if (i < partitionKeys.length - 1)
+          sb.append(",");
+      }
+      sb.append("],");
+    }
+    
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
+    .append("\n  \"in schema\": ").append(getInSchema())
+    .append("}");
+    
+    return sb.toString() + "\n"
+        + getSubNode().toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
new file mode 100644
index 0000000..b415442
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+
+
+public abstract class UnaryNode extends LogicalNode implements Cloneable {
+	@Expose
+	LogicalNode subExpr;
+	
+	public UnaryNode() {
+		super();
+	}
+	
+	/**
+	 * @param type
+	 */
+	public UnaryNode(ExprType type) {
+		super(type);
+	}
+	
+	public void setSubNode(LogicalNode subNode) {
+		this.subExpr = subNode;
+	}
+	
+	public LogicalNode getSubNode() {
+		return this.subExpr;
+	}
+	
+	@Override
+  public Object clone() throws CloneNotSupportedException {
+	  UnaryNode unary = (UnaryNode) super.clone();
+	  unary.subExpr = (LogicalNode) (subExpr == null ? null : subExpr.clone());
+	  
+	  return unary;
+	}
+	
+	public void preOrder(LogicalNodeVisitor visitor) {
+	  visitor.visit(this);
+	  subExpr.preOrder(visitor);
+  }
+	
+	public void postOrder(LogicalNodeVisitor visitor) {
+	  subExpr.postOrder(visitor);	  
+	  visitor.visit(this);
+	}
+
+  public String toJSON() {
+    subExpr.toJSON();
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
new file mode 100644
index 0000000..e336549
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.json.GsonCreator;
+
+public class UnionNode extends BinaryNode {
+
+  public UnionNode() {
+    super(ExprType.UNION);
+  }
+
+  public UnionNode(LogicalNode outer, LogicalNode inner) {
+    this();
+    setOuter(outer);
+    setInner(inner);
+  }
+
+  public String toString() {
+    return getOuterNode().toString() + "\n UNION \n" + getInnerNode().toString();
+  }
+
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/PipeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/PipeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/PipeType.java
new file mode 100644
index 0000000..849bd30
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/PipeType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.extended;
+
+public enum PipeType {
+  PULL,
+  PUSH
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/ReceiveNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/ReceiveNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/ReceiveNode.java
new file mode 100644
index 0000000..2a6112e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/ReceiveNode.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical.extended;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalNodeVisitor;
+
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+
+public final class ReceiveNode extends LogicalNode implements Cloneable {
+  @Expose private PipeType pipeType;
+  @Expose private RepartitionType repaType;
+  @Expose private Map<String, List<URI>> fetchMap;
+
+  private ReceiveNode() {
+    super(ExprType.RECEIVE);
+  }
+  public ReceiveNode(PipeType pipeType, RepartitionType shuffleType) {
+    this();
+    this.pipeType = pipeType;
+    this.repaType = shuffleType;
+    this.fetchMap = Maps.newHashMap();
+  }
+
+  public PipeType getPipeType() {
+    return this.pipeType;
+  }
+
+  public RepartitionType getRepartitionType() {
+    return this.repaType;
+  }
+  
+  public void addData(String name, URI uri) {
+    if (fetchMap.containsKey(name)) {
+      fetchMap.get(name).add(uri);
+    } else {
+      fetchMap.put(name, Lists.newArrayList(uri));
+    }
+  }
+  
+  public Collection<URI> getSrcURIs(String name) {
+    return Collections.unmodifiableList(fetchMap.get(name));
+  }
+
+  public Collection<Entry<String, List<URI>>> getAllDataSet() {
+    return Collections.unmodifiableSet(fetchMap.entrySet());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ReceiveNode) {
+      ReceiveNode other = (ReceiveNode) obj;
+      return pipeType == other.pipeType && repaType == other.repaType;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(pipeType, repaType);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    ReceiveNode receive = (ReceiveNode) super.clone();
+    receive.pipeType = pipeType;
+    receive.repaType = repaType;
+    receive.fetchMap = Maps.newHashMap();
+    // Both String and URI are immutable, but a list is mutable.
+    for (Entry<String, List<URI>> entry : fetchMap.entrySet()) {
+      receive.fetchMap
+          .put(entry.getKey(), new ArrayList<URI>(entry.getValue()));
+    }
+
+    return receive;
+  }
+
+  @Override
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    return gson.toJson(this);
+  }
+
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+  
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/RepartitionType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/RepartitionType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/RepartitionType.java
new file mode 100644
index 0000000..e46c55e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/RepartitionType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.extended;
+
+public enum RepartitionType {
+  NONE,
+  HASH,
+  SORT
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/SendNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/SendNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/SendNode.java
new file mode 100644
index 0000000..a994c6c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/extended/SendNode.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical.extended;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.UnaryNode;
+import org.apache.tajo.util.TUtil;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This logical node means that the worker sends intermediate data to 
+ * some destined one or more workers.
+ */
+public class SendNode extends UnaryNode {
+  @Expose private PipeType pipeType;
+  @Expose private RepartitionType repaType;
+  /** This will be used for pipeType == PUSH. Keyed by partition id. */
+  @Expose private Map<Integer, URI> destURIs;
+  /** Set only through {@link #setPartitionKeys(Column[], int)}; null until then. */
+  @Expose private Column[] partitionKeys;
+  @Expose private int numPartitions;
+
+  /** Leaves every field unset; the public constructor builds on top of it. */
+  private SendNode() {
+    super(ExprType.SEND);
+  }
+
+  /**
+   * Creates a send node with an empty destination map.
+   *
+   * @param pipeType the pipe type of this node
+   * @param repaType the repartition type of this node
+   */
+  public SendNode(PipeType pipeType, RepartitionType repaType) {
+    this();
+    this.pipeType = pipeType;
+    this.repaType = repaType;
+    this.destURIs = Maps.newHashMap();
+  }
+
+  public PipeType getPipeType() {
+    return this.pipeType;
+  }
+
+  public RepartitionType getRepartitionType() {
+    return this.repaType;
+  }
+
+  /**
+   * @param partition the partition id
+   * @return the URI registered for the partition, or null if there is none
+   */
+  public URI getDestURI(int partition) {
+    return this.destURIs.get(partition);
+  }
+
+  /**
+   * Sets the partition keys and the number of partitions.
+   *
+   * @param keys at least one partition key
+   * @param numPartitions a positive number of partitions
+   * @throws IllegalStateException if the repartition type is NONE
+   * @throws IllegalArgumentException if keys is empty or numPartitions is not positive
+   * @throws NullPointerException if keys is null
+   */
+  public void setPartitionKeys(Column [] keys, int numPartitions) {
+    Preconditions.checkState(repaType != RepartitionType.NONE,
+        "Hash or Sort repartition only requires the partition keys");
+    // Fail with an explicit message instead of an anonymous NPE on keys.length.
+    Preconditions.checkNotNull(keys, "The partition keys must not be null.");
+    Preconditions.checkArgument(keys.length > 0,
+        "At least one partition key must be specified.");
+    Preconditions.checkArgument(numPartitions > 0,
+        "The number of partitions must be positive: %s", numPartitions);
+    this.partitionKeys = keys;
+    this.numPartitions = numPartitions;
+  }
+
+  public boolean hasPartitionKeys() {
+    return this.partitionKeys != null;
+  }
+
+  public Column [] getPartitionKeys() {
+    return this.partitionKeys;
+  }
+
+  public int getPartitionsNum() {
+    return this.numPartitions;
+  }
+
+  public Iterator<Entry<Integer, URI>> getAllDestURIs() {
+    return this.destURIs.entrySet().iterator();
+  }
+
+  public void putDestURI(int partition, URI uri) {
+    this.destURIs.put(partition, uri);
+  }
+
+  /** Replaces the destination map; the given map is stored as is, not copied. */
+  public void setDestURIs(Map<Integer, URI> destURIs) {
+    this.destURIs = destURIs;
+  }
+
+  /**
+   * Compares pipe type, repartition type and destination URIs.
+   * NOTE(review): partitionKeys and numPartitions are not part of equality;
+   * confirm whether that is intended.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof SendNode) {
+      SendNode other = (SendNode) obj;
+      return pipeType == other.pipeType
+          && repaType == other.repaType
+          && TUtil.checkEquals(destURIs, other.destURIs);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(pipeType, repaType, destURIs);
+  }
+
+  /**
+   * Returns a copy that owns its destination map and its partition key array,
+   * so that changing one node does not affect the other.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SendNode send = (SendNode) super.clone();
+    send.pipeType = pipeType;
+    send.repaType = repaType;
+    // destURIs is null when the node was built by the private constructor or
+    // after setDestURIs(null); keep it null instead of failing on entrySet().
+    if (destURIs == null) {
+      send.destURIs = null;
+    } else {
+      send.destURIs = Maps.newHashMap();
+      send.destURIs.putAll(destURIs);
+    }
+    // The key array is mutable, so the copy must not share it.
+    send.partitionKeys = partitionKeys == null ? null : partitionKeys.clone();
+    send.numPartitions = numPartitions;
+
+    return send;
+  }
+
+  /** Renders this node as pretty-printed JSON. */
+  @Override
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    return gson.toJson(this);
+  }
+
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
new file mode 100644
index 0000000..c8bca03
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.tajo.engine.eval.EvalNode;
+
+public class Edge {
+  private String src;
+  private String target;
+  private EvalNode joinQual;
+
+  public Edge(String src, String target, EvalNode joinQual) {
+    this.src = src;
+    this.target = target;
+    this.joinQual = joinQual;
+  }
+
+  public String getSrc() {
+    return this.src;
+  }
+
+  public String getTarget() {
+    return this.target;
+  }
+
+  public EvalNode getJoinQual() {
+    return this.joinQual;
+  }
+
+  @Override
+  public String toString() {
+    return "(" + src + "=> " + target + ", " + joinQual + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
new file mode 100644
index 0000000..d76af7d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Join graph kept as adjacency lists keyed by table name. Every join is
+ * recorded twice, once as an edge in each direction.
+ */
+public class JoinTree {
+  private Map<String,List<Edge>> map
+      = Maps.newHashMap();
+
+  /**
+   * Records a join between the table of the first column referenced by the
+   * left expression and the table of the first column referenced by the
+   * right expression.
+   *
+   * @param node the join qualifier; each side must reference at least one
+   *             column, otherwise {@code get(0)} fails
+   */
+  public void addJoin(EvalNode node) {
+    List<Column> left = EvalTreeUtil.findAllColumnRefs(node.getLeftExpr());
+    List<Column> right = EvalTreeUtil.findAllColumnRefs(node.getRightExpr());
+
+    String ltbName = left.get(0).getTableName();
+    String rtbName = right.get(0).getTableName();
+
+    edgesOf(ltbName).add(new Edge(ltbName, rtbName, node));
+    edgesOf(rtbName).add(new Edge(rtbName, ltbName, node));
+  }
+
+  /** Returns the edge list of a table, registering an empty one if it is absent. */
+  private List<Edge> edgesOf(String tableName) {
+    List<Edge> edges = map.get(tableName);
+    if (edges == null) {
+      edges = Lists.newArrayList();
+      map.put(tableName, edges);
+    }
+    return edges;
+  }
+
+  /**
+   * @param tableName the table to look up
+   * @return the number of edges leaving the table; 0 if the table has no join
+   */
+  public int degree(String tableName) {
+    List<Edge> edges = this.map.get(tableName);
+    // An unknown table has no edges; report that instead of throwing an NPE.
+    return edges == null ? 0 : edges.size();
+  }
+
+  public Collection<String> getTables() {
+    return Collections.unmodifiableCollection(this.map.keySet());
+  }
+
+  /**
+   * @param tableName the table to look up
+   * @return a read-only view of the edges leaving the table; empty if the
+   *         table has no join
+   */
+  public Collection<Edge> getEdges(String tableName) {
+    List<Edge> edges = this.map.get(tableName);
+    if (edges == null) {
+      return Collections.<Edge>emptyList();
+    }
+    return Collections.unmodifiableCollection(edges);
+  }
+
+  /** Returns a read-only snapshot of all edges, both directions of every join. */
+  public Collection<Edge> getAllEdges() {
+    List<Edge> edges = Lists.newArrayList();
+    for (List<Edge> edgeList : map.values()) {
+      edges.addAll(edgeList);
+    }
+    return Collections.unmodifiableCollection(edges);
+  }
+
+  public int getTableNum() {
+    return this.map.size();
+  }
+
+  /** Returns the number of joins, i.e. half the number of stored edges. */
+  public int getJoinNum() {
+    int sum = 0;
+    for (List<Edge> edgeList : map.values()) {
+      sum += edgeList.size();
+    }
+
+    // Each join was stored as two directed edges.
+    return sum / 2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
new file mode 100644
index 0000000..6c90a40
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Sets;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Base class of the aggregation executors. The constructor derives, from the
+ * group-by plan, the ids of the grouping columns, the positions of the
+ * remaining targets and one evaluator (with its context) per target.
+ */
+public abstract class AggregationExec extends UnaryPhysicalExec {
+  protected GroupbyNode annotation;
+
+  @SuppressWarnings("unused")
+  protected final EvalNode havingQual;
+
+  protected Set<Column> nonNullGroupingFields;
+  protected int keylist [];
+  protected int measureList[];
+  protected final EvalNode evals [];
+  protected EvalContext evalContexts [];
+
+  public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
+                         PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.havingQual = plan.getHavingCondition();
+
+    final Column[] groupingColumns = plan.getGroupingColumns();
+    final QueryBlock.Target[] targets = plan.getTargets();
+
+    // keylist holds the input-schema id of every grouping column
+    nonNullGroupingFields = Sets.newHashSet();
+    keylist = new int[groupingColumns.length];
+    for (int keyIdx = 0; keyIdx < groupingColumns.length; keyIdx++) {
+      Column groupingColumn = groupingColumns[keyIdx];
+      keylist[keyIdx] = inSchema.getColumnId(groupingColumn.getQualifiedName());
+      nonNullGroupingFields.add(groupingColumn);
+    }
+
+    // measureList holds the positions of the targets that are not key fields
+    measureList = new int[targets.length - keylist.length];
+    if (measureList.length > 0) {
+      int measureCount = 0;
+      for (int targetIdx = 0; targetIdx < targets.length; targetIdx++) {
+        if (!isKeyField(targets[targetIdx])) {
+          measureList[measureCount] = targetIdx;
+          measureCount++;
+        }
+      }
+    }
+
+    // a plain field that is not a grouping column is evaluated as a null constant
+    evals = new EvalNode[targets.length];
+    evalContexts = new EvalContext[targets.length];
+    for (int targetIdx = 0; targetIdx < targets.length; targetIdx++) {
+      QueryBlock.Target target = targets[targetIdx];
+      boolean nonGroupingField = target.getEvalTree().getType() == EvalNode.Type.FIELD
+          && !nonNullGroupingFields.contains(target.getColumnSchema());
+      if (nonGroupingField) {
+        evals[targetIdx] = new ConstEval(DatumFactory.createNullDatum());
+      } else {
+        evals[targetIdx] = target.getEvalTree();
+      }
+      evalContexts[targetIdx] = evals[targetIdx].newContext();
+    }
+  }
+
+  /** Tells whether the target's column name matches the name of a grouping key column. */
+  private boolean isKeyField(QueryBlock.Target target) {
+    for (int key : keylist) {
+      if (target.getColumnSchema().getColumnName()
+          .equals(inSchema.getColumn(key).getColumnName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    nonNullGroupingFields.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
new file mode 100644
index 0000000..7747dcd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Block nested loop join. Up to TUPLE_SLOT_SIZE tuples of each child are
+ * buffered, and every buffered outer tuple is paired with every buffered inner
+ * tuple. When the inner child is exhausted it is rescanned and the next outer
+ * block is loaded.
+ */
+public class BNLJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private EvalNode joinQual;
+  private EvalContext qualCtx;
+
+  private final List<Tuple> outerTupleSlots;
+  private final List<Tuple> innerTupleSlots;
+  private Iterator<Tuple> outerIterator;
+  private Iterator<Tuple> innerIterator;
+
+  private boolean innerEnd;
+  private boolean outerEnd;
+
+  // temporal tuples and states for nested loop join
+  private FrameTuple frameTuple;
+  private Tuple outerTuple = null;
+  private Tuple outputTuple = null;
+  // look-ahead: first tuple of the next inner block, null if there is none
+  private Tuple innext = null;
+
+  private static final int TUPLE_SLOT_SIZE = 10000;
+
+  // projection
+  private final int[] targetIds;
+
+  public BNLJoinExec(final TaskAttemptContext context, final JoinNode join,
+                     final PhysicalExec outer, PhysicalExec inner) {
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+        SchemaUtil.merge(outer.getSchema(), inner.getSchema()), outer, inner);
+    this.joinQual = join.getJoinQual();
+    // next() accepts a missing join qualifier, so the constructor must too.
+    this.qualCtx = this.joinQual == null ? null : this.joinQual.newContext();
+    this.outerTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.innerTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.outerIterator = outerTupleSlots.iterator();
+    this.innerIterator = innerTupleSlots.iterator();
+    this.innerEnd = false;
+    this.outerEnd = false;
+
+    // for projection
+    targetIds = RowStoreUtil.getTargetIds(inSchema, outSchema);
+
+    // for join
+    frameTuple = new FrameTuple();
+    outputTuple = new VTuple(outSchema.getColumnNum());
+  }
+
+  /**
+   * Returns the next joined tuple, or null when no pair is left.
+   * The same output tuple instance is reused on every call.
+   */
+  @Override
+  public Tuple next() throws IOException {
+
+    if (outerTupleSlots.isEmpty()) {
+      if (fillSlots(outerChild, outerTupleSlots)) {
+        outerEnd = true;
+      }
+      if (outerTupleSlots.isEmpty()) {
+        // empty outer relation: nothing can be joined
+        return null;
+      }
+      outerIterator = outerTupleSlots.iterator();
+      outerTuple = outerIterator.next();
+    }
+
+    // The look-ahead is read only while a block is loaded. Reading it on
+    // every call would drop one inner tuple per call.
+    if (innerTupleSlots.isEmpty()) {
+      loadInnerBlock();
+    }
+
+    while (true) {
+      // Loop instead of a single check: an empty inner block must not reach
+      // innerIterator.next() below.
+      while (!innerIterator.hasNext()) {
+        if (outerIterator.hasNext()) { // if outertupleslot remains
+          outerTuple = outerIterator.next();
+          innerIterator = innerTupleSlots.iterator();
+        } else {
+          if (innerEnd) {
+            // the whole inner relation was paired with the current outer block
+            innerChild.rescan();
+            innerEnd = false;
+
+            if (outerEnd) {
+              return null;
+            }
+            outerTupleSlots.clear();
+            if (fillSlots(outerChild, outerTupleSlots)) {
+              outerEnd = true;
+            }
+            if (outerTupleSlots.isEmpty()) {
+              return null;
+            }
+          }
+          // restart the outer block against the next inner block
+          outerIterator = outerTupleSlots.iterator();
+          outerTuple = outerIterator.next();
+          loadInnerBlock();
+        }
+      }
+
+      frameTuple.set(outerTuple, innerIterator.next());
+      if (joinQual != null) {
+        joinQual.eval(qualCtx, inSchema, frameTuple);
+        if (joinQual.terminate(qualCtx).asBool()) {
+          RowStoreUtil.project(frameTuple, outputTuple, targetIds);
+          return outputTuple;
+        }
+      } else {
+        RowStoreUtil.project(frameTuple, outputTuple, targetIds);
+        return outputTuple;
+      }
+    }
+  }
+
+  /**
+   * Appends tuples of the child to the slots until they hold TUPLE_SLOT_SIZE
+   * tuples or the child is exhausted.
+   *
+   * @return true if the child returned null, i.e. it has no more tuples
+   */
+  private static boolean fillSlots(PhysicalExec child, List<Tuple> slots)
+      throws IOException {
+    while (slots.size() < TUPLE_SLOT_SIZE) {
+      Tuple t = child.next();
+      if (t == null) {
+        return true;
+      }
+      slots.add(t);
+    }
+    return false;
+  }
+
+  /**
+   * Replaces the inner block with the next one, starting with the look-ahead
+   * tuple if there is one, then reads a new look-ahead tuple and restarts the
+   * inner iterator.
+   */
+  private void loadInnerBlock() throws IOException {
+    innerTupleSlots.clear();
+    if (innext != null) {
+      innerTupleSlots.add(innext);
+      innext = null;
+    }
+    if (fillSlots(innerChild, innerTupleSlots)) {
+      innerEnd = true;
+    } else if ((innext = innerChild.next()) == null) {
+      innerEnd = true;
+    }
+    innerIterator = innerTupleSlots.iterator();
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    innerEnd = false;
+    // Forget the end of the previous scan; otherwise a rescanned outer child
+    // with more than one block would be cut off after its first block.
+    outerEnd = false;
+    innext = null;
+    innerTupleSlots.clear();
+    outerTupleSlots.clear();
+    innerIterator = innerTupleSlots.iterator();
+    outerIterator = outerTupleSlots.iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
new file mode 100644
index 0000000..e3b299d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+
+import java.io.IOException;
+
+/**
+ * Scan that looks up a key in a BST index and reads the tuples of the
+ * underlying file at the offsets the index reports.
+ */
+public class BSTIndexScanExec extends PhysicalExec {
+  private ScanNode scanNode;
+  private SeekableScanner fileScanner;
+
+  private EvalNode qual;
+  private EvalContext qualCtx;
+  private BSTIndex.BSTIndexReader reader;
+
+  private final Projector projector;
+  private EvalContext [] evalContexts;
+
+  // values of the key that is looked up in the index
+  private Datum[] datum = null;
+
+  // true until the first successful index lookup
+  private boolean initialize = true;
+
+  public BSTIndexScanExec(TaskAttemptContext context,
+                          StorageManager sm , ScanNode scanNode ,
+       Fragment fragment, Path fileName , Schema keySchema,
+       TupleComparator comparator , Datum[] datum) throws IOException {
+    super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+    this.scanNode = scanNode;
+    this.qual = scanNode.getQual();
+    if(this.qual == null) {
+      this.qualCtx = null;
+    } else {
+      this.qualCtx = this.qual.newContext();
+    }
+    this.datum = datum;
+
+    this.fileScanner = (SeekableScanner)StorageManager.getScanner(context.getConf(),
+        fragment.getMeta(), fragment, outSchema);
+    this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
+    this.evalContexts = projector.renew();
+
+    this.reader = new BSTIndex(sm.getFileSystem().getConf()).
+        getIndexReader(fileName, keySchema, comparator);
+    this.reader.open();
+  }
+
+  @Override
+  public void init() throws IOException {
+
+  }
+
+  /**
+   * Returns the next tuple found through the index, or null when there is
+   * none. The first call looks the key up; later calls advance the index
+   * reader. When the index reports -1 the reader and the scanner are closed.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if(initialize) {
+      //TODO : more complicated condition
+      Tuple key = new VTuple(datum.length);
+      key.put(datum);
+      long offset = reader.find(key);
+      if (offset == -1) {
+        reader.close();
+        fileScanner.close();
+        return null;
+      } else {
+        fileScanner.seek(offset);
+      }
+      initialize = false;
+    } else {
+      if(!reader.isCurInMemory()) {
+        return null;
+      }
+      long offset = reader.next();
+      if(offset == -1 ) {
+        reader.close();
+        fileScanner.close();
+        return null;
+      } else {
+        fileScanner.seek(offset);
+      }
+    }
+    Tuple tuple;
+    Tuple outTuple = new VTuple(this.outSchema.getColumnNum());
+    if (!scanNode.hasQual()) {
+      if ((tuple = fileScanner.next()) != null) {
+        projector.eval(evalContexts, tuple);
+        projector.terminate(evalContexts, outTuple);
+        return outTuple;
+      } else {
+        return null;
+      }
+    } else {
+      while( reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
+        qual.eval(qualCtx, inSchema, tuple);
+        if (qual.terminate(qualCtx).asBool()) {
+          projector.eval(evalContexts, tuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple;
+        } else {
+          // Check for the -1 end marker as the branches above do, instead of
+          // seeking to a negative offset.
+          long nextOffset = reader.next();
+          if (nextOffset == -1) {
+            reader.close();
+            fileScanner.close();
+            return null;
+          }
+          fileScanner.seek(nextOffset);
+        }
+      }
+    }
+
+    return null;
+  }
+
+  // NOTE(review): only the file scanner is reset; 'initialize' and the index
+  // reader keep their state -- confirm that this is intended.
+  @Override
+  public void rescan() throws IOException {
+    fileScanner.reset();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+    fileScanner.close();
+  }
+
+}


Mime
View raw message