tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [4/5] TAJO-255: Cleanup exceptions of engine. (hyunsik)
Date Wed, 16 Oct 2013 09:28:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/NoSuchQueryIdException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/NoSuchQueryIdException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/NoSuchQueryIdException.java
deleted file mode 100644
index 153164b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/NoSuchQueryIdException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.exception;
-
-public class NoSuchQueryIdException extends Exception {
-
-  private static final long serialVersionUID = -4425982532461186746L;
-
-  public NoSuchQueryIdException() {
-    
-  }
-  
-  public NoSuchQueryIdException(String message) {
-    super(message);
-  }
-  
-  public NoSuchQueryIdException(Exception e) {
-    super(e);
-  }
-  
-  public NoSuchQueryIdException(String message, Exception e) {
-    super(message, e);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java
new file mode 100644
index 0000000..409d6ed
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.exception;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+
+public class RangeOverflowException extends RuntimeException {
+  public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc) {
+    super("Overflow Error: tried to increase " + inc + " to " + overflowValue + ", but the range " + range);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UndefinedFunctionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UndefinedFunctionException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UndefinedFunctionException.java
new file mode 100644
index 0000000..36d9c5d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UndefinedFunctionException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.exception;
+
+
+public class UndefinedFunctionException extends InvalidQueryException {
+	private static final long serialVersionUID = 113593927391549716L;
+
+	/**
+	 * @param signature
+	 */
+	public UndefinedFunctionException(String signature) {
+		super("Error: call to undefined function "+signature+"()");	
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnfinishedTaskException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnfinishedTaskException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnfinishedTaskException.java
deleted file mode 100644
index 0f9c47f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnfinishedTaskException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.exception;
-
-public class UnfinishedTaskException extends Exception {
-
-  private static final long serialVersionUID = -3229141373378209229L;
-  
-  public UnfinishedTaskException() {
-    
-  }
-
-  public UnfinishedTaskException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnknownWorkerException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnknownWorkerException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnknownWorkerException.java
deleted file mode 100644
index c524b1d..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/UnknownWorkerException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.exception;
-
-public class UnknownWorkerException extends Exception {
-
-  /**
-   * 
-   */
-  private static final long serialVersionUID = -3677733092100608744L;
-  private String unknownName;
-
-  public UnknownWorkerException(String unknownName) {
-    this.unknownName = unknownName;
-  }
-
-  public UnknownWorkerException(String unknownName, Exception e) {
-    super(e);
-    this.unknownName = unknownName;
-  }
-
-  public String getUnknownName() {
-    return this.unknownName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/VerifyException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/VerifyException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/VerifyException.java
new file mode 100644
index 0000000..beda787
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/exception/VerifyException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.exception;
+
+import org.apache.tajo.engine.planner.PlanningException;
+
+public class VerifyException extends PlanningException {
+  public VerifyException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
index 95d5c2d..e3df22c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/HiveConverter.java
@@ -27,8 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.engine.query.exception.SQLParseError;
-import org.apache.tajo.engine.query.exception.SQLSyntaxError;
 
 import java.math.BigInteger;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 70d1975..0560e8b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -28,8 +28,6 @@ import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.Aggregation.GroupType;
 import org.apache.tajo.algebra.LiteralValue.LiteralType;
 import org.apache.tajo.engine.parser.SQLParser.*;
-import org.apache.tajo.engine.query.exception.SQLParseError;
-import org.apache.tajo.engine.query.exception.SQLSyntaxError;
 import org.apache.tajo.storage.CSVFile;
 
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLErrorListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLErrorListener.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLErrorListener.java
index 24ef3cb..1df5f7b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLErrorListener.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLErrorListener.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.parser;
 
 import org.antlr.v4.runtime.*;
 import org.apache.commons.lang.StringUtils;
-import org.apache.tajo.engine.query.exception.SQLParseError;
 
 public class SQLErrorListener extends BaseErrorListener {
   public void syntaxError(Recognizer<?, ?> recognizer,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLParseError.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLParseError.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLParseError.java
new file mode 100644
index 0000000..2106b53
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLParseError.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.parser;
+
+
+import org.antlr.v4.runtime.Token;
+import org.apache.commons.lang.StringUtils;
+
+public class SQLParseError extends RuntimeException {
+  private String header;
+  private String errorLine;
+  private int charPositionInLine;
+  private int line;
+  private Token offendingToken;
+  private String detailedMessage;
+
+  public SQLParseError(Token offendingToken,
+                       int line, int charPositionInLine,
+                       String msg,
+                       String errorLine) {
+    super(msg);
+    this.offendingToken = offendingToken;
+    this.charPositionInLine = charPositionInLine;
+    this.line = line;
+    this.errorLine = errorLine;
+    this.header = msg;
+  }
+
+  @Override
+  public String getMessage() {
+    if (detailedMessage == null) {
+      if (offendingToken != null) {
+        detailedMessage = getDetailedMessageWithLocation();
+      } else {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ERROR: ").append(header).append("\n");
+        sb.append("LINE: ").append(errorLine);
+        detailedMessage = sb.toString();
+      }
+    }
+
+    return detailedMessage;
+  }
+
+  public String getMessageHeader(){
+    return this.header;
+  }
+
+  private String getDetailedMessageWithLocation() {
+    StringBuilder sb = new StringBuilder();
+    int displayLimit = 80;
+    String queryPrefix = "LINE " + line + ":" + charPositionInLine + " ";
+    String prefixPadding = StringUtils.repeat(" ", queryPrefix.length());
+    String locationString;
+
+    int tokenLength = offendingToken.getStopIndex() - offendingToken.getStartIndex() + 1;
+    if(tokenLength > 0){
+      locationString = StringUtils.repeat(" ", charPositionInLine) + StringUtils.repeat("^", tokenLength);
+    } else {
+      locationString = StringUtils.repeat(" ", charPositionInLine) + "^";
+    }
+
+    sb.append("ERROR: ").append(header).append("\n");
+    sb.append(queryPrefix);
+
+    if (errorLine.length() > displayLimit) {
+      int padding = (displayLimit / 2);
+
+      String ellipsis = " ... ";
+      int startPos = locationString.length() - padding - 1;
+      if (startPos <= 0) {
+        startPos = 0;
+        sb.append(errorLine.substring(startPos, displayLimit)).append(ellipsis).append("\n");
+        sb.append(prefixPadding).append(locationString);
+      } else if (errorLine.length() - (locationString.length() + padding) <= 0) {
+        startPos = errorLine.length() - displayLimit - 1;
+        sb.append(ellipsis).append(errorLine.substring(startPos)).append("\n");
+        sb.append(prefixPadding).append(StringUtils.repeat(" ", ellipsis.length()))
+            .append(locationString.substring(startPos));
+      } else {
+        sb.append(ellipsis).append(errorLine.substring(startPos, startPos + displayLimit)).append(ellipsis).append("\n");
+        sb.append(prefixPadding).append(StringUtils.repeat(" ", ellipsis.length()))
+            .append(locationString.substring(startPos));
+      }
+    } else {
+      sb.append(errorLine).append("\n");
+      sb.append(prefixPadding).append(locationString);
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLSyntaxError.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLSyntaxError.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLSyntaxError.java
new file mode 100644
index 0000000..d565509
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLSyntaxError.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.parser;
+
+
+import org.apache.tajo.engine.exception.InvalidQueryException;
+
+public class SQLSyntaxError extends InvalidQueryException {
+  private static final long serialVersionUID = 5388279335175632066L;
+
+  private String errorMessage;
+  private String detailedMessage;
+  private SQLParseError parseError;
+
+  public SQLSyntaxError(String errorMessage) {
+    this.errorMessage = errorMessage;
+  }
+
+  public SQLSyntaxError(SQLParseError e) {
+    this.errorMessage = e.getMessageHeader();
+    this.parseError = e;
+  }
+
+  @Override
+  public String getMessage() {
+    if (detailedMessage == null) {
+      if (parseError != null) {
+        detailedMessage = parseError.getMessage();
+      } else {
+        detailedMessage = String.format("ERROR: %s\n", errorMessage);
+      }
+    }
+    return detailedMessage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 395a9e2..df6081d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -25,6 +25,8 @@ import org.apache.tajo.annotation.NotThreadSafe;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.exception.NoSuchColumnException;
+import org.apache.tajo.engine.exception.VerifyException;
 import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 import org.apache.tajo.engine.planner.logical.*;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 74b6da3..e41605b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -37,12 +37,13 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.exception.VerifyException;
 import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.query.exception.InvalidQueryException;
-import org.apache.tajo.engine.query.exception.UndefinedFunctionException;
+import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.engine.exception.UndefinedFunctionException;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.exception.InternalException;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java
deleted file mode 100644
index 56ae230..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-public class NoSuchColumnException extends VerifyException {
-  public NoSuchColumnException(String columnName) {
-    super("ERROR: no such column '" + columnName + "'");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
index e8bdbc5..ebe47b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
@@ -21,7 +21,7 @@
  */
 package org.apache.tajo.engine.planner;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.exception.InternalException;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index a64eecf..a8aaa68 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -26,8 +26,8 @@ import com.google.common.collect.ObjectArrays;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.DataChannel;
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index e224328..cca523e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -30,7 +30,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.query.exception.InvalidQueryException;
+import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.storage.TupleComparator;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
deleted file mode 100644
index c18d403..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleRange;
-
-public class RangeOverflowException extends RuntimeException {
-  public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc) {
-    super("Overflow Error: tried to increase " + inc + " to " + overflowValue + ", but the range " + range);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index b52be45..4f18c95 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.exception.RangeOverflowException;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerifyException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerifyException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerifyException.java
deleted file mode 100644
index f8427d8..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/VerifyException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-public class VerifyException extends PlanningException {
-  public VerifyException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
new file mode 100644
index 0000000..0401718
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+public class DataChannel { // Link from a source execution block to a target block, with its transfer settings.
+  private ExecutionBlockId srcId;
+  private ExecutionBlockId targetId;
+  private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
+  private PartitionType partitionType; // stays null until a constructor or setter provides it
+  private Integer partitionNum = 1;
+  private Column[] key; // partition key columns; null means that no key has been set
+
+  private Schema schema; // held as a clone, see setSchema()
+
+  private StoreType storeType = StoreType.CSV; // not read from or written to DataChannelProto by this class
+  /** Creates a channel between two blocks; partition type, key and schema are left unset. */
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
+    this.srcId = srcId;
+    this.targetId = targetId;
+  }
+  /** Creates a channel between two blocks with the given partition type. */
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
+    this(srcId, targetId);
+    this.partitionType = partitionType;
+  }
+  /** Takes the ids from the given blocks and copies the output schema of the source block's plan. */
+  public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
+    this(src.getId(), target.getId(), partitionType, partNum);
+    setSchema(src.getPlan().getOutSchema());
+  }
+  /** Creates a channel with the given partition type and number of partitions. */
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
+    this(srcId, targetId, partitionType);
+    this.partitionNum = partNum;
+  }
+  /** Restores a channel from its protocol buffer form; schema and partition number are optional there. */
+  public DataChannel(DataChannelProto proto) {
+    this.srcId = new ExecutionBlockId(proto.getSrcId());
+    this.targetId = new ExecutionBlockId(proto.getTargetId());
+    this.transmitType = proto.getTransmitType();
+    this.partitionType = proto.getPartitionType();
+    if (proto.hasSchema()) {
+      this.setSchema(new Schema(proto.getSchema()));
+    }
+    if (proto.getPartitionKeyCount() > 0) {
+      key = new Column[proto.getPartitionKeyCount()];
+      for (int i = 0; i < proto.getPartitionKeyCount(); i++) {
+        key[i] = new Column(proto.getPartitionKey(i));
+      }
+    } else { // no keys in the proto: an empty array is stored, not null
+      key = new Column[] {};
+    }
+    if (proto.hasPartitionNum()) {
+      this.partitionNum = proto.getPartitionNum();
+    }
+  }
+  /** Returns the id of the source execution block. */
+  public ExecutionBlockId getSrcId() {
+    return srcId;
+  }
+  /** Returns the id of the target execution block. */
+  public ExecutionBlockId getTargetId() {
+    return targetId;
+  }
+  /** Returns the partition type, or null if none has been set. */
+  public PartitionType getPartitionType() {
+    return partitionType;
+  }
+  /** Returns the transmit type; PULL_TRANSMIT unless changed. */
+  public TransmitType getTransmitType() {
+    return this.transmitType;
+  }
+  /** Replaces the transmit type. */
+  public void setTransmitType(TransmitType transmitType) {
+    this.transmitType = transmitType;
+  }
+  /** Sets the partition type, the key columns (non-null, may be empty) and the positive partition count. */
+  public void setPartition(PartitionType partitionType, Column [] keys, int numPartitions) {
+    Preconditions.checkNotNull(keys, "The partition keys must not be null.");
+    Preconditions.checkArgument(numPartitions > 0, "The number of partitions must be positive: %s", numPartitions);
+    // Empty key arrays stay legal: the former "keys.length >= 0" test could never fail for a non-null array.
+    this.partitionType = partitionType;
+    this.key = keys;
+    this.partitionNum = numPartitions;
+  }
+  /** Replaces only the partition type. */
+  public void setPartitionType(PartitionType partitionType) {
+    this.partitionType = partitionType;
+  }
+  /** True once a key array is present, even an empty one (as produced by the proto constructor). */
+  public boolean hasPartitionKey() {
+    return key != null;
+  }
+  /** Replaces the partition key columns; the array is stored as given, without copying or validation. */
+  public void setPartitionKey(Column [] key) {
+    this.key = key;
+  }
+  /** Returns the stored key array itself (not a copy), or null if no key has been set. */
+  public Column [] getPartitionKey() {
+    return this.key;
+  }
+  /** Replaces the number of partitions; no range check is applied here. */
+  public void setPartitionNum(int partNum) {
+    this.partitionNum = partNum;
+  }
+  /** Returns the number of partitions; 1 unless changed. */
+  public int getPartitionNum() {
+    return partitionNum;
+  }
+  /** True unless the store type has been set to null. */
+  public boolean hasStoreType() {
+    return this.storeType != null;
+  }
+  /** Replaces the store type. */
+  public void setStoreType(StoreType storeType) {
+    this.storeType = storeType;
+  }
+  /** Returns the store type; CSV unless changed. */
+  public StoreType getStoreType() {
+    return storeType;
+  }
+  /** Builds the protocol buffer form; schema, keys, transmit type and partition number are added when present. */
+  public DataChannelProto getProto() {
+    DataChannelProto.Builder builder = DataChannelProto.newBuilder();
+    builder.setSrcId(srcId.getProto());
+    builder.setTargetId(targetId.getProto());
+    if (transmitType != null) {
+      builder.setTransmitType(transmitType);
+    }
+    builder.setPartitionType(partitionType); // NOTE(review): null after the two-id constructor; confirm callers set it
+    if (schema != null) {
+      builder.setSchema(schema.getProto());
+    }
+    if (key != null) {
+      for (Column column : key) {
+        builder.addPartitionKey(column.getProto());
+      }
+    }
+    if (partitionNum != null) {
+      builder.setPartitionNum(partitionNum);
+    }
+    return builder.build();
+  }
+  /** Stores a clone of the given schema, so later changes to the argument do not affect this channel. */
+  public void setSchema(Schema schema) {
+    this.schema = (Schema) schema.clone();
+  }
+  /** Returns the stored schema, or null if none has been set. */
+  public Schema getSchema() {
+    return schema;
+  }
+  /** Renders "[queryId] src => target (type=...)", plus key names and partition count when a key is present. */
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[").append(srcId.getQueryId()).append("] ");
+    sb.append(srcId.getId()).append(" => ").append(targetId.getId());
+    sb.append(" (type=").append(partitionType);
+    if (hasPartitionKey()) {
+      sb.append(", key=");
+      boolean first = true;
+      for (Column column : getPartitionKey()) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(",");
+        }
+        sb.append(column.getColumnName());
+      }
+      sb.append(", num=").append(partitionNum);
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
new file mode 100644
index 0000000..0dc393c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.*;
+
+import java.util.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+
+/**
+ * A distributed execution plan (DEP) is a directed acyclic graph (DAG) of ExecutionBlocks.
+ * An ExecutionBlock is the basic unit of execution and may be distributed across several nodes.
+ * It keeps input-side information (scan nodes, child blocks), output-side information
+ * (partition type), the logical plan to be executed in each node, an {@link Enforcer}
+ * and the names of the tables registered as broadcast tables.
+ */
+public class ExecutionBlock {
+  private ExecutionBlockId executionBlockId;
+  private LogicalNode plan;
+  private StoreTableNode store;
+  private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+  private ExecutionBlock parent;
+  private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
+  private PartitionType outputType;
+  private Enforcer enforcer = new Enforcer();
+  // NOTE(review): nothing in this class assigns store, parent or outputType, or fills childSubQueries.
+  private boolean hasJoinPlan;
+  private boolean hasUnionPlan;
+
+  private Set<String> broadcasted = new HashSet<String>();
+  /** Creates a block with the given id; a plan is attached later through setPlan(). */
+  public ExecutionBlock(ExecutionBlockId executionBlockId) {
+    this.executionBlockId = executionBlockId;
+  }
+  /** Returns the id given at construction time. */
+  public ExecutionBlockId getId() {
+    return executionBlockId;
+  }
+  /** Returns the output partition type held by this block. */
+  public PartitionType getPartitionType() {
+    return outputType;
+  }
+  /** Replaces the plan and recomputes the scan list and the join/union flags by walking the plan tree. */
+  public void setPlan(LogicalNode plan) {
+    this.plan = plan;
+    this.hasJoinPlan = false;
+    this.hasUnionPlan = false;
+    this.scanlist.clear();
+
+    // Depth-first walk with an explicit stack; the tail of the list is the top of the stack.
+    List<LogicalNode> pending = new ArrayList<LogicalNode>();
+    pending.add(plan);
+    while (!pending.isEmpty()) {
+      LogicalNode current = pending.remove(pending.size() - 1);
+      if (current instanceof UnaryNode) {
+        pending.add(((UnaryNode) current).getChild());
+      } else if (current instanceof BinaryNode) {
+        BinaryNode binary = (BinaryNode) current;
+        NodeType type = binary.getType();
+        if (type == NodeType.JOIN) {
+          hasJoinPlan = true;
+        } else if (type == NodeType.UNION) {
+          hasUnionPlan = true;
+        }
+        // Left is pushed first, so the right subtree is popped and visited before the left one.
+        pending.add(binary.getLeftChild());
+        pending.add(binary.getRightChild());
+      } else if (current instanceof ScanNode) {
+        scanlist.add((ScanNode) current);
+      } else if (current instanceof TableSubQueryNode) {
+        pending.add(((TableSubQueryNode) current).getSubQuery());
+      }
+    }
+  }
+
+  /** Returns the plan passed to the last setPlan() call, or null if there was none. */
+  public LogicalNode getPlan() {
+    return plan;
+  }
+  /** Returns the enforcer created together with this block. */
+  public Enforcer getEnforcer() {
+    return enforcer;
+  }
+  /** True if this block has no parent, or if its parent has no parent itself and contains a union. */
+  public boolean isRoot() {
+    return !hasParentBlock() || (!getParentBlock().hasParentBlock() && getParentBlock().hasUnion());
+  }
+  /** True if a parent block is recorded. */
+  public boolean hasParentBlock() {
+    return parent != null;
+  }
+  /** Returns the recorded parent block, or null. */
+  public ExecutionBlock getParentBlock() {
+    return parent;
+  }
+  /** Returns a read-only view of the child blocks. */
+  public Collection<ExecutionBlock> getChildBlocks() {
+    return Collections.unmodifiableCollection(childSubQueries.values());
+  }
+  /** True if no child block is recorded. */
+  public boolean isLeafBlock() {
+    return childSubQueries.isEmpty();
+  }
+  /** Returns the recorded store table node, or null. */
+  public StoreTableNode getStoreTableNode() {
+    return store;
+  }
+  /** Returns a new array with the scan nodes collected by the last setPlan() call. */
+  public ScanNode [] getScanNodes() {
+    return scanlist.toArray(new ScanNode[0]);
+  }
+  /** Returns the store node's output schema. NOTE(review): store is never assigned here, so this hits null. */
+  public Schema getOutputSchema() {
+    return store.getOutSchema();
+  }
+  /** True if the last setPlan() call found a binary node of type JOIN. */
+  public boolean hasJoin() {
+    return hasJoinPlan;
+  }
+  /** True if the last setPlan() call found a binary node of type UNION. */
+  public boolean hasUnion() {
+    return hasUnionPlan;
+  }
+  /** Registers all given table names as broadcast tables. */
+  public void addBroadcastTables(Collection<String> tableNames) {
+    broadcasted.addAll(tableNames);
+  }
+  /** Registers one table name as a broadcast table. */
+  public void addBroadcastTable(String tableName) {
+    broadcasted.add(tableName);
+  }
+  /** True if the given table name has been registered as a broadcast table. */
+  public boolean isBroadcastTable(String tableName) {
+    return broadcasted.contains(tableName);
+  }
+  /** Returns the backing set of broadcast table names itself, not a copy. */
+  public Collection<String> getBroadcastTables() {
+    return broadcasted;
+  }
+  /** Returns the string form of the block id. */
+  public String toString() {
+    return executionBlockId.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
new file mode 100644
index 0000000..88fd68a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import java.util.ArrayList;
+
+/**
+ * A distributed execution plan (DEP) is a directed acyclic graph (DAG) of ExecutionBlocks.
+ * This cursor points at the ExecutionBlock that the query engine should execute next.
+ * Each call of nextBlock() returns the following ExecutionBlock in postfix order.
+ */
+public class ExecutionBlockCursor {
+  private MasterPlan masterPlan;
+  private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
+  private int cursor;
+  /** Precomputes the visiting order, starting from the root block of the given plan. */
+  public ExecutionBlockCursor(MasterPlan plan) {
+    this.masterPlan = plan;
+    buildOrder(plan.getRoot());
+  }
+  /** Returns the number of ordered blocks, regardless of the cursor position. */
+  public int size() {
+    return orderedBlocks.size();
+  }
+  /** Appends the blocks below {@code current}, then {@code current} itself, to the ordered list. */
+  private void buildOrder(ExecutionBlock current) {
+    final boolean leaf = masterPlan.isLeaf(current.getId());
+    if (!leaf && masterPlan.getChildCount(current.getId()) == 1) {
+      // A single child is fetched directly by index.
+      buildOrder(masterPlan.getChild(current, 0));
+    } else if (!leaf) {
+      for (ExecutionBlock child : masterPlan.getChilds(current)) {
+        buildOrder(child);
+      }
+    }
+    // Children are added before the block itself, which yields the postfix order.
+    orderedBlocks.add(current);
+  }
+  /** True while the cursor has not passed the last ordered block. */
+  public boolean hasNext() {
+    return orderedBlocks.size() > cursor;
+  }
+  /** Returns the block at the cursor and advances; the cursor moves even if the lookup fails. */
+  public ExecutionBlock nextBlock() {
+    return orderedBlocks.get(cursor++);
+  }
+  /** Returns the block at the cursor without advancing. */
+  public ExecutionBlock peek() {
+    return orderedBlocks.get(cursor);
+  }
+  /** Returns the block {@code skip} positions after the cursor without advancing. */
+  public ExecutionBlock peek(int skip) {
+    return orderedBlocks.get(cursor + skip);
+  }
+  /** Moves the cursor back to the first ordered block. */
+  public void reset() {
+    cursor = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
new file mode 100644
index 0000000..e540570
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.storage.AbstractStorageManager;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
+
+public class GlobalPlanner {
+  private static final Log LOG = LogFactory.getLog(GlobalPlanner.class); // final: the logger is not meant to be swapped
+
+  private TajoConf conf;
+  private AbstractStorageManager sm;
+
+  public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
+      throws IOException { // nothing in this body throws IOException; the clause is part of the signature
+    this.conf = conf; // read by buildJoinPlan for the broadcast join threshold
+    this.sm = sm;
+  }
+
+  public class GlobalPlanContext { // mutable state for one build() run; passed to the plan visitor
+    MasterPlan plan; // master plan being populated; assigned by build()
+    Set<String> broadcastTables = new HashSet<String>();
+    LogicalNode topmost; // build() takes the schema of the final channel from this node
+    LogicalNode lastRepartionableNode; // build() checks whether this node is a UNION
+    ExecutionBlock topMostLeftExecBlock; // build() connects this block to the terminal block
+  }
+
+  /**
+   * Populates {@code masterPlan}, whose logical plan is the input, with execution blocks and data channels.
+   */
+  public void build(MasterPlan masterPlan)
+      throws IOException, PlanningException {
+    // The context is handed to the visitor below; its fields are read back once the visit is done.
+    DistributedPlannerVisitor planner = new DistributedPlannerVisitor();
+    GlobalPlanContext globalPlanContext = new GlobalPlanContext();
+    globalPlanContext.plan = masterPlan;
+    LOG.info(masterPlan.getLogicalPlan());
+    // The visit starts from a clone of the root node of the logical plan's root block.
+    LogicalNode rootNode = PlannerUtil.clone(masterPlan.getLogicalPlan().getRootBlock().getRoot());
+    planner.visitChild(globalPlanContext, masterPlan.getLogicalPlan(), rootNode, new Stack<LogicalNode>());
+
+    ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
+    // UNION case: every non-UNION operand of the collected unions gets its own block feeding the terminal block.
+    if (globalPlanContext.lastRepartionableNode != null
+        && globalPlanContext.lastRepartionableNode.getType() == NodeType.UNION) {
+      UnionNode unionNode = (UnionNode) globalPlanContext.lastRepartionableNode;
+      ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+      UnionsFinderContext finderContext = new UnionsFinderContext();
+      finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
+
+      for (UnionNode union : finderContext.unionList) {
+        TableSubQueryNode leftSubQuery = union.getLeftChild();
+        TableSubQueryNode rightSubQuery = union.getRightChild();
+        if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+          execBlock.setPlan(leftSubQuery);
+          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+          masterPlan.addConnect(dataChannel);
+        }
+        if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+          execBlock.setPlan(rightSubQuery);
+          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+          masterPlan.addConnect(dataChannel);
+        }
+      }
+    } else { // otherwise a single channel links the recorded topmost block to the terminal block
+      DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, NONE_PARTITION, 1);
+      dataChannel.setSchema(globalPlanContext.topmost.getOutSchema());
+      masterPlan.addConnect(dataChannel);
+    }
+    masterPlan.setTerminal(terminalBlock);
+    LOG.info(masterPlan);
+  }
+
+  private ExecutionBlock buildRepartitionBlocks(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
+                                                LogicalNode childNode, ExecutionBlock lastChildBlock)
+      throws PlanningException {
+    // Dispatches on the type of the node that requires a repartition and returns the first
+    // element of the pair produced by the matching builder.
+    // Node types other than GROUP_BY, SORT and JOIN are not handled here and yield null.
+    final NodeType distType = lastDistNode.getType();
+    if (distType == NodeType.GROUP_BY) {
+      return buildGroupBy(masterPlan, lastDistNode, curNode, childNode, lastChildBlock)[0];
+    }
+
+    if (distType == NodeType.SORT) {
+      return buildSortPlan(masterPlan, lastDistNode, curNode, childNode, lastChildBlock)[0];
+    }
+
+    if (distType == NodeType.JOIN) {
+      // The join builder receives the same block twice: as the child and as the last child.
+      return buildJoinPlan(masterPlan, lastDistNode, lastChildBlock, lastChildBlock)[0];
+    }
+
+    return null;
+  }
+
+  public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) { // scan node over a channel
+    Preconditions.checkArgument(channel.getSchema() != null,
+        "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
+    TableMeta meta = new TableMetaImpl(channel.getSchema(), channel.getStoreType(), new Options());
+    TableDesc desc = new TableDescImpl(channel.getSrcId().toString(), meta, new Path("/")); // named after the source id
+    return new ScanNode(plan.newPID(), desc); // the scan gets a fresh PID from the logical plan
+  }
+
+  private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
+                                                ExecutionBlock parent, JoinNode join, boolean leftTable) {
+    // Hash-partitioned channel (32 partitions) from the selected join input to the parent block.
+    final ExecutionBlock source = leftTable ? leftBlock : rightBlock;
+    final DataChannel channel = new DataChannel(source, parent, HASH_PARTITION, 32);
+
+    if (join.getJoinType() == JoinType.CROSS) {
+      // Cross joins skip the key extraction, so the channel is returned without a partition key.
+      return channel;
+    }
+    // Index 0 is used for the left input and index 1 for the right input.
+    final Column[][] keysPerSide = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
+        leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
+    channel.setPartitionKey(keysPerSide[leftTable ? 0 : 1]);
+    return channel;
+  }
+
+  private ExecutionBlock [] buildJoinPlan(MasterPlan masterPlan, LogicalNode lastDistNode,
+                                          ExecutionBlock childBlock, ExecutionBlock lastChildBlock)
+      throws PlanningException {
+    ExecutionBlock currentBlock;
+    // Plans the join as a broadcast join in one block, or as a repartition join fed by two hash channels.
+    JoinNode joinNode = (JoinNode) lastDistNode;
+    LogicalNode leftNode = joinNode.getLeftChild();
+    LogicalNode rightNode = joinNode.getRightChild();
+
+    boolean leftBroadcasted = false;
+    boolean rightBroadcasted = false;
+    // Broadcast is only considered when both join inputs are scan nodes.
+    if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN ) {
+      ScanNode leftScan = (ScanNode) leftNode;
+      ScanNode rightScan = (ScanNode) rightNode;
+
+      TableMeta leftMeta = leftScan.getTableDesc().getMeta();
+      TableMeta rightMeta = rightScan.getTableDesc().getMeta();
+      long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.BROADCAST_JOIN_THRESHOLD);
+      // A table is marked for broadcast when its recorded byte count is below the configured threshold.
+      if (leftMeta.getStat().getNumBytes() < broadcastThreshold) {
+        leftBroadcasted = true;
+      }
+      if (rightMeta.getStat().getNumBytes() < broadcastThreshold) {
+        rightBroadcasted = true;
+      }
+      // With at least one broadcast side the whole join runs in one new block and no channel is added.
+      if (leftBroadcasted || rightBroadcasted) {
+        currentBlock = masterPlan.newExecutionBlock();
+        currentBlock.setPlan(joinNode);
+        if (leftBroadcasted) {
+          currentBlock.addBroadcastTable(leftScan.getCanonicalName());
+        }
+        if (rightBroadcasted) {
+          currentBlock.addBroadcastTable(rightScan.getCanonicalName());
+        }
+        return new ExecutionBlock[] { currentBlock, childBlock };
+      }
+    }
+
+    // symmetric repartition join
+    // The left input reuses lastChildBlock when one exists; the right input always gets a new block.
+    ExecutionBlock leftBlock;
+    if (lastChildBlock == null) {
+      leftBlock = masterPlan.newExecutionBlock();
+      leftBlock.setPlan(leftNode);
+    } else {
+      leftBlock = lastChildBlock;
+    }
+    ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
+    rightBlock.setPlan(rightNode);
+
+    currentBlock = masterPlan.newExecutionBlock();
+
+    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
+    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+    // The join's children are replaced by scans built over the two channels.
+    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
+
+    joinNode.setLeftChild(leftScan);
+    joinNode.setRightChild(rightScan);
+    currentBlock.setPlan(joinNode);
+
+    masterPlan.addConnect(leftChannel);
+    masterPlan.addConnect(rightChannel);
+    // Result pair: the new join block first, then the child block exactly as it was passed in.
+    return new ExecutionBlock[] { currentBlock, childBlock };
+
+  }
+
+  private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
+                                         LogicalNode childNode, ExecutionBlock childBlock) throws PlanningException {
+    ExecutionBlock currentBlock = null;
+    GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
+    // Three cases: distinct aggregation, group-by over a UNION subquery, and the remaining group-bys.
+    if (groupByNode.isDistinct()) {
+      if (childBlock == null) { // first repartition node
+        childBlock = masterPlan.newExecutionBlock();
+      }
+      childBlock.setPlan(groupByNode.getChild());
+      currentBlock = masterPlan.newExecutionBlock();
+
+      LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
+      // Collect the columns referenced by distinct aggregation functions, keeping encounter order.
+      for (Target target : groupByNode.getTargets()) {
+        List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
+        for (AggregationFunctionCallEval function : functions) {
+          if (function.isDistinct()) {
+            columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+          }
+        }
+      }
+
+      Set<Column> existingColumns = Sets.newHashSet(groupByNode.getGroupingColumns());
+      columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
+      SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
+      currentBlock.getEnforcer().enforceSortAggregation(groupByNode.getPID(), sortSpecs);
+      // The child block ships its rows hash-partitioned on the grouping columns.
+      DataChannel channel;
+      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+      channel.setPartitionKey(groupByNode.getGroupingColumns());
+      channel.setSchema(groupByNode.getInSchema());
+
+      GroupbyNode secondGroupBy = groupByNode;
+      ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+      secondGroupBy.setChild(scanNode);
+
+      LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+      if (parent instanceof UnaryNode && parent != secondGroupBy) {
+        ((UnaryNode)parent).setChild(secondGroupBy);
+      }
+
+      masterPlan.addConnect(channel);
+      currentBlock.setPlan(currentNode);
+      
+    } else {
+      // firstGroupBy runs in the child block(s); the original node acts as the second group-by.
+      GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+      firstGroupBy.setHavingCondition(null);
+
+      if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+          ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+        // One block with a clone of firstGroupBy per non-UNION operand, each feeding the shared current block.
+        UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+        ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+        UnionsFinderContext finderContext = new UnionsFinderContext();
+        finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
+        // NOTE(review): setChild() below runs per operand (last scan wins) and no partition key is set; confirm.
+        currentBlock = masterPlan.newExecutionBlock();
+        GroupbyNode secondGroupBy = groupByNode;
+        for (UnionNode union : finderContext.unionList) {
+          TableSubQueryNode leftSubQuery = union.getLeftChild();
+          TableSubQueryNode rightSubQuery = union.getRightChild();
+          DataChannel dataChannel;
+          if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+            g1.setChild(leftSubQuery);
+            execBlock.setPlan(g1);
+            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+            secondGroupBy.setChild(scanNode);
+            masterPlan.addConnect(dataChannel);
+          }
+          if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+            g1.setChild(rightSubQuery);
+            execBlock.setPlan(g1);
+            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+            secondGroupBy.setChild(scanNode);
+            masterPlan.addConnect(dataChannel);
+          }
+        }
+        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+        if (parent instanceof UnaryNode && parent != secondGroupBy) {
+          ((UnaryNode)parent).setChild(secondGroupBy);
+        }
+        currentBlock.setPlan(currentNode);
+      } else {
+
+        if (childBlock == null) { // first repartition node
+          childBlock = masterPlan.newExecutionBlock();
+        }
+        childBlock.setPlan(firstGroupBy);
+
+        currentBlock = masterPlan.newExecutionBlock();
+        // Both branches set the same partition key; an empty grouping uses 1 partition instead of 32.
+        DataChannel channel;
+        if (firstGroupBy.isEmptyGrouping()) {
+          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+        } else {
+          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+        }
+        channel.setSchema(firstGroupBy.getOutSchema());
+
+        GroupbyNode secondGroupBy = groupByNode;
+        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+        secondGroupBy.setChild(scanNode);
+
+        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+        if (parent instanceof UnaryNode && parent != secondGroupBy) {
+          ((UnaryNode)parent).setChild(secondGroupBy);
+        }
+
+        masterPlan.addConnect(channel);
+        currentBlock.setPlan(currentNode);
+      }
+    }
+    // Result pair: the new parent block first, then the child block (still null in the UNION case if passed as null).
+    return new ExecutionBlock [] {currentBlock, childBlock};
+  }
+
+  /**
+   * Splits a sort into two execution blocks joined by a {@code RANGE_PARTITION} data channel.
+   *
+   * The original sort node ({@code lastDistNode}) becomes the plan of the child block, and a clone
+   * of it, fed by a scan over the channel, is re-attached under the sort's former parent in
+   * {@code currentNode}. If {@code currentNode} contains a LIMIT node, a clone of that limit is
+   * additionally placed on top of the first sort inside the child block.
+   *
+   * @param masterPlan   plan used to allocate execution blocks and register the channel
+   * @param lastDistNode the sort node to distribute; must be a {@link SortNode}
+   * @param currentNode  root of the plan fragment assigned to the newly created block
+   * @param childNode    node whose output schema becomes the channel schema
+   * @param childBlock   existing child block, or {@code null} to allocate a new one
+   * @return a two-element array: the newly created block, then the child block
+   */
+  private ExecutionBlock [] buildSortPlan(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
+                                          LogicalNode childNode, ExecutionBlock childBlock) {
+    // First phase: the original sort runs in the child block.
+    SortNode firstSort = (SortNode) lastDistNode;
+    if (childBlock == null) {
+      childBlock = masterPlan.newExecutionBlock();
+    }
+    childBlock.setPlan(firstSort);
+
+    // Second phase: a fresh block reads the first phase through a channel keyed on the sort columns.
+    ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
+    DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
+    channel.setSchema(childNode.getOutSchema());
+
+    SortNode secondSort = PlannerUtil.clone(lastDistNode);
+    ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+    secondSort.setChild(secondScan);
+
+    // When a LIMIT sits above the sort, put a copy of it over the first-phase sort as well.
+    LimitNode topLimit = PlannerUtil.findTopNode(currentNode, NodeType.LIMIT);
+    if (topLimit != null) {
+      LimitNode pushedLimit = PlannerUtil.clone(topLimit);
+      pushedLimit.setChild(firstSort);
+
+      if (childBlock.getPlan().getType() == NodeType.SORT) {
+        childBlock.setPlan(pushedLimit);
+      } else {
+        LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
+        // instanceof is false for null, so no separate null check is required.
+        if (sortParent instanceof UnaryNode) {
+          ((UnaryNode) sortParent).setChild(pushedLimit);
+        }
+      }
+    }
+
+    // Hook the second-phase sort under whatever used to be the parent of the original sort.
+    LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+    if (parent instanceof UnaryNode && parent != secondSort) {
+      ((UnaryNode) parent).setChild(secondSort);
+    }
+
+    masterPlan.addConnect(channel);
+    currentBlock.setPlan(currentNode);
+
+    return new ExecutionBlock[] { currentBlock, childBlock };
+  }
+
+  /**
+   * Logical-plan visitor that records traversal state in a {@link GlobalPlanContext}.
+   *
+   * Every visit method stores the visited node in {@code context.topmost}. Sort, group-by, join and
+   * union nodes are additionally stored in {@code context.lastRepartionableNode}; before doing so,
+   * if an earlier such node is pending, {@code buildRepartitionBlocks} is invoked and its result is
+   * stored in {@code context.topMostLeftExecBlock}.
+   */
+  public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext, LogicalNode> {
+
+    /**
+     * Visits the root after its subtree, then finalizes {@code context.topMostLeftExecBlock}:
+     * a pending non-UNION node triggers {@code buildRepartitionBlocks}; a pending UNION node
+     * leaves the block untouched; with nothing pending, a new execution block holding the root
+     * is created.
+     */
+    @Override
+    public LogicalNode visitRoot(GlobalPlanContext context, LogicalPlan plan, LogicalRootNode node,
+                                 Stack<LogicalNode> stack) throws PlanningException {
+      super.visitRoot(context, plan, node, stack);
+
+      if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
+      } else if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() == NodeType.UNION) {
+        // Empty branch: nothing is built here when the pending node is a UNION.
+
+      } else {
+        // No repartitionable node was seen: the whole plan goes into a single new block.
+        ExecutionBlock execBlock = context.plan.newExecutionBlock();
+        execBlock.setPlan(node);
+        context.topMostLeftExecBlock = execBlock;
+      }
+
+      context.topmost = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this projection as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitProjection(GlobalPlanContext context, LogicalPlan plan, ProjectionNode node,
+                                       Stack<LogicalNode> stack) throws PlanningException {
+      super.visitProjection(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this limit as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitLimit(GlobalPlanContext context, LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack)
+        throws PlanningException {
+      super.visitLimit(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+
+    /**
+     * Visits the subtree, calls {@code buildRepartitionBlocks} for any pending repartitionable
+     * node, then records this sort as both {@code topmost} and {@code lastRepartionableNode}.
+     */
+    @Override
+    public LogicalNode visitSort(GlobalPlanContext context, LogicalPlan plan, SortNode node, Stack<LogicalNode> stack)
+        throws PlanningException {
+
+      super.visitSort(context, plan, node, stack);
+
+      if (context.lastRepartionableNode != null) {
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
+            context.topMostLeftExecBlock);
+      }
+
+      context.topmost = node;
+      context.lastRepartionableNode = node;
+
+      return node;
+    }
+
+    /**
+     * Visits the subtree, calls {@code buildRepartitionBlocks} for any pending repartitionable
+     * node, then records this group-by as both {@code topmost} and {@code lastRepartionableNode}.
+     */
+    @Override
+    public LogicalNode visitGroupBy(GlobalPlanContext context, LogicalPlan plan, GroupbyNode node,
+                                    Stack<LogicalNode> stack) throws PlanningException {
+      super.visitGroupBy(context, plan, node, stack);
+
+      if (context.lastRepartionableNode != null) {
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
+            context.topmost, context.topMostLeftExecBlock);
+      }
+
+      context.topmost = node;
+      context.lastRepartionableNode = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this selection as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitFilter(GlobalPlanContext context, LogicalPlan plan, SelectionNode node,
+                                   Stack<LogicalNode> stack) throws PlanningException {
+      super.visitFilter(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+
+    /**
+     * Visits the subtree, calls {@code buildRepartitionBlocks} for any pending repartitionable
+     * node, then records this join as both {@code topmost} and {@code lastRepartionableNode}.
+     */
+    @Override
+    public LogicalNode visitJoin(GlobalPlanContext context, LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack)
+        throws PlanningException {
+      super.visitJoin(context, plan, node, stack);
+
+      if (context.lastRepartionableNode != null) {
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
+            context.topMostLeftExecBlock);
+      }
+
+      context.topmost = node;
+      context.lastRepartionableNode = node;
+
+      return node;
+    }
+
+    /**
+     * Visits the subtree, then calls {@code buildRepartitionBlocks} only when the pending
+     * repartitionable node is not itself a UNION. Finally records this union as both
+     * {@code topmost} and {@code lastRepartionableNode}.
+     */
+    @Override
+    public LogicalNode visitUnion(GlobalPlanContext context, LogicalPlan plan, UnionNode node,
+                                  Stack<LogicalNode> stack) throws PlanningException {
+      super.visitUnion(context, plan, node, stack);
+
+      if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
+            context.topmost, context.topMostLeftExecBlock);
+      }
+
+      context.topmost = node;
+      context.lastRepartionableNode = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this except node as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitExcept(GlobalPlanContext context, LogicalPlan plan, ExceptNode node,
+                                   Stack<LogicalNode> stack) throws PlanningException {
+      super.visitExcept(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this intersect node as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitIntersect(GlobalPlanContext context, LogicalPlan plan, IntersectNode node,
+                                      Stack<LogicalNode> stack) throws PlanningException {
+      super.visitIntersect(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this table subquery as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitTableSubQuery(GlobalPlanContext context, LogicalPlan plan, TableSubQueryNode node,
+                                          Stack<LogicalNode> stack) throws PlanningException {
+      super.visitTableSubQuery(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+
+    /** Records this scan as {@code context.topmost}; unlike the other visit methods, the superclass is not invoked. */
+    @Override
+    public LogicalNode visitScan(GlobalPlanContext context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
+        throws PlanningException {
+      context.topmost = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this store-table node as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitStoreTable(GlobalPlanContext context, LogicalPlan plan, StoreTableNode node,
+                                       Stack<LogicalNode> stack) throws PlanningException {
+      super.visitStoreTable(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+
+    /** Visits the subtree via the superclass, then records this insert node as {@code context.topmost}. */
+    @Override
+    public LogicalNode visitInsert(GlobalPlanContext context, LogicalPlan plan, InsertNode node,
+                                   Stack<LogicalNode> stack)
+        throws PlanningException {
+      super.visitInsert(context, plan, node, stack);
+      context.topmost = node;
+      return node;
+    }
+  }
+
+  /** Visitor context that accumulates the {@link UnionNode}s found by {@code ConsecutiveUnionFinder}. */
+  private class UnionsFinderContext {
+    // Union nodes in the order the finder visits them.
+    List<UnionNode> unionList = new ArrayList<UnionNode>();
+  }
+
+  @SuppressWarnings("unused")
+  private class ConsecutiveUnionFinder extends BasicLogicalPlanVisitor<UnionsFinderContext, LogicalNode> {
+    @Override
+    public LogicalNode visitUnion(UnionsFinderContext context, LogicalPlan plan, UnionNode node,
+                                  Stack<LogicalNode> stack)
+        throws PlanningException {
+      if (node.getType() == NodeType.UNION) {
+        context.unionList.add(node);
+      }
+
+      stack.push(node);
+      TableSubQueryNode leftSubQuery = node.getLeftChild();
+      TableSubQueryNode rightSubQuery = node.getRightChild();
+      if (leftSubQuery.getSubQuery().getType() == NodeType.UNION) {
+        visitChild(context, plan, leftSubQuery, stack);
+      }
+      if (rightSubQuery.getSubQuery().getType() == NodeType.UNION) {
+        visitChild(context, plan, rightSubQuery, stack);
+      }
+      stack.pop();
+
+      return node;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 35b62c9..b2804cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -21,14 +21,11 @@
  */
 package org.apache.tajo.engine.planner.global;
 
-import org.apache.tajo.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.ExecutionBlockCursor;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
 
 import java.util.ArrayList;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index fb4a3b9..e0f5f4c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.collect.Sets;
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.DatumFactory;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 21f8749..b2737b7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.JoinNode;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index fe7bd44..4430609 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.eval.EvalContext;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index e98d505..f1e3a00 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index ae87330..c1f84a3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.EvalExprNode;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 4742136..19fde88 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index 019db82..681e340 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index ac591d3..4ab2b84 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 355b357..8a42cff 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 665a3e3..6b2d7b8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.storage.Tuple;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index cc12690..f291750 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 8c8a6b1..c5e2d24 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.storage.Tuple;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index f68ff59..c9aced4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
index eb3f3c8..d736c25 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.planner.logical.LimitNode;
 import org.apache.tajo.storage.Tuple;


Mime
View raw message