tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [33/39] tajo git commit: Basic pushdown to PostgreSQL has been completed.
Date Wed, 02 Sep 2015 11:06:56 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java
index 9515fe8..6c49e32 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java
@@ -20,7 +20,6 @@ package org.apache.tajo.plan.expr;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.exception.UnsupportedException;
 
 import java.util.Stack;
 
@@ -36,7 +35,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> {
     EvalNode result;
 
     if (evalNode instanceof UnaryEval) {
-      result = visitUnaryEval(context, stack, (UnaryEval) evalNode);
+      result = visitUnaryEval(context, (UnaryEval) evalNode, stack);
     } else if (evalNode instanceof BinaryEval) {
       result = visitBinaryEval(context, stack, (BinaryEval) evalNode);
     } else {
@@ -50,7 +49,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> {
         result = visitRowConstant(context, (RowConstantEval) evalNode, stack);
         break;
       case FIELD:
-        result = visitField(context, stack, (FieldEval) evalNode);
+        result = visitField(context, (FieldEval) evalNode, stack);
         break;
 
 
@@ -88,7 +87,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> {
     return result;
   }
 
-  protected EvalNode visitUnaryEval(CONTEXT context, Stack<EvalNode> stack, UnaryEval unaryEval) {
+  protected EvalNode visitUnaryEval(CONTEXT context, UnaryEval unaryEval, Stack<EvalNode> stack) {
     stack.push(unaryEval);
     visit(context, unaryEval.getChild(), stack);
     stack.pop();
@@ -126,7 +125,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> {
     return evalNode;
   }
 
-  protected EvalNode visitField(CONTEXT context, Stack<EvalNode> stack, FieldEval evalNode) {
+  protected EvalNode visitField(CONTEXT context, FieldEval evalNode, Stack<EvalNode> stack) {
     return evalNode;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
index eb546d1..b3fc019 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
@@ -64,7 +64,7 @@ public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanCo
   }
 
   @Override
-  public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, Stack<EvalNode> stack, UnaryEval unaryEval) {
+  public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, UnaryEval unaryEval, Stack<EvalNode> stack) {
     stack.push(unaryEval);
     EvalNode child = visit(context, unaryEval.getChild(), stack);
     stack.pop();

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java
index 6fe3b3e..24488bd 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java
@@ -72,7 +72,7 @@ public class ConstantPropagation extends SimpleEvalNodeVisitor<LogicalPlanner.Pl
   }
 
   @Override
-  public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, Stack<EvalNode> stack, UnaryEval unaryEval) {
+  public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, UnaryEval unaryEval, Stack<EvalNode> stack) {
     stack.push(unaryEval);
 
     if (unaryEval.getChild().getType() == EvalType.FIELD) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
index 8e369d6..f5e6b78 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.plan.logical;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -37,6 +38,7 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod
 	@Expose protected EvalNode qual;
 	@Expose protected Target[] targets;
   @Expose protected boolean broadcastTable;
+  @Expose protected long limit = -1; // -1 is infinite
 
   protected ScanNode(int pid, NodeType nodeType) {
     super(pid, nodeType);
@@ -150,6 +152,29 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod
 	  return this.targets;
 	}
 
+  /**
+   *
+   *
+   * @return
+   */
+  public boolean hasLimit() {
+    return limit > 0;
+  }
+
+  /**
+   * How many rows will be retrieved?
+   *
+   * @return The number of rows to be retrieved
+   */
+  public long getLimit() {
+    return limit;
+  }
+
+  public void setLimit(long num) {
+    Preconditions.checkArgument(num > 0, "The number of fetch rows in limit is negative");
+    this.limit = num;
+  }
+
   public TableDesc getTableDesc() {
     return tableDesc;
   }
@@ -249,9 +274,4 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod
 
     return planStr;
   }
-
-  public static boolean isScanNode(LogicalNode node) {
-    return node.getType() == NodeType.SCAN ||
-        node.getType() == NodeType.PARTITIONS_SCAN;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java
index f66350a..5524256 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java
@@ -111,7 +111,7 @@ public class CommonConditionReduceRule implements LogicalPlanRewriteRule {
     }
 
     @Override
-    protected EvalNode visitUnaryEval(Object context, Stack<EvalNode> stack, UnaryEval unaryEval) {
+    protected EvalNode visitUnaryEval(Object context, UnaryEval unaryEval, Stack<EvalNode> stack) {
       stack.push(unaryEval);
       EvalNode child = unaryEval.getChild();
       visit(context, child, stack);

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
index 7de0b05..fef1528 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
@@ -102,9 +102,9 @@ public class EvalNodeSerializer
   }
 
   @Override
-  public EvalNode visitUnaryEval(EvalTreeProtoBuilderContext context, Stack<EvalNode> stack, UnaryEval unary) {
+  public EvalNode visitUnaryEval(EvalTreeProtoBuilderContext context, UnaryEval unary, Stack<EvalNode> stack) {
     // visiting and registering childs
-    super.visitUnaryEval(context, stack, unary);
+    super.visitUnaryEval(context, unary, stack);
     int [] childIds = registerGetChildIds(context, unary);
 
     // building itself
@@ -183,7 +183,7 @@ public class EvalNodeSerializer
   }
 
   @Override
-  public EvalNode visitField(EvalTreeProtoBuilderContext context, Stack<EvalNode> stack, FieldEval field) {
+  public EvalNode visitField(EvalTreeProtoBuilderContext context, FieldEval field, Stack<EvalNode> stack) {
     PlanProto.EvalNode.Builder builder = createEvalBuilder(context, field);
     builder.setField(field.getColumnRef().getProto());
     context.treeBuilder.addNodes(builder);

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index a9dca4c..6dd5a7f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -560,20 +560,6 @@ public class PlannerUtil {
     }
   }
 
-  /**
-   * fill targets with FieldEvals from a given schema
-   *
-   * @param schema  to be transformed to targets
-   * @param targets to be filled
-   */
-  public static void schemaToTargets(Schema schema, Target[] targets) {
-    FieldEval eval;
-    for (int i = 0; i < schema.size(); i++) {
-      eval = new FieldEval(schema.getColumn(i));
-      targets[i] = new Target(eval);
-    }
-  }
-
   public static Target[] schemaToTargets(Schema schema) {
     Target[] targets = new Target[schema.size()];
 
@@ -585,17 +571,6 @@ public class PlannerUtil {
     return targets;
   }
 
-  public static Target[] schemaToTargetsWithGeneratedFields(Schema schema) {
-    List<Target> targets = TUtil.newList();
-
-    FieldEval eval;
-    for (int i = 0; i < schema.size(); i++) {
-      eval = new FieldEval(schema.getColumn(i));
-      targets.add(new Target(eval));
-    }
-    return targets.toArray(new Target[targets.size()]);
-  }
-
   public static SortSpec[] schemaToSortSpecs(Schema schema) {
     return columnsToSortSpecs(schema.toArray());
   }
@@ -992,4 +967,14 @@ public class PlannerUtil {
     }
     return inSubqueries;
   }
+
+  public static int [] getTargetIds(Schema inputSchema, Column...targets) {
+    int [] targetIds = new int[targets.length];
+    for (int i = 0; i < targetIds.length; i++) {
+      targetIds[i] = inputSchema.getColumnId(targets[i].getQualifiedName());
+    }
+    Arrays.sort(targetIds);
+
+    return targetIds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index da1e187..274e674 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -577,11 +577,12 @@ message EnforceProperty {
     SORTED_INPUT = 0;
     OUTPUT_DISTINCT = 1;
     GROUP_BY = 2;
-    JOIN = 3;
-    SORT = 4;
-    BROADCAST = 5;
-    COLUMN_PARTITION = 6;
+    JOIN    = 3;
+    SORT              = 4;
+    BROADCAST         = 5;
+    COLUMN_PARTITION  = 6;
     DISTINCT_GROUP_BY = 7;
+    STORAGE_PUSHDOWN  = 8;
   }
 
   // Identifies which field is filled in.
@@ -675,4 +676,8 @@ message DistinctGroupbyEnforcer {
   repeated SortSpecArray sortSpecArrays = 3;
   required bool isMultipleAggregation = 4 [default = false];
   optional MultipleAggregationStage multipleAggregationStage = 5;
+}
+
+message StoragePushdownEnforcer {
+  required int32 marker = 1;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
index c3dc959..cc8ad83 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
@@ -25,6 +25,7 @@ import org.apache.tajo.exception.NotImplementedException;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
 
 import java.io.IOException;
 
@@ -49,6 +50,11 @@ public abstract class AbstractScanner implements Scanner {
   }
 
   @Override
+  public void pushOperators(LogicalNode planPart) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
   public boolean isProjectable() {
     throw new TajoRuntimeException(new NotImplementedException());
   }
@@ -69,6 +75,10 @@ public abstract class AbstractScanner implements Scanner {
   }
 
   @Override
+  public void setLimit(long num) {
+  }
+
+  @Override
   public boolean isSplittable() {
     throw new TajoRuntimeException(new NotImplementedException());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index db64992..6b633bd 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -28,6 +28,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
@@ -153,6 +154,11 @@ public class MergeScanner implements Scanner {
   }
 
   @Override
+  public void pushOperators(LogicalNode planPart) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
   public boolean isProjectable() {
     return projectable;
   }
@@ -173,6 +179,11 @@ public class MergeScanner implements Scanner {
   }
 
   @Override
+  public void setLimit(long num) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
   public Schema getSchema() {
     return schema;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
index 9c025f7..a20adf7 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
@@ -73,6 +74,11 @@ public class NullScanner implements Scanner {
   }
 
   @Override
+  public void pushOperators(LogicalNode planPart) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
   public boolean isProjectable() {
     return false;
   }
@@ -93,6 +99,11 @@ public class NullScanner implements Scanner {
   }
 
   @Override
+  public void setLimit(long num) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
   public boolean isSplittable() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java
new file mode 100644
index 0000000..7500e9a
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Scanner Interface
+ */
+
+public interface PlanPushable {
+  void pushdown(LogicalNode pushed);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
index 2fcb2fd..f421515 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SchemaObject;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -58,6 +59,11 @@ public interface Scanner extends SchemaObject, Closeable {
    */
   void close() throws IOException;
 
+  /**
+   *
+   * @param planPart
+   */
+  void pushOperators(LogicalNode planPart);
 
   /**
    * It returns if the projection is executed in the underlying scanner layer.
@@ -96,6 +102,15 @@ public interface Scanner extends SchemaObject, Closeable {
    */
   void setFilter(EvalNode filter);
 
+
+  /**
+   * This method does not guarantee that the scanner will retrieve the specified number of rows.
+   * This information is used for a hint.
+   *
+   * @param num The number of rows to be retrieved.
+   */
+  void setLimit(long num);
+
   /**
    * It returns if the file is splittable.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 83ae935..92d8b72 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -97,7 +97,7 @@ public abstract class Tablespace {
     return name + "=" + uri.toString();
   }
 
-  public abstract long getTableVolume(URI uri) throws IOException;
+  public abstract long getTableVolume(URI uri) throws UnsupportedException;
 
   /**
    * if {@link StorageProperty#isArbitraryPathAllowed} is true,
@@ -150,7 +150,7 @@ public abstract class Tablespace {
    * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
    * In general Repartitioner determines the partition range using previous output statistics data.
    * In the special cases, such as HBase Repartitioner uses the result of this method.
-   *
+   *ㅂ                                               ㅂ
    * @param queryContext The current query context which contains query properties.
    * @param tableDesc The table description for the target data.
    * @param inputSchema The input schema
@@ -187,38 +187,18 @@ public abstract class Tablespace {
    * @param meta The table meta
    * @param schema The input schema
    * @param fragment The fragment for scanning
-   * @param target Columns which are selected.
-   * @return Scanner instance
-   * @throws java.io.IOException
-   */
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @return Scanner instance
-   * @throws java.io.IOException
-   */
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
-    return getScanner(meta, schema, fragment, schema);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
    * @param target The output schema
    * @return Scanner instance
    * @throws java.io.IOException
    */
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+  public Scanner getScanner(TableMeta meta,
+                            Schema schema,
+                            Fragment fragment,
+                            @Nullable Schema target) throws IOException {
+    if (target == null) {
+      target = schema;
+    }
+
     if (fragment.isEmpty()) {
       Scanner scanner = new NullScanner(conf, schema, meta, fragment);
       scanner.setTarget(target.toArray());
@@ -255,7 +235,7 @@ public abstract class Tablespace {
    */
   public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment,
                                                          Schema target) throws IOException {
-    return (SeekableScanner)this.getScanner(meta, schema, fragment, target);
+    return (SeekableScanner)this.getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
   }
 
   /**
@@ -405,4 +385,9 @@ public abstract class Tablespace {
   public MetadataProvider getMetadataProvider() {
     throw new TajoRuntimeException(new UnsupportedException("Linked Metadata Provider for " + name));
   }
+
+  @SuppressWarnings("unused")
+  public int markAccetablePlanPart(LogicalPlan plan) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
index 17f03a0..e57dc3a 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,8 +33,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.MetadataProvider;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.JavaResourceUtil;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.UriUtil;
 
@@ -75,13 +76,14 @@ public class TablespaceManager implements StorageService {
   protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
   protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
 
-  public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class, JSONObject.class};
+  public static final Class[] TABLESPACE_PARAM = new Class[]{String.class, URI.class, JSONObject.class};
 
   public static final String TABLESPACE_SPEC_CONFIGS_KEY = "configs";
 
   static {
     instance = new TablespaceManager();
   }
+
   /**
    * Singleton instance
    */
@@ -130,7 +132,7 @@ public class TablespaceManager implements StorageService {
   private JSONObject loadFromConfig(String fileName) {
     String json;
     try {
-      json = FileUtil.readTextFileFromResource(fileName);
+      json = JavaResourceUtil.readTextFromResource(fileName);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -182,7 +184,7 @@ public class TablespaceManager implements StorageService {
     String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
 
     return new Pair<String, Class<? extends Tablespace>>(
-        storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
+        storageType, (Class<? extends Tablespace>) Class.forName(handlerClass));
   }
 
   private void loadTableSpaces(JSONObject json, boolean override) {
@@ -339,7 +341,7 @@ public class TablespaceManager implements StorageService {
 
     // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
     // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
-    for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
+    for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
       if (uri.startsWith(entry.getKey().toString())) {
         lastOne = entry.getValue();
       }
@@ -401,6 +403,11 @@ public class TablespaceManager implements StorageService {
     return space.getTableUri(databaseName, tableName);
   }
 
+  @Override
+  public long getTableVolumn(URI tableUri) throws UnsupportedException {
+    return get(tableUri).get().getTableVolume(tableUri);
+  }
+
   public static Iterable<Tablespace> getAllTablespaces() {
     return TABLE_SPACES.values();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
index 07720c7..bd46551 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.exception.TajoInternalError;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -72,8 +73,8 @@ public class FragmentConvertor {
         CONSTRUCTOR_CACHE.put(clazz, constructor);
       }
       result = constructor.newInstance(new Object[]{fragment.getContents()});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    } catch (Throwable e) {
+      throw new TajoInternalError(e);
     }
 
     return result;

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index dfdff85..35461fa 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -39,7 +39,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
+    <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase,jdbc</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -83,6 +83,10 @@
     <name>tajo.storage.fragment.hbase.class</name>
     <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.jdbc.class</name>
+    <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index f637da0..c57a67a 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -82,6 +82,10 @@
     <name>tajo.storage.fragment.hbase.class</name>
     <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.jdbc.class</name>
+    <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 7fc6d2a..90f7aa0 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -38,6 +38,7 @@ import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.exception.*;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.BytesUtils;
@@ -415,6 +416,11 @@ public class HBaseScanner implements Scanner {
   }
 
   @Override
+  public void pushOperators(LogicalNode planPart) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
   public boolean isProjectable() {
     return true;
   }
@@ -438,6 +444,10 @@ public class HBaseScanner implements Scanner {
   }
 
   @Override
+  public void setLimit(long num) {
+  }
+
+  @Override
   public boolean isSplittable() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 330522b..0064be4 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -44,10 +44,7 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.exception.DataTypeMismatchException;
-import org.apache.tajo.exception.InvalidTablePropertyException;
-import org.apache.tajo.exception.MissingTablePropertyException;
-import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.*;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.CreateTableNode;
@@ -102,8 +99,8 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public long getTableVolume(URI uri) throws IOException {
-    return 0;
+  public long getTableVolume(URI uri) throws UnsupportedException {
+    throw new UnsupportedException();
   }
 
   @Override
@@ -738,8 +735,6 @@ public class HBaseTablespace extends Tablespace {
 
   public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual,
                                                        Column[] indexableColumns) throws IOException {
-    Preconditions.checkNotNull(qual);
-
     List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>();
 
     // if a query statement has a search condition, try to find indexable predicates

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
index 8844fa5..33f1d04 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -29,6 +29,9 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 
@@ -82,6 +85,11 @@ public abstract class FileScanner implements Scanner {
   }
 
   @Override
+  public void pushOperators(LogicalNode planPart) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
   public void setTarget(Column[] targets) {
     if (inited) {
       throw new IllegalStateException("Should be called before init()");
@@ -89,6 +97,10 @@ public abstract class FileScanner implements Scanner {
     this.targets = targets;
   }
 
+  @Override
+  public void setLimit(long num) {
+  }
+
   public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
     String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
     FileSystem fs;

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index bcc0c88..9f29c34 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -35,6 +35,8 @@ import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.LogicalNode;
@@ -126,9 +128,14 @@ public class FileTablespace extends Tablespace {
   }
 
   @Override
-  public long getTableVolume(URI uri) throws IOException {
+  public long getTableVolume(URI uri) throws UnsupportedException {
     Path path = new Path(uri);
-    ContentSummary summary = fs.getContentSummary(path);
+    ContentSummary summary;
+    try {
+      summary = fs.getContentSummary(path);
+    } catch (IOException e) {
+      throw new TajoInternalError(e);
+    }
     return summary.getLength();
   }
 
@@ -146,7 +153,7 @@ public class FileTablespace extends Tablespace {
   public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
       throws IOException {
     Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
-    return getScanner(meta, schema, fragment);
+    return getScanner(meta, schema, fragment, null);
   }
 
   public FileSystem getFileSystem() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index eabab22..76c9362 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -25,6 +25,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.FieldSerializerDeserializer;
 import org.apache.tajo.storage.Tuple;
 
@@ -41,12 +42,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
 
   public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) {
     super(schema, meta);
-
-    targetColumnIndexes = new int[projected.length];
-    for (int i = 0; i < projected.length; i++) {
-      targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName());
-    }
-    Arrays.sort(targetColumnIndexes);
+    targetColumnIndexes = PlannerUtil.getTargetIds(schema, projected);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 90bec65..5f8a4d1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -106,7 +106,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple tuple;
@@ -128,7 +128,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     assertNotNull(scanner.next());
@@ -150,7 +150,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
     FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     try {
@@ -169,7 +169,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment = getFileFragment("testErrorTolerance3.json");
-    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     try {
@@ -185,7 +185,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "2");
     FileFragment fragment = getFileFragment("testNormal.json");
-    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
 
     scanner.init();
 
@@ -212,7 +212,7 @@ public class TestDelimitedTextFile {
     meta.putOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "1");
     meta.putOption(StorageConstants.TEXT_DELIMITER, ",");
     FileFragment fragment = getFileFragment("testSkip.txt");
-    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     
     scanner.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 21fff58..b800ed2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -38,6 +38,7 @@ import org.apache.tajo.storage.text.DelimitedLineReader;
 import org.apache.tajo.storage.text.DelimitedTextFile;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.JavaResourceUtil;
 import org.junit.Test;
 
 import java.io.File;
@@ -213,7 +214,7 @@ public class TestLineReader {
 
   @Test
   public void testByteBufLineReaderWithoutTerminating() throws IOException {
-    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
+    String path = JavaResourceUtil.getResourceURL("dataset/testLineText.txt").getFile();
     File file = new File(path);
     String data = FileUtil.readTextFile(file);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index afe0f13..9c194b2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -42,7 +42,7 @@ import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.rcfile.RCFile;
 import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.JavaResourceUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -342,7 +342,7 @@ public class TestStorages {
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
     if (storeType.equalsIgnoreCase("AVRO")) {
-      String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
+      String path = JavaResourceUtil.getResourceURL("dataset/testVariousTypes.avsc").toString();
       meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
     }
 
@@ -379,7 +379,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  sm.getScanner(meta, schema, fragment);
+    Scanner scanner =  sm.getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple retrieved;
@@ -468,7 +468,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple retrieved;
@@ -543,7 +543,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple retrieved;
@@ -612,7 +612,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple retrieved;
@@ -681,7 +681,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -755,7 +755,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -800,7 +800,7 @@ public class TestStorages {
 
       FileStatus status = fs.getFileStatus(tablePath);
       FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-      Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+      Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
       scanner.init();
 
       Tuple retrieved;
@@ -936,7 +936,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = sm.getScanner(meta, schema, fragment);
+    Scanner scanner = sm.getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple retrieved;
@@ -997,7 +997,7 @@ public class TestStorages {
     inSchema.addColumn("col5", Type.INT8);
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment, null);
 
     Schema target = new Schema();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
index 341cc07..960448c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@ -25,6 +25,7 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.JavaResourceUtil;
 import org.apache.tajo.util.NetUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,7 +47,7 @@ public class TestAvroUtil {
 
   @Before
   public void setUp() throws Exception {
-    schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
+    schemaUrl = JavaResourceUtil.getResourceURL("dataset/testVariousTypes.avsc");
     assertNotNull(schemaUrl);
 
     File file = new File(schemaUrl.getPath());

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
index 88d7536..75e59da 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@ -69,7 +69,7 @@ public class TestJsonSerDe {
     FileSystem fs = FileSystem.getLocal(conf);
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple tuple = scanner.next();
@@ -108,7 +108,7 @@ public class TestJsonSerDe {
     schema.addColumn("col1", TajoDataTypes.Type.TEXT);
     schema.addColumn("col2", TajoDataTypes.Type.TEXT);
     schema.addColumn("col3", TajoDataTypes.Type.TEXT);
-    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
     scanner.init();
 
     Tuple tuple = scanner.next();

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
index 9a42b0d..57b0825 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
index 2fc42b7..e8a18fc 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
@@ -60,20 +60,8 @@ public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, Cloneab
     return inputSourceId;
   }
 
-  @Override
-  public CatalogProtos.FragmentProto getProto() {
-    JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
-    builder.setInputSourceId(this.inputSourceId);
-    builder.setUri(this.uri);
-    if(hostNames != null) {
-      builder.addAllHosts(TUtil.newList(hostNames));
-    }
-
-    CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.inputSourceId);
-    fragmentBuilder.setStoreType(BuiltinStorages.TEXT);
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    return fragmentBuilder.build();
+  public String getUri() {
+    return uri;
   }
 
   @Override
@@ -97,6 +85,22 @@ public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, Cloneab
   }
 
   @Override
+  public CatalogProtos.FragmentProto getProto() {
+    JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
+    builder.setInputSourceId(this.inputSourceId);
+    builder.setUri(this.uri);
+    if(hostNames != null) {
+      builder.addAllHosts(TUtil.newList(hostNames));
+    }
+
+    CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder();
+    fragmentBuilder.setId(this.inputSourceId);
+    fragmentBuilder.setStoreType("JDBC");
+    fragmentBuilder.setContents(builder.buildPartial().toByteString());
+    return fragmentBuilder.build();
+  }
+
+  @Override
   public int compareTo(JdbcFragment o) {
     return this.uri.compareTo(o.uri);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
index 7f63c66..f623f14 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.exception.*;
 import org.apache.tajo.util.KeyValueSet;
@@ -187,7 +188,7 @@ public abstract class JdbcMetadataProviderBase implements MetadataProvider {
       // get columns
       resultForColumns = connection.getMetaData().getColumns(databaseName, schemaName, tableName, null);
 
-      List<Pair<Integer, Column>> columns = Lists.newArrayList();
+      final List<Pair<Integer, Column>> columns = Lists.newArrayList();
 
       while(resultForColumns.next()) {
         final int ordinalPos = resultForColumns.getInt("ORDINAL_POSITION");
@@ -208,19 +209,27 @@ public abstract class JdbcMetadataProviderBase implements MetadataProvider {
       });
 
       // transform the pair list into collection for columns
-      Schema schema = new Schema(Collections2.transform(columns, new Function<Pair<Integer,Column>, Column>() {
+      final Schema schema = new Schema(Collections2.transform(columns, new Function<Pair<Integer,Column>, Column>() {
         @Override
         public Column apply(@Nullable Pair<Integer, Column> columnPair) {
           return columnPair.getSecond();
         }
       }));
 
-      return new TableDesc(
+
+      // fill the table stats
+      final TableStats stats = new TableStats();
+      stats.setNumRows(-1); // unknown
+
+      final TableDesc table = new TableDesc(
           CatalogUtil.buildFQName(databaseName, name),
           schema,
           new TableMeta("rowstore", new KeyValueSet()),
           space.getTableUri(databaseName, name)
       );
+      table.setStats(stats);
+
+      return table;
 
     } catch (SQLException e) {
       throw new TajoInternalError(e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
new file mode 100644
index 0000000..897f6c5
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedDataTypeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.datetime.DateTimeUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.*;
+import java.util.Iterator;
+
+public abstract class JdbcScanner implements Scanner {
+  private static final Log LOG = LogFactory.getLog(JdbcScanner.class);
+
+  protected final DatabaseMetaData dbMetaData;
+  protected final String tableName;
+  protected final Schema schema;
+  protected final TableMeta tableMeta;
+  protected final JdbcFragment fragment;
+  protected final TableStats stats;
+  protected final SQLBuilder builder;
+
+  protected Column [] targets;
+  protected EvalNode filter;
+  protected Long limit;
+  protected LogicalNode planPart;
+  protected VTuple outTuple;
+  protected String generatedSql;
+  protected ResultSetIterator iter;
+
+  public JdbcScanner(final DatabaseMetaData dbMetaData,
+                     final Schema tableSchema,
+                     final TableMeta tableMeta,
+                     final JdbcFragment fragment) {
+
+    Preconditions.checkNotNull(dbMetaData);
+    Preconditions.checkNotNull(tableSchema);
+    Preconditions.checkNotNull(tableMeta);
+    Preconditions.checkNotNull(fragment);
+
+    this.dbMetaData = dbMetaData;
+    this.tableName = ConnectionInfo.fromURI(fragment.getUri()).tableName;
+    this.schema = tableSchema;
+    this.tableMeta = tableMeta;
+    this.fragment = fragment;
+    this.stats = new TableStats();
+    builder = getSQLBuilder(getSQLExprBuilder());
+  }
+
+  @Override
+  public void init() throws IOException {
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+    outTuple = new VTuple(targets.length);
+
+    if (planPart == null) {
+      generatedSql = builder.build(tableName, targets, filter, limit);
+    } else {
+      generatedSql = builder.build(planPart);
+    }
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (iter == null) {
+      iter = executeQueryAndGetIter();
+    }
+
+    if (iter.hasNext()) {
+      return iter.next();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (iter != null) {
+      iter.rewind();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (iter != null) {
+      iter.close();
+    }
+  }
+
+  @Override
+  public void pushOperators(LogicalNode planPart) {
+    this.planPart = planPart;
+  }
+
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public void setTarget(Column [] targets) {
+    this.targets = targets;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return true;
+  }
+
+  @Override
+  public void setFilter(EvalNode filter) {
+    this.filter = filter;
+  }
+
+  @Override
+  public void setLimit(long num) {
+    this.limit = num;
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    return 0;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return stats;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  protected SQLBuilder getSQLBuilder(SQLExpressionGenerator exprBuilder) {
+    return new SQLBuilder(dbMetaData, getSQLExprBuilder());
+  }
+  protected SQLExpressionGenerator getSQLExprBuilder() {
+    return new SQLExpressionGenerator(dbMetaData);
+  }
+
+  protected void convertTuple(ResultSet resultSet, VTuple tuple) {
+    try {
+      for (int column_idx = 0; column_idx < targets.length; column_idx++) {
+        final Column c = targets[column_idx];
+        final int resultIdx = column_idx + 1;
+
+        switch (c.getDataType().getType()) {
+        case INT1:
+        case INT2:
+          tuple.put(column_idx, DatumFactory.createInt2(resultSet.getShort(resultIdx)));
+          break;
+        case INT4:
+          tuple.put(column_idx, DatumFactory.createInt4(resultSet.getInt(resultIdx)));
+          break;
+        case INT8:
+          tuple.put(column_idx, DatumFactory.createInt8(resultSet.getLong(resultIdx)));
+          break;
+        case FLOAT4:
+          tuple.put(column_idx, DatumFactory.createFloat4(resultSet.getFloat(resultIdx)));
+          break;
+        case FLOAT8:
+          tuple.put(column_idx, DatumFactory.createFloat8(resultSet.getDouble(resultIdx)));
+          break;
+        case CHAR:
+        case VARCHAR:
+        case TEXT:
+          tuple.put(column_idx, DatumFactory.createText(resultSet.getString(resultIdx)));
+          break;
+        case DATE:
+          Date date = resultSet.getDate(resultIdx);
+          tuple.put(column_idx, DatumFactory.createDate(date.getYear(), date.getMonth(), date.getDay()));
+          break;
+        case TIME:
+          tuple.put(column_idx, DatumFactory.createTime(resultSet.getTime(resultIdx).getTime()));
+          break;
+        case TIMESTAMP:
+          tuple.put(column_idx,
+              DatumFactory.createTimestamp(DateTimeUtil.javaTimeToJulianTime(resultSet.getTime(resultIdx).getTime())));
+          break;
+        case BINARY:
+        case VARBINARY:
+        case BLOB:
+          tuple.put(column_idx,
+              DatumFactory.createBlob(resultSet.getBytes(resultIdx)));
+          break;
+        default:
+          throw new TajoInternalError(new UnsupportedDataTypeException(c.getDataType().getType().name()));
+        }
+      }
+    } catch (SQLException s) {
+      throw new TajoInternalError(s);
+    }
+  }
+
+  private ResultSetIterator executeQueryAndGetIter() {
+    try {
+      LOG.info("Executed SQL Statement: " + generatedSql);
+      Connection conn = DriverManager.getConnection(fragment.uri);
+      Statement statement = conn.createStatement();
+      ResultSet resultset = statement.executeQuery(generatedSql);
+      return new ResultSetIterator((resultset));
+    } catch (SQLException s) {
+      throw new TajoInternalError(s);
+    }
+  }
+
+  public class ResultSetIterator implements Iterator<Tuple>, Closeable {
+
+    private final ResultSet resultSet;
+
+    private boolean didNext = false;
+    private boolean hasNext = false;
+
+    public ResultSetIterator(ResultSet resultSet) {
+      this.resultSet = resultSet;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (!didNext) {
+
+        try {
+          hasNext = resultSet.next();
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+
+        didNext = true;
+      }
+      return hasNext;
+    }
+
+    @Override
+    public Tuple next() {
+      if (!didNext) {
+        try {
+          resultSet.next();
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      didNext = false;
+      convertTuple(resultSet, outTuple);
+      return outTuple;
+    }
+
+    @Override
+    public void remove() {
+      throw new TajoRuntimeException(new UnsupportedException());
+    }
+
+    public void rewind() {
+      try {
+        resultSet.isBeforeFirst();
+      } catch (SQLException e) {
+        throw new TajoInternalError(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        resultSet.close();
+      } catch (SQLException e) {
+        LOG.warn(e);;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index 04709b0..d3ec273 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -25,20 +25,22 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.storage.FormatProperty;
-import org.apache.tajo.storage.StorageProperty;
-import org.apache.tajo.storage.Tablespace;
-import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.URI;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.List;
 
 /**
@@ -52,6 +54,7 @@ public abstract class JdbcTablespace extends Tablespace {
   static final StorageProperty STORAGE_PROPERTY = new StorageProperty("rowstore", false, true, false, true);
   static final FormatProperty  FORMAT_PROPERTY = new FormatProperty(false, false, false);
 
+  private Connection conn;
 
   public JdbcTablespace(String name, URI uri, JSONObject config) {
     super(name, uri, config);
@@ -59,11 +62,16 @@ public abstract class JdbcTablespace extends Tablespace {
 
   @Override
   protected void storageInit() throws IOException {
+    try {
+      this.conn = DriverManager.getConnection(uri.toASCIIString());
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public long getTableVolume(URI uri) throws IOException {
-    return 0;
+  public long getTableVolume(URI uri) throws UnsupportedException {
+    throw new UnsupportedException();
   }
 
   @Override
@@ -138,4 +146,18 @@ public abstract class JdbcTablespace extends Tablespace {
   }
 
   public abstract MetadataProvider getMetadataProvider();
+
+  @Override
+  public abstract Scanner getScanner(TableMeta meta,
+                            Schema schema,
+                            Fragment fragment,
+                            @Nullable Schema target) throws IOException;
+
+  public DatabaseMetaData getDatabaseMetaData() {
+    try {
+      return conn.getMetaData();
+    } catch (SQLException e) {
+      throw new TajoInternalError(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
new file mode 100644
index 0000000..57a4bba
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Function;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.util.StringUtils;
+
+import javax.annotation.Nullable;
+import java.sql.DatabaseMetaData;
+import java.util.Stack;
+
+/**
+ * Generator to build a SQL statement from a plan fragment
+ */
+public class SQLBuilder {
+  @SuppressWarnings("unused")
+  private final DatabaseMetaData dbMetaData;
+  private final SQLExpressionGenerator sqlExprGen;
+
+  public static class SQLBuilderContext {
+    StringBuilder sb;
+  }
+
+  public SQLBuilder(DatabaseMetaData dbMetaData, SQLExpressionGenerator exprGen) {
+    this.dbMetaData = dbMetaData;
+    this.sqlExprGen = exprGen;
+  }
+
+  public String build(String tableName, Column [] targets, @Nullable EvalNode filter, @Nullable Long limit) {
+
+    StringBuilder selectClause = new StringBuilder("SELECT ");
+    selectClause.append(StringUtils.join(targets, ",", new Function<Column, String>() {
+      @Override
+      public String apply(@Nullable Column input) {
+        return input.getSimpleName();
+      }
+    })).append(" ");
+
+    StringBuilder fromClause = new StringBuilder("FROM ");
+    fromClause.append(tableName).append(" ");
+
+    StringBuilder whereClause = null;
+    if (filter != null) {
+      whereClause = new StringBuilder("WHERE ");
+      whereClause.append(sqlExprGen.generate(filter)).append(" ");
+    }
+
+    StringBuilder limitClause = null;
+    if (limit != null) {
+      limitClause = new StringBuilder("LIMIT ");
+      limitClause.append(limit).append(" ");
+    }
+
+    return generateSelectStmt(selectClause, fromClause, whereClause, limitClause);
+  }
+
+  public String generateSelectStmt(StringBuilder selectClause,
+                                   StringBuilder fromClause,
+                                   @Nullable StringBuilder whereClause,
+                                   @Nullable StringBuilder limitClause) {
+    return
+        selectClause.toString() +
+        fromClause.toString() +
+        (whereClause != null ? whereClause.toString() : "") +
+        (limitClause != null ? limitClause.toString() : "");
+  }
+
+  public String build(LogicalNode planPart) {
+    SQLBuilderContext context = new SQLBuilderContext();
+    visit(context, planPart, new Stack<LogicalNode>());
+    return context.sb.toString();
+  }
+
+  public void visit(SQLBuilderContext context, LogicalNode node, Stack<LogicalNode> stack) {
+    stack.push(node);
+
+    switch (node.getType()) {
+    case SCAN:
+      visitScan(context, (ScanNode) node, stack);
+      break;
+
+    case GROUP_BY:
+      visitGroupBy(context, (GroupbyNode) node, stack);
+      break;
+
+    case SELECTION:
+      visitFilter(context, (SelectionNode) node, stack);
+      break;
+
+    case PROJECTION:
+      visitProjection(context, (ProjectionNode) node, stack);
+      break;
+
+    case TABLE_SUBQUERY:
+      visitDerivedSubquery(context, (TableSubQueryNode) node, stack);
+      break;
+
+    default:
+      throw new TajoRuntimeException(new UnsupportedException("plan node '" + node.getType().name() + "'"));
+    }
+
+    stack.pop();
+  }
+
+  public void visitDerivedSubquery(SQLBuilderContext ctx, TableSubQueryNode derivedSubquery, Stack<LogicalNode> stack) {
+    ctx.sb.append(" (");
+    visit(ctx, derivedSubquery.getSubQuery(), stack);
+    ctx.sb.append(" ) ").append(derivedSubquery.getTableName());
+  }
+
+  public void visitProjection(SQLBuilderContext ctx, ProjectionNode projection, Stack<LogicalNode> stack) {
+
+    visit(ctx, projection.getChild(), stack);
+  }
+
+  public void visitGroupBy(SQLBuilderContext ctx, GroupbyNode groupby, Stack<LogicalNode> stack) {
+    visit(ctx, groupby.getChild(), stack);
+    ctx.sb.append("GROUP BY ").append(StringUtils.join(groupby.getGroupingColumns(), ",", 0)).append(" ");
+  }
+
+  public void visitFilter(SQLBuilderContext ctx, SelectionNode filter, Stack<LogicalNode> stack) {
+    visit(ctx, filter.getChild(), stack);
+    ctx.sb.append("WHERE " + sqlExprGen.generate(filter.getQual()));
+  }
+
+  public void visitScan(SQLBuilderContext ctx, ScanNode scan, Stack<LogicalNode> stack) {
+
+    StringBuilder selectClause = new StringBuilder("SELECT ");
+    selectClause.append(StringUtils.join(scan.getTargets(), ",", new Function<Target, String>() {
+      @Override
+      public String apply(@Nullable Target t) {
+        StringBuilder sb = new StringBuilder(sqlExprGen.generate(t.getEvalTree()));
+        if (t.hasAlias()) {
+          sb.append(" AS ").append(t.getAlias());
+        }
+        return sb.toString();
+      }
+    }));
+
+    ctx.sb.append("FROM ").append(scan.getTableName()).append(" ");
+
+    if (scan.hasAlias()) {
+      ctx.sb.append("AS ").append(scan.getAlias()).append(" ");
+    }
+
+    if (scan.hasQual()) {
+      ctx.sb.append("WHERE " + sqlExprGen.generate(scan.getQual()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
new file mode 100644
index 0000000..76c687a
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedDataTypeException;
+import org.apache.tajo.plan.expr.*;
+
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Stack;
+
+/**
+ * A generator to build a SQL representation from a sql expression
+ */
+public class SQLExpressionGenerator extends SimpleEvalNodeVisitor<SQLExpressionGenerator.Context> {
+  final private DatabaseMetaData dbMetaData;
+
+  final private String DEFAULT_QUOTE_STR = "'";
+  private String QUOTE_STR;
+
+  public SQLExpressionGenerator(DatabaseMetaData dbMetaData) {
+    this.dbMetaData = dbMetaData;
+    initDatabaseDependentSQLRepr();
+  }
+
+  private void initDatabaseDependentSQLRepr() {
+    String quoteStr = null;
+    try {
+      quoteStr = dbMetaData.getIdentifierQuoteString();
+    } catch (SQLException e) {
+    }
+    this.QUOTE_STR = quoteStr != null ? quoteStr : DEFAULT_QUOTE_STR;
+  }
+
+  public String quote(String text) {
+    return QUOTE_STR + text + QUOTE_STR;
+  }
+
+  public String generate(EvalNode node) {
+    Context context = new Context();
+    visit(context, node, new Stack<EvalNode>());
+    return context.sb.toString();
+  }
+
+  public static class Context {
+    StringBuilder sb = new StringBuilder();
+
+    public void put(String text) {
+      sb.append(" ").append(text).append(" ");
+    }
+  }
+
+  protected EvalNode visitBinaryEval(Context context, Stack<EvalNode> stack, BinaryEval binaryEval) {
+    stack.push(binaryEval);
+    visit(context, binaryEval.getLeftExpr(), stack);
+    context.sb.append(convertBinOperatorToSQLRepr(binaryEval.getType())).append(" ");
+    visit(context, binaryEval.getRightExpr(), stack);
+    stack.pop();
+    return binaryEval;
+  }
+
+  protected EvalNode visitUnaryEval(Context context, UnaryEval unary, Stack<EvalNode> stack) {
+
+    switch (unary.getType()) {
+    case NOT:
+      context.sb.append("NOT ");
+      super.visitUnaryEval(context, unary, stack);
+      break;
+    case SIGNED:
+      SignedEval signed = (SignedEval) unary;
+      if (signed.isNegative()) {
+        context.sb.append("-");
+      }
+      super.visitUnaryEval(context, unary, stack);
+      break;
+    case IS_NULL:
+      super.visitUnaryEval(context, unary, stack);
+
+      IsNullEval isNull = (IsNullEval) unary;
+      if (isNull.isNot()) {
+        context.sb.append("IS NOT NULL ");
+      } else {
+        context.sb.append("IS NULL ");
+      }
+      break;
+
+    case CAST:
+      super.visitUnaryEval(context, unary, stack);
+      context.sb.append(" AS ").append(convertTajoTypeToSQLType(unary.getValueType()));
+    }
+    return unary;
+  }
+
+  protected EvalNode visitRowConstant(Context context, RowConstantEval row, Stack<EvalNode> stack) {
+    StringBuilder sb = new StringBuilder("(");
+
+    boolean first = true;
+    for (Datum d : row.getValues()) {
+      if (!first) {
+        sb.append(",");
+        first = false;
+      }
+      sb.append(convertLiteralToSQLRepr(d));
+    }
+
+    sb.append(")");
+
+    context.put(sb.toString());
+
+    return row;
+  }
+
+  @Override
+  protected EvalNode visitField(Context context, FieldEval field, Stack<EvalNode> stack) {
+    context.put(field.getName());
+    return field;
+  }
+
+  @Override
+  protected EvalNode visitConst(Context context, ConstEval constant, Stack<EvalNode> stack) {
+    context.sb.append(convertLiteralToSQLRepr(constant.getValue()));
+    return constant;
+  }
+
+  /**
+   * convert Tajo literal into SQL representation
+   *
+   * @param d Datum
+   */
+  public String convertLiteralToSQLRepr(Datum d) {
+    switch (d.type()) {
+    case INT1:
+    case INT2:
+    case INT4:
+    case INT8:
+    case FLOAT4:
+    case FLOAT8:
+    case NUMERIC:
+      return d.asChars();
+
+    case TEXT:
+    case VARCHAR:
+    case CHAR:
+      return quote(d.asChars());
+
+    case DATE:
+      return "DATE " + quote(d.asChars());
+
+    case TIME:
+      return "TIME " + quote(d.asChars());
+
+    case TIMESTAMP:
+      return "TIMESTAMP " + quote(d.asChars());
+
+    case NULL_TYPE:
+      return "NULL";
+
+    default:
+      throw new TajoRuntimeException(new UnsupportedDataTypeException(d.type().name()));
+    }
+  }
+
+  /**
+   * Convert Tajo DataType into SQL DataType
+   *
+   * @param dataType Tajo DataType
+   * @return SQL DataType
+   */
+  public String convertTajoTypeToSQLType(DataType dataType) {
+    switch (dataType.getType()) {
+    case INT1:
+      return "TINYINT";
+    case INT2:
+      return "SMALLINT";
+    case INT4:
+      return "INTEGER";
+    case INT8:
+      return "BIGINT";
+    case FLOAT4:
+      return "FLOAT";
+    case FLOAT8:
+      return "DOUBLE";
+    default:
+      return dataType.getType().name();
+    }
+  }
+
+  /**
+   * Convert EvalType the operator notation into SQL notation
+   *
+   * @param op EvalType
+   * @return SQL representation
+   */
+  public String convertBinOperatorToSQLRepr(EvalType op) {
+    return op.getOperatorName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java b/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java
deleted file mode 100644
index 9eee00a..0000000
--- a/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.jdbc;
-
-public abstract class JdbcTablespaceTestBase {
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java b/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java
index d3a0c6a..4a2171e 100644
--- a/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java
+++ b/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java
@@ -18,11 +18,17 @@
 
 package org.apache.tajo.storage.mysql;
 
+import com.google.common.base.Preconditions;
 import net.minidev.json.JSONObject;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.jdbc.JdbcFragment;
 import org.apache.tajo.storage.jdbc.JdbcTablespace;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.net.URI;
 
 /**
@@ -42,4 +48,27 @@ public class MySQLTablespace extends JdbcTablespace {
   public MetadataProvider getMetadataProvider() {
     return new MySQLMetadataProvider(this, database);
   }
+
+  @Override
+  public Scanner getScanner(TableMeta meta,
+                            Schema schema,
+                            Fragment fragment,
+                            @Nullable Schema target) throws IOException {
+    if (!(fragment instanceof JdbcFragment)) {
+      throw new TajoInternalError("fragment must be JdbcFragment");
+    }
+
+    if (target == null) {
+      target = schema;
+    }
+
+    if (fragment.isEmpty()) {
+      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
+      scanner.setTarget(target.toArray());
+
+      return scanner;
+    }
+
+    return new MySQLJdbcScanner(getDatabaseMetaData(), schema, meta, (JdbcFragment) fragment);
+  }
 }


Mime
View raw message